nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engines primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34#[cfg(feature = "defi")]
35pub mod pool;
36
37use std::{
38    any::Any,
39    cell::{Ref, RefCell},
40    collections::hash_map::Entry,
41    fmt::Display,
42    num::NonZeroUsize,
43    rc::Rc,
44};
45
46use ahash::{AHashMap, AHashSet};
47use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
48use config::DataEngineConfig;
49use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
50use indexmap::IndexMap;
51#[cfg(feature = "defi")]
52use nautilus_common::messages::defi::{DefiSubscribeCommand, DefiUnsubscribeCommand};
53use nautilus_common::{
54    cache::Cache,
55    clock::Clock,
56    logging::{RECV, RES},
57    messages::data::{
58        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61        UnsubscribeCommand,
62    },
63    msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
64    timer::TimeEventCallback,
65};
66use nautilus_core::{
67    correctness::{
68        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
69    },
70    datetime::millis_to_nanos,
71};
72#[cfg(feature = "defi")]
73use nautilus_model::defi::Blockchain;
74#[cfg(feature = "defi")]
75use nautilus_model::defi::DefiData;
76use nautilus_model::{
77    data::{
78        Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
79        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
80    },
81    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
82    identifiers::{ClientId, InstrumentId, Venue},
83    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
84    orderbook::OrderBook,
85};
86use nautilus_persistence::backend::catalog::ParquetDataCatalog;
87use ustr::Ustr;
88
89#[cfg(feature = "defi")]
90use crate::engine::pool::PoolUpdater;
91use crate::{
92    aggregation::{
93        BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
94        VolumeBarAggregator,
95    },
96    client::DataClientAdapter,
97};
98
99/// Provides a high-performance `DataEngine` for all environments.
100#[derive(Debug)]
101pub struct DataEngine {
102    clock: Rc<RefCell<dyn Clock>>,
103    cache: Rc<RefCell<Cache>>,
104    clients: IndexMap<ClientId, DataClientAdapter>,
105    default_client: Option<DataClientAdapter>,
106    external_clients: AHashSet<ClientId>,
107    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
108    routing_map: IndexMap<Venue, ClientId>,
109    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
110    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
111    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
112    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
113    bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
114    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
115    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
116    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
117    msgbus_priority: u8,
118    config: DataEngineConfig,
119    #[cfg(feature = "defi")]
120    pool_updaters: AHashMap<InstrumentId, Rc<crate::engine::pool::PoolUpdater>>,
121}
122
123impl DataEngine {
124    /// Creates a new [`DataEngine`] instance.
125    #[must_use]
126    pub fn new(
127        clock: Rc<RefCell<dyn Clock>>,
128        cache: Rc<RefCell<Cache>>,
129        config: Option<DataEngineConfig>,
130    ) -> Self {
131        let config = config.unwrap_or_default();
132
133        let external_clients: AHashSet<ClientId> = config
134            .external_clients
135            .clone()
136            .unwrap_or_default()
137            .into_iter()
138            .collect();
139
140        Self {
141            clock,
142            cache,
143            clients: IndexMap::new(),
144            default_client: None,
145            external_clients,
146            catalogs: AHashMap::new(),
147            routing_map: IndexMap::new(),
148            book_intervals: AHashMap::new(),
149            book_updaters: AHashMap::new(),
150            book_snapshotters: AHashMap::new(),
151            bar_aggregators: AHashMap::new(),
152            bar_aggregator_handlers: AHashMap::new(),
153            _synthetic_quote_feeds: AHashMap::new(),
154            _synthetic_trade_feeds: AHashMap::new(),
155            buffered_deltas_map: AHashMap::new(),
156            msgbus_priority: 10, // High-priority for built-in component
157            config,
158            #[cfg(feature = "defi")]
159            pool_updaters: AHashMap::new(),
160        }
161    }
162
163    /// Returns a read-only reference to the engines clock.
164    #[must_use]
165    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
166        self.clock.borrow()
167    }
168
169    /// Returns a read-only reference to the engines cache.
170    #[must_use]
171    pub fn get_cache(&self) -> Ref<'_, Cache> {
172        self.cache.borrow()
173    }
174
175    /// Registers the `catalog` with the engine with an optional specific `name`.
176    ///
177    /// # Panics
178    ///
179    /// Panics if a catalog with the same `name` has already been registered.
180    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
181        let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
182
183        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
184
185        self.catalogs.insert(name, catalog);
186        log::info!("Registered catalog <{name}>");
187    }
188
189    /// Registers the `client` with the engine with an optional venue `routing`.
190    ///
191    ///
192    /// # Panics
193    ///
194    /// Panics if a client with the same client ID has already been registered.
195    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
196        let client_id = client.client_id();
197
198        if let Some(default_client) = &self.default_client {
199            check_predicate_false(
200                default_client.client_id() == client.client_id(),
201                "client_id already registered as default client",
202            )
203            .expect(FAILED);
204        }
205
206        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
207
208        if let Some(routing) = routing {
209            self.routing_map.insert(routing, client_id);
210            log::info!("Set client {client_id} routing for {routing}");
211        }
212
213        if client.venue.is_none() && self.default_client.is_none() {
214            self.default_client = Some(client);
215            log::info!("Registered client {client_id} for default routing");
216        } else {
217            self.clients.insert(client_id, client);
218            log::info!("Registered client {client_id}");
219        }
220    }
221
222    /// Deregisters the client for the `client_id`.
223    ///
224    /// # Panics
225    ///
226    /// Panics if the client ID has not been registered.
227    pub fn deregister_client(&mut self, client_id: &ClientId) {
228        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
229
230        self.clients.shift_remove(client_id);
231        log::info!("Deregistered client {client_id}");
232    }
233
234    /// Registers the data `client` with the engine as the default routing client.
235    ///
236    /// When a specific venue routing cannot be found, this client will receive messages.
237    ///
238    /// # Warnings
239    ///
240    /// Any existing default routing client will be overwritten.
241    ///
242    /// # Panics
243    ///
244    /// Panics if a default client has already been registered.
245    pub fn register_default_client(&mut self, client: DataClientAdapter) {
246        check_predicate_true(
247            self.default_client.is_none(),
248            "default client already registered",
249        )
250        .expect(FAILED);
251
252        let client_id = client.client_id();
253
254        self.default_client = Some(client);
255        log::info!("Registered default client {client_id}");
256    }
257
258    /// Starts all registered data clients.
259    pub fn start(&mut self) {
260        for client in self.get_clients_mut() {
261            if let Err(e) = client.start() {
262                log::error!("{e}");
263            }
264        }
265    }
266
267    /// Stops all registered data clients.
268    pub fn stop(&mut self) {
269        for client in self.get_clients_mut() {
270            if let Err(e) = client.stop() {
271                log::error!("{e}");
272            }
273        }
274    }
275
276    /// Resets all registered data clients to their initial state.
277    pub fn reset(&mut self) {
278        for client in self.get_clients_mut() {
279            if let Err(e) = client.reset() {
280                log::error!("{e}");
281            }
282        }
283    }
284
285    /// Disposes the engine, stopping all clients and canceling any timers.
286    pub fn dispose(&mut self) {
287        for client in self.get_clients_mut() {
288            if let Err(e) = client.dispose() {
289                log::error!("{e}");
290            }
291        }
292
293        self.clock.borrow_mut().cancel_timers();
294    }
295
296    /// Returns `true` if all registered data clients are currently connected.
297    #[must_use]
298    pub fn check_connected(&self) -> bool {
299        self.get_clients()
300            .iter()
301            .all(|client| client.is_connected())
302    }
303
304    /// Returns `true` if all registered data clients are currently disconnected.
305    #[must_use]
306    pub fn check_disconnected(&self) -> bool {
307        self.get_clients()
308            .iter()
309            .all(|client| !client.is_connected())
310    }
311
312    /// Returns a list of all registered client IDs, including the default client if set.
313    #[must_use]
314    pub fn registered_clients(&self) -> Vec<ClientId> {
315        self.get_clients()
316            .into_iter()
317            .map(|client| client.client_id())
318            .collect()
319    }
320
321    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
322
323    fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
324    where
325        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
326        T: Clone,
327    {
328        self.get_clients()
329            .into_iter()
330            .flat_map(get_subs)
331            .cloned()
332            .collect()
333    }
334
335    #[must_use]
336    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
337        let (default_opt, clients_map) = (&self.default_client, &self.clients);
338        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
339
340        if let Some(default) = default_opt {
341            clients.push(default);
342        }
343
344        clients
345    }
346
347    #[must_use]
348    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
349        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
350        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
351
352        if let Some(default) = default_opt {
353            clients.push(default);
354        }
355
356        clients
357    }
358
359    pub fn get_client(
360        &mut self,
361        client_id: Option<&ClientId>,
362        venue: Option<&Venue>,
363    ) -> Option<&mut DataClientAdapter> {
364        if let Some(client_id) = client_id {
365            // Explicit ID: first look in registered clients
366            if let Some(client) = self.clients.get_mut(client_id) {
367                return Some(client);
368            }
369
370            // Then check if it matches the default client
371            if let Some(default) = self.default_client.as_mut()
372                && default.client_id() == *client_id
373            {
374                return Some(default);
375            }
376
377            // Unknown explicit client
378            return None;
379        }
380
381        if let Some(v) = venue {
382            // Route by venue if mapped client still registered
383            if let Some(client_id) = self.routing_map.get(v) {
384                return self.clients.get_mut(client_id);
385            }
386        }
387
388        // Fallback to default client
389        self.get_default_client()
390    }
391
392    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
393        self.default_client.as_mut()
394    }
395
396    /// Returns all custom data types currently subscribed across all clients.
397    #[must_use]
398    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
399        self.collect_subscriptions(|client| &client.subscriptions_custom)
400    }
401
402    /// Returns all instrument IDs currently subscribed across all clients.
403    #[must_use]
404    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
405        self.collect_subscriptions(|client| &client.subscriptions_instrument)
406    }
407
408    /// Returns all instrument IDs for which book delta subscriptions exist.
409    #[must_use]
410    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
411        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
412    }
413
414    /// Returns all instrument IDs for which book snapshot subscriptions exist.
415    #[must_use]
416    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
417        self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
418    }
419
420    /// Returns all instrument IDs for which quote subscriptions exist.
421    #[must_use]
422    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
423        self.collect_subscriptions(|client| &client.subscriptions_quotes)
424    }
425
426    /// Returns all instrument IDs for which trade subscriptions exist.
427    #[must_use]
428    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
429        self.collect_subscriptions(|client| &client.subscriptions_trades)
430    }
431
432    /// Returns all bar types currently subscribed across all clients.
433    #[must_use]
434    pub fn subscribed_bars(&self) -> Vec<BarType> {
435        self.collect_subscriptions(|client| &client.subscriptions_bars)
436    }
437
438    /// Returns all instrument IDs for which mark price subscriptions exist.
439    #[must_use]
440    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
441        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
442    }
443
444    /// Returns all instrument IDs for which index price subscriptions exist.
445    #[must_use]
446    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
447        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
448    }
449
450    /// Returns all instrument IDs for which funding rate subscriptions exist.
451    #[must_use]
452    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
453        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
454    }
455
456    /// Returns all instrument IDs for which status subscriptions exist.
457    #[must_use]
458    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
459        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
460    }
461
462    /// Returns all instrument IDs for which instrument close subscriptions exist.
463    #[must_use]
464    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
465        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
466    }
467
468    #[cfg(feature = "defi")]
469    /// Returns all blockchains for which blocks subscriptions exist.
470    #[must_use]
471    pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
472        self.collect_subscriptions(|client| &client.subscriptions_blocks)
473    }
474
475    #[cfg(feature = "defi")]
476    /// Returns all instrument IDs for which pool subscriptions exist.
477    #[must_use]
478    pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
479        self.collect_subscriptions(|client| &client.subscriptions_pools)
480    }
481
482    #[cfg(feature = "defi")]
483    /// Returns all instrument IDs for which swap subscriptions exist.
484    #[must_use]
485    pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
486        self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
487    }
488
489    #[cfg(feature = "defi")]
490    /// Returns all instrument IDs for which liquidity update subscriptions exist.
491    #[must_use]
492    pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
493        self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
494    }
495
496    // -- COMMANDS --------------------------------------------------------------------------------
497
498    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
499    ///
500    /// Errors during execution are logged.
501    pub fn execute(&mut self, cmd: &DataCommand) {
502        if let Err(e) = match cmd {
503            DataCommand::Subscribe(c) => self.execute_subscribe(c),
504            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
505            DataCommand::Request(c) => self.execute_request(c),
506            #[cfg(feature = "defi")]
507            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
508            #[cfg(feature = "defi")]
509            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
510            _ => {
511                log::warn!("Unhandled DataCommand variant: {cmd:?}");
512                Ok(())
513            }
514        } {
515            log::error!("{e}");
516        }
517    }
518
519    /// Handles a subscribe command, updating internal state and forwarding to the client.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
524    /// or if the underlying client operation fails.
525    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
526        // Update internal engine state
527        match &cmd {
528            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
529            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
530            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
531            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
532            _ => {} // Do nothing else
533        }
534
535        if let Some(client_id) = cmd.client_id()
536            && self.external_clients.contains(client_id)
537        {
538            if self.config.debug {
539                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
540            }
541            return Ok(());
542        }
543
544        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
545            client.execute_subscribe(cmd);
546        } else {
547            log::error!(
548                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
549                cmd.client_id(),
550                cmd.venue(),
551            );
552        }
553
554        Ok(())
555    }
556
557    #[cfg(feature = "defi")]
558    /// Handles a subscribe command, updating internal state and forwarding to the client.
559    ///
560    /// # Errors
561    ///
562    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
563    /// or if the underlying client operation fails.
564    pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) -> anyhow::Result<()> {
565        if let Some(client_id) = cmd.client_id()
566            && self.external_clients.contains(client_id)
567        {
568            if self.config.debug {
569                log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}",);
570            }
571            return Ok(());
572        }
573
574        match cmd {
575            DefiSubscribeCommand::Pool(cmd) => self.setup_pool_updater(&cmd.instrument_id),
576            DefiSubscribeCommand::PoolSwaps(cmd) => self.setup_pool_updater(&cmd.instrument_id),
577            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
578                self.setup_pool_updater(&cmd.instrument_id);
579            }
580            _ => {}
581        }
582
583        // Forward command to client
584        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
585            client.execute_defi_subscribe(cmd);
586        } else {
587            log::error!(
588                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
589                cmd.client_id(),
590                cmd.venue(),
591            );
592        }
593
594        Ok(())
595    }
596
597    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if the underlying client operation fails.
602    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
603        match &cmd {
604            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
605            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
606            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
607            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
608            _ => {} // Do nothing else
609        }
610
611        if let Some(client_id) = cmd.client_id()
612            && self.external_clients.contains(client_id)
613        {
614            if self.config.debug {
615                log::debug!(
616                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
617                );
618            }
619            return Ok(());
620        }
621
622        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
623            client.execute_unsubscribe(cmd);
624        } else {
625            log::error!(
626                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
627                cmd.client_id(),
628                cmd.venue(),
629            );
630        }
631
632        Ok(())
633    }
634
635    #[cfg(feature = "defi")]
636    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
637    ///
638    /// # Errors
639    ///
640    /// Returns an error if the underlying client operation fails.
641    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
642        if let Some(client_id) = cmd.client_id()
643            && self.external_clients.contains(client_id)
644        {
645            if self.config.debug {
646                log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}",);
647            }
648            return Ok(());
649        }
650
651        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
652            client.execute_defi_unsubscribe(cmd);
653        } else {
654            log::error!(
655                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
656                cmd.client_id(),
657                cmd.venue(),
658            );
659        }
660
661        Ok(())
662    }
663
664    /// Sends a [`RequestCommand`] to a suitable data client implementation.
665    ///
666    /// # Errors
667    ///
668    /// Returns an error if no client is found for the given client ID or venue,
669    /// or if the client fails to process the request.
670    pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
671        // Skip requests for external clients
672        if let Some(cid) = req.client_id()
673            && self.external_clients.contains(cid)
674        {
675            if self.config.debug {
676                log::debug!("Skipping data request for external client {cid}: {req:?}");
677            }
678            return Ok(());
679        }
680        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
681            match req {
682                RequestCommand::Data(req) => client.request_data(req),
683                RequestCommand::Instrument(req) => client.request_instrument(req),
684                RequestCommand::Instruments(req) => client.request_instruments(req),
685                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
686                RequestCommand::Quotes(req) => client.request_quotes(req),
687                RequestCommand::Trades(req) => client.request_trades(req),
688                RequestCommand::Bars(req) => client.request_bars(req),
689            }
690        } else {
691            anyhow::bail!(
692                "Cannot handle request: no client found for {:?} {:?}",
693                req.client_id(),
694                req.venue()
695            );
696        }
697    }
698
699    /// Processes a dynamically-typed data message.
700    ///
701    /// Currently supports `InstrumentAny` and `FundingRateUpdate`; unrecognized types are logged as errors.
702    pub fn process(&mut self, data: &dyn Any) {
703        // TODO: Eventually these could be added to the `Data` enum? process here for now
704        if let Some(data) = data.downcast_ref::<Data>() {
705            self.process_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
706            return;
707        }
708
709        #[cfg(feature = "defi")]
710        if let Some(data) = data.downcast_ref::<DefiData>() {
711            self.process_defi_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
712            return;
713        }
714
715        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
716            self.handle_instrument(instrument.clone());
717        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
718            self.handle_funding_rate(*funding_rate);
719        } else {
720            log::error!("Cannot process data {data:?}, type is unrecognized");
721        }
722    }
723
724    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
725    pub fn process_data(&mut self, data: Data) {
726        match data {
727            Data::Delta(delta) => self.handle_delta(delta),
728            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
729            Data::Depth10(depth) => self.handle_depth10(*depth),
730            Data::Quote(quote) => self.handle_quote(quote),
731            Data::Trade(trade) => self.handle_trade(trade),
732            Data::Bar(bar) => self.handle_bar(bar),
733            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
734            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
735            Data::InstrumentClose(close) => self.handle_instrument_close(close),
736        }
737    }
738
739    /// Processes DeFi-specific data events.
740    #[cfg(feature = "defi")]
741    pub fn process_defi_data(&mut self, data: DefiData) {
742        match data {
743            DefiData::Block(block) => {
744                let topic = switchboard::get_defi_blocks_topic(block.chain());
745                msgbus::publish(topic, &block as &dyn Any);
746            }
747            DefiData::Pool(pool) => {
748                if let Err(err) = self.cache.borrow_mut().add_pool(pool.clone()) {
749                    log::error!("Failed to add Pool to cache: {err}");
750                }
751
752                let topic = switchboard::get_defi_pool_topic(pool.instrument_id);
753                msgbus::publish(topic, &pool as &dyn Any);
754            }
755            DefiData::PoolSwap(swap) => {
756                let topic = switchboard::get_defi_pool_swaps_topic(swap.instrument_id);
757                msgbus::publish(topic, &swap as &dyn Any);
758            }
759            DefiData::PoolLiquidityUpdate(update) => {
760                let topic = switchboard::get_defi_liquidity_topic(update.instrument_id);
761                msgbus::publish(topic, &update as &dyn Any);
762            }
763            DefiData::PoolFeeCollect(collect) => {
764                let topic = switchboard::get_defi_collect_topic(collect.instrument_id);
765                msgbus::publish(topic, &collect as &dyn Any);
766            }
767        }
768    }
769
770    /// Processes a `DataResponse`, handling and publishing the response message.
771    pub fn response(&self, resp: DataResponse) {
772        log::debug!("{RECV}{RES} {resp:?}");
773
774        match &resp {
775            DataResponse::Instrument(resp) => {
776                self.handle_instrument_response(resp.data.clone());
777            }
778            DataResponse::Instruments(resp) => {
779                self.handle_instruments(&resp.data);
780            }
781            DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
782            DataResponse::Trades(resp) => self.handle_trades(&resp.data),
783            DataResponse::Bars(resp) => self.handle_bars(&resp.data),
784            _ => todo!(),
785        }
786
787        msgbus::send_response(resp.correlation_id(), &resp);
788    }
789
790    // -- DATA HANDLERS ---------------------------------------------------------------------------
791
792    fn handle_instrument(&mut self, instrument: InstrumentAny) {
793        if let Err(e) = self
794            .cache
795            .as_ref()
796            .borrow_mut()
797            .add_instrument(instrument.clone())
798        {
799            log_error_on_cache_insert(&e);
800        }
801
802        let topic = switchboard::get_instrument_topic(instrument.id());
803        msgbus::publish(topic, &instrument as &dyn Any);
804    }
805
806    fn handle_delta(&mut self, delta: OrderBookDelta) {
807        let deltas = if self.config.buffer_deltas {
808            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
809                buffered_deltas.deltas.push(delta);
810            } else {
811                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
812                self.buffered_deltas_map
813                    .insert(delta.instrument_id, buffered_deltas);
814            }
815
816            if !RecordFlag::F_LAST.matches(delta.flags) {
817                return; // Not the last delta for event
818            }
819
820            // SAFETY: We know the deltas exists already
821            self.buffered_deltas_map
822                .remove(&delta.instrument_id)
823                .unwrap()
824        } else {
825            OrderBookDeltas::new(delta.instrument_id, vec![delta])
826        };
827
828        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
829        msgbus::publish(topic, &deltas as &dyn Any);
830    }
831
832    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
833        let deltas = if self.config.buffer_deltas {
834            let mut is_last_delta = false;
835            for delta in &deltas.deltas {
836                if RecordFlag::F_LAST.matches(delta.flags) {
837                    is_last_delta = true;
838                    break;
839                }
840            }
841
842            let instrument_id = deltas.instrument_id;
843
844            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
845                buffered_deltas.deltas.extend(deltas.deltas);
846            } else {
847                self.buffered_deltas_map.insert(instrument_id, deltas);
848            }
849
850            if !is_last_delta {
851                return;
852            }
853
854            // SAFETY: We know the deltas exists already
855            self.buffered_deltas_map.remove(&instrument_id).unwrap()
856        } else {
857            deltas
858        };
859
860        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
861        msgbus::publish(topic, &deltas as &dyn Any);
862    }
863
864    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
865        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
866        msgbus::publish(topic, &depth as &dyn Any);
867    }
868
869    fn handle_quote(&mut self, quote: QuoteTick) {
870        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
871            log_error_on_cache_insert(&e);
872        }
873
874        // TODO: Handle synthetics
875
876        let topic = switchboard::get_quotes_topic(quote.instrument_id);
877        msgbus::publish(topic, &quote as &dyn Any);
878    }
879
880    fn handle_trade(&mut self, trade: TradeTick) {
881        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
882            log_error_on_cache_insert(&e);
883        }
884
885        // TODO: Handle synthetics
886
887        let topic = switchboard::get_trades_topic(trade.instrument_id);
888        msgbus::publish(topic, &trade as &dyn Any);
889    }
890
891    fn handle_bar(&mut self, bar: Bar) {
892        // TODO: Handle additional bar logic
893        if self.config.validate_data_sequence
894            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
895        {
896            if bar.ts_event < last_bar.ts_event {
897                log::warn!(
898                    "Bar {bar} was prior to last bar `ts_event` {}",
899                    last_bar.ts_event
900                );
901                return; // Bar is out of sequence
902            }
903            if bar.ts_init < last_bar.ts_init {
904                log::warn!(
905                    "Bar {bar} was prior to last bar `ts_init` {}",
906                    last_bar.ts_init
907                );
908                return; // Bar is out of sequence
909            }
910            // TODO: Implement `bar.is_revision` logic
911        }
912
913        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
914            log_error_on_cache_insert(&e);
915        }
916
917        let topic = switchboard::get_bars_topic(bar.bar_type);
918        msgbus::publish(topic, &bar as &dyn Any);
919    }
920
921    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
922        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
923            log_error_on_cache_insert(&e);
924        }
925
926        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
927        msgbus::publish(topic, &mark_price as &dyn Any);
928    }
929
930    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
931        if let Err(e) = self
932            .cache
933            .as_ref()
934            .borrow_mut()
935            .add_index_price(index_price)
936        {
937            log_error_on_cache_insert(&e);
938        }
939
940        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
941        msgbus::publish(topic, &index_price as &dyn Any);
942    }
943
944    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
945    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
946        if let Err(e) = self
947            .cache
948            .as_ref()
949            .borrow_mut()
950            .add_funding_rate(funding_rate)
951        {
952            log_error_on_cache_insert(&e);
953        }
954
955        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
956        msgbus::publish(topic, &funding_rate as &dyn Any);
957    }
958
959    fn handle_instrument_close(&mut self, close: InstrumentClose) {
960        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
961        msgbus::publish(topic, &close as &dyn Any);
962    }
963
964    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
965
966    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
967        if cmd.instrument_id.is_synthetic() {
968            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
969        }
970
971        self.setup_order_book(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
972
973        Ok(())
974    }
975
976    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
977        if cmd.instrument_id.is_synthetic() {
978            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
979        }
980
981        self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
982
983        Ok(())
984    }
985
986    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
987        if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
988            return Ok(());
989        }
990
991        if cmd.instrument_id.is_synthetic() {
992            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
993        }
994
995        // Track snapshot intervals per instrument, and set up timer on first subscription
996        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
997            Entry::Vacant(e) => {
998                let mut set = AHashSet::new();
999                set.insert(cmd.instrument_id);
1000                e.insert(set);
1001                true
1002            }
1003            Entry::Occupied(mut e) => {
1004                e.get_mut().insert(cmd.instrument_id);
1005                false
1006            }
1007        };
1008
1009        if first_for_interval {
1010            // Initialize snapshotter and schedule its timer
1011            let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
1012            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1013
1014            let snap_info = BookSnapshotInfo {
1015                instrument_id: cmd.instrument_id,
1016                venue: cmd.instrument_id.venue,
1017                is_composite: cmd.instrument_id.symbol.is_composite(),
1018                root: Ustr::from(cmd.instrument_id.symbol.root()),
1019                topic,
1020                interval_ms: cmd.interval_ms,
1021            };
1022
1023            // Schedule the first snapshot at the next interval boundary
1024            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1025            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1026
1027            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
1028            self.book_snapshotters
1029                .insert(cmd.instrument_id, snapshotter.clone());
1030            let timer_name = snapshotter.timer_name;
1031
1032            let callback =
1033                TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
1034
1035            self.clock
1036                .borrow_mut()
1037                .set_timer_ns(
1038                    &timer_name,
1039                    interval_ns,
1040                    Some(start_time_ns.into()),
1041                    None,
1042                    Some(callback),
1043                    None,
1044                    None,
1045                )
1046                .expect(FAILED);
1047        }
1048
1049        self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, true)?;
1050
1051        Ok(())
1052    }
1053
1054    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1055        match cmd.bar_type.aggregation_source() {
1056            AggregationSource::Internal => {
1057                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1058                    self.start_bar_aggregator(cmd.bar_type)?;
1059                }
1060            }
1061            AggregationSource::External => {
1062                if cmd.bar_type.instrument_id().is_synthetic() {
1063                    anyhow::bail!(
1064                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
1065                    );
1066                }
1067            }
1068        }
1069
1070        Ok(())
1071    }
1072
1073    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1074        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1075            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1076            return Ok(());
1077        }
1078
1079        let topics = vec![
1080            switchboard::get_book_deltas_topic(cmd.instrument_id),
1081            switchboard::get_book_depth10_topic(cmd.instrument_id),
1082            // TODO: Unsubscribe from snapshots?
1083        ];
1084
1085        self.maintain_book_updater(&cmd.instrument_id, &topics);
1086        self.maintain_book_snapshotter(&cmd.instrument_id);
1087
1088        Ok(())
1089    }
1090
1091    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1092        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1093            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1094            return Ok(());
1095        }
1096
1097        let topics = vec![
1098            switchboard::get_book_deltas_topic(cmd.instrument_id),
1099            switchboard::get_book_depth10_topic(cmd.instrument_id),
1100            // TODO: Unsubscribe from snapshots?
1101        ];
1102
1103        self.maintain_book_updater(&cmd.instrument_id, &topics);
1104        self.maintain_book_snapshotter(&cmd.instrument_id);
1105
1106        Ok(())
1107    }
1108
1109    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1110        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1111            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1112            return Ok(());
1113        }
1114
1115        // Remove instrument from interval tracking, and drop empty intervals
1116        let mut to_remove = Vec::new();
1117        for (interval, set) in &mut self.book_intervals {
1118            if set.remove(&cmd.instrument_id) && set.is_empty() {
1119                to_remove.push(*interval);
1120            }
1121        }
1122
1123        for interval in to_remove {
1124            self.book_intervals.remove(&interval);
1125        }
1126
1127        let topics = vec![
1128            switchboard::get_book_deltas_topic(cmd.instrument_id),
1129            switchboard::get_book_depth10_topic(cmd.instrument_id),
1130            // TODO: Unsubscribe from snapshots (add interval_ms to message?)
1131        ];
1132
1133        self.maintain_book_updater(&cmd.instrument_id, &topics);
1134        self.maintain_book_snapshotter(&cmd.instrument_id);
1135
1136        Ok(())
1137    }
1138
1139    /// Unsubscribe internal bar aggregator for the given bar type.
1140    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1141        // If we have an internal aggregator for this bar type, stop and remove it
1142        let bar_type = cmd.bar_type;
1143        if self.bar_aggregators.contains_key(&bar_type.standard()) {
1144            if let Err(err) = self.stop_bar_aggregator(bar_type) {
1145                log::error!("Error stopping bar aggregator for {bar_type}: {err}");
1146            }
1147            self.bar_aggregators.remove(&bar_type.standard());
1148            log::debug!("Removed bar aggregator for {bar_type}");
1149        }
1150        Ok(())
1151    }
1152
1153    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1154        if let Some(updater) = self.book_updaters.get(instrument_id) {
1155            let handler = ShareableMessageHandler(updater.clone());
1156
1157            // Unsubscribe handler if it is the last subscriber
1158            for topic in topics {
1159                if msgbus::subscriptions_count(topic.as_str()) == 1
1160                    && msgbus::is_subscribed(topic.as_str(), handler.clone())
1161                {
1162                    log::debug!("Unsubscribing BookUpdater from {topic}");
1163                    msgbus::unsubscribe_topic(*topic, handler.clone());
1164                }
1165            }
1166
1167            // Check remaining subscriptions, if none then remove updater
1168            let still_subscribed = topics
1169                .iter()
1170                .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1171            if !still_subscribed {
1172                self.book_updaters.remove(instrument_id);
1173                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1174            }
1175        }
1176    }
1177
1178    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1179        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1180            let topic = switchboard::get_book_snapshots_topic(
1181                *instrument_id,
1182                snapshotter.snap_info.interval_ms,
1183            );
1184
1185            // Check remaining snapshot subscriptions, if none then remove snapshotter
1186            if msgbus::subscriptions_count(topic.as_str()) == 0 {
1187                let timer_name = snapshotter.timer_name;
1188                self.book_snapshotters.remove(instrument_id);
1189                let mut clock = self.clock.borrow_mut();
1190                if clock.timer_names().contains(&timer_name.as_str()) {
1191                    clock.cancel_timer(&timer_name);
1192                }
1193                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1194            }
1195        }
1196    }
1197
1198    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1199
1200    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1201        let mut cache = self.cache.as_ref().borrow_mut();
1202        if let Err(e) = cache.add_instrument(instrument) {
1203            log_error_on_cache_insert(&e);
1204        }
1205    }
1206
1207    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1208        // TODO: Improve by adding bulk update methods to cache and database
1209        let mut cache = self.cache.as_ref().borrow_mut();
1210        for instrument in instruments {
1211            if let Err(e) = cache.add_instrument(instrument.clone()) {
1212                log_error_on_cache_insert(&e);
1213            }
1214        }
1215    }
1216
1217    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1218        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1219            log_error_on_cache_insert(&e);
1220        }
1221    }
1222
1223    fn handle_trades(&self, trades: &[TradeTick]) {
1224        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1225            log_error_on_cache_insert(&e);
1226        }
1227    }
1228
1229    fn handle_bars(&self, bars: &[Bar]) {
1230        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1231            log_error_on_cache_insert(&e);
1232        }
1233    }
1234
1235    // -- INTERNAL --------------------------------------------------------------------------------
1236
1237    #[allow(clippy::too_many_arguments)]
1238    fn setup_order_book(
1239        &mut self,
1240        instrument_id: &InstrumentId,
1241        book_type: BookType,
1242        only_deltas: bool,
1243        managed: bool,
1244    ) -> anyhow::Result<()> {
1245        let mut cache = self.cache.borrow_mut();
1246        if managed && !cache.has_order_book(instrument_id) {
1247            let book = OrderBook::new(*instrument_id, book_type);
1248            log::debug!("Created {book}");
1249            cache.add_order_book(book)?;
1250        }
1251
1252        // Set up subscriptions
1253        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1254        self.book_updaters.insert(*instrument_id, updater.clone());
1255
1256        let handler = ShareableMessageHandler(updater);
1257
1258        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1259        if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1260            msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1261        }
1262
1263        let topic = switchboard::get_book_depth10_topic(*instrument_id);
1264        if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1265            msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1266        }
1267
1268        Ok(())
1269    }
1270
1271    #[cfg(feature = "defi")]
1272    fn setup_pool_updater(&mut self, instrument_id: &InstrumentId) {
1273        if self.pool_updaters.contains_key(instrument_id) {
1274            return;
1275        }
1276
1277        let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
1278        let handler = ShareableMessageHandler(updater.clone());
1279
1280        // Subscribe to pool swaps and liquidity updates
1281        let swap_topic = switchboard::get_defi_pool_swaps_topic(*instrument_id);
1282        if !msgbus::is_subscribed(swap_topic.as_str(), handler.clone()) {
1283            msgbus::subscribe(
1284                swap_topic.into(),
1285                handler.clone(),
1286                Some(self.msgbus_priority),
1287            );
1288        }
1289
1290        let liquidity_topic = switchboard::get_defi_liquidity_topic(*instrument_id);
1291        if !msgbus::is_subscribed(liquidity_topic.as_str(), handler.clone()) {
1292            msgbus::subscribe(liquidity_topic.into(), handler, Some(self.msgbus_priority));
1293        }
1294
1295        self.pool_updaters.insert(*instrument_id, updater);
1296        log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
1297    }
1298
1299    fn create_bar_aggregator(
1300        &mut self,
1301        instrument: &InstrumentAny,
1302        bar_type: BarType,
1303    ) -> Box<dyn BarAggregator> {
1304        let cache = self.cache.clone();
1305
1306        let handler = move |bar: Bar| {
1307            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1308                log_error_on_cache_insert(&e);
1309            }
1310
1311            let topic = switchboard::get_bars_topic(bar.bar_type);
1312            msgbus::publish(topic, &bar as &dyn Any);
1313        };
1314
1315        let clock = self.clock.clone();
1316        let config = self.config.clone();
1317
1318        let price_precision = instrument.price_precision();
1319        let size_precision = instrument.size_precision();
1320
1321        if bar_type.spec().is_time_aggregated() {
1322            // Get time_bars_origin_offset from config
1323            let time_bars_origin_offset = config
1324                .time_bars_origins
1325                .get(&bar_type.spec().aggregation)
1326                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1327
1328            Box::new(TimeBarAggregator::new(
1329                bar_type,
1330                price_precision,
1331                size_precision,
1332                clock,
1333                handler,
1334                false, // await_partial
1335                config.time_bars_build_with_no_updates,
1336                config.time_bars_timestamp_on_close,
1337                config.time_bars_interval_type,
1338                time_bars_origin_offset,
1339                20,    // TODO: TBD, composite bar build delay
1340                false, // TODO: skip_first_non_full_bar, make it config dependent
1341            ))
1342        } else {
1343            match bar_type.spec().aggregation {
1344                BarAggregation::Tick => Box::new(TickBarAggregator::new(
1345                    bar_type,
1346                    price_precision,
1347                    size_precision,
1348                    handler,
1349                    false,
1350                )) as Box<dyn BarAggregator>,
1351                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1352                    bar_type,
1353                    price_precision,
1354                    size_precision,
1355                    handler,
1356                    false,
1357                )) as Box<dyn BarAggregator>,
1358                BarAggregation::Value => Box::new(ValueBarAggregator::new(
1359                    bar_type,
1360                    price_precision,
1361                    size_precision,
1362                    handler,
1363                    false,
1364                )) as Box<dyn BarAggregator>,
1365                _ => panic!(
1366                    "Cannot create aggregator: {} aggregation not currently supported",
1367                    bar_type.spec().aggregation
1368                ),
1369            }
1370        }
1371    }
1372
1373    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1374        // Get the instrument for this bar type
1375        let instrument = {
1376            let cache = self.cache.borrow();
1377            cache
1378                .instrument(&bar_type.instrument_id())
1379                .ok_or_else(|| {
1380                    anyhow::anyhow!(
1381                        "Cannot start bar aggregation: no instrument found for {}",
1382                        bar_type.instrument_id(),
1383                    )
1384                })?
1385                .clone()
1386        };
1387
1388        // Use standard form of bar type as key
1389        let bar_key = bar_type.standard();
1390
1391        // Create or retrieve aggregator in Rc<RefCell>
1392        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1393            rc.clone()
1394        } else {
1395            let agg = self.create_bar_aggregator(&instrument, bar_type);
1396            let rc = Rc::new(RefCell::new(agg));
1397            self.bar_aggregators.insert(bar_key, rc.clone());
1398            rc
1399        };
1400
1401        // Subscribe to underlying data topics
1402        let mut handlers = Vec::new();
1403
1404        if bar_type.is_composite() {
1405            let topic = switchboard::get_bars_topic(bar_type.composite());
1406            let handler =
1407                ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1408
1409            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1410                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1411            }
1412
1413            handlers.push((topic, handler));
1414        } else if bar_type.spec().price_type == PriceType::Last {
1415            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1416            let handler =
1417                ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1418
1419            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1420                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1421            }
1422
1423            handlers.push((topic, handler));
1424        } else {
1425            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1426            let handler =
1427                ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1428
1429            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1430                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1431            }
1432
1433            handlers.push((topic, handler));
1434        }
1435
1436        self.bar_aggregator_handlers.insert(bar_key, handlers);
1437        aggregator.borrow_mut().set_is_running(true);
1438
1439        Ok(())
1440    }
1441
1442    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1443        let aggregator = self
1444            .bar_aggregators
1445            .remove(&bar_type.standard())
1446            .ok_or_else(|| {
1447                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1448            })?;
1449
1450        aggregator.borrow_mut().stop();
1451
1452        // Unsubscribe any registered message handlers
1453        let bar_key = bar_type.standard();
1454        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1455            for (topic, handler) in subs {
1456                if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1457                    msgbus::unsubscribe_topic(topic, handler);
1458                }
1459            }
1460        }
1461
1462        Ok(())
1463    }
1464}
1465
1466#[inline(always)]
1467fn log_error_on_cache_insert<T: Display>(e: &T) {
1468    log::error!("Error on cache insert: {e}");
1469}