nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engines primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34#[cfg(feature = "defi")]
35pub mod pool;
36
37use std::{
38    any::Any,
39    cell::{Ref, RefCell},
40    collections::hash_map::Entry,
41    fmt::Display,
42    num::NonZeroUsize,
43    rc::Rc,
44};
45
46use ahash::{AHashMap, AHashSet};
47use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
48use config::DataEngineConfig;
49use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
50use indexmap::IndexMap;
51#[cfg(feature = "defi")]
52use nautilus_common::messages::defi::{DefiSubscribeCommand, DefiUnsubscribeCommand};
53use nautilus_common::{
54    cache::Cache,
55    clock::Clock,
56    logging::{RECV, RES},
57    messages::data::{
58        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61        UnsubscribeCommand,
62    },
63    msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
64    timer::TimeEventCallback,
65};
66use nautilus_core::{
67    correctness::{
68        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
69    },
70    datetime::millis_to_nanos,
71};
72#[cfg(feature = "defi")]
73use nautilus_model::defi::Blockchain;
74#[cfg(feature = "defi")]
75use nautilus_model::defi::DefiData;
76use nautilus_model::{
77    data::{
78        Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
79        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
80    },
81    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
82    identifiers::{ClientId, InstrumentId, Venue},
83    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
84    orderbook::OrderBook,
85};
86use nautilus_persistence::backend::catalog::ParquetDataCatalog;
87use ustr::Ustr;
88
89#[cfg(feature = "defi")]
90use crate::engine::pool::PoolUpdater;
91use crate::{
92    aggregation::{
93        BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
94        VolumeBarAggregator,
95    },
96    client::DataClientAdapter,
97};
98
99/// Provides a high-performance `DataEngine` for all environments.
100#[derive(Debug)]
101pub struct DataEngine {
102    clock: Rc<RefCell<dyn Clock>>,
103    cache: Rc<RefCell<Cache>>,
104    clients: IndexMap<ClientId, DataClientAdapter>,
105    default_client: Option<DataClientAdapter>,
106    external_clients: AHashSet<ClientId>,
107    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
108    routing_map: IndexMap<Venue, ClientId>,
109    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
110    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
111    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
112    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
113    bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
114    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
115    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
116    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
117    msgbus_priority: u8,
118    config: DataEngineConfig,
119    #[cfg(feature = "defi")]
120    pool_updaters: AHashMap<InstrumentId, Rc<crate::engine::pool::PoolUpdater>>,
121}
122
123impl DataEngine {
124    /// Creates a new [`DataEngine`] instance.
125    #[must_use]
126    pub fn new(
127        clock: Rc<RefCell<dyn Clock>>,
128        cache: Rc<RefCell<Cache>>,
129        config: Option<DataEngineConfig>,
130    ) -> Self {
131        let config = config.unwrap_or_default();
132
133        let external_clients: AHashSet<ClientId> = config
134            .external_clients
135            .clone()
136            .unwrap_or_default()
137            .into_iter()
138            .collect();
139
140        Self {
141            clock,
142            cache,
143            clients: IndexMap::new(),
144            default_client: None,
145            external_clients,
146            catalogs: AHashMap::new(),
147            routing_map: IndexMap::new(),
148            book_intervals: AHashMap::new(),
149            book_updaters: AHashMap::new(),
150            book_snapshotters: AHashMap::new(),
151            bar_aggregators: AHashMap::new(),
152            bar_aggregator_handlers: AHashMap::new(),
153            _synthetic_quote_feeds: AHashMap::new(),
154            _synthetic_trade_feeds: AHashMap::new(),
155            buffered_deltas_map: AHashMap::new(),
156            msgbus_priority: 10, // High-priority for built-in component
157            config,
158            #[cfg(feature = "defi")]
159            pool_updaters: AHashMap::new(),
160        }
161    }
162
163    /// Returns a read-only reference to the engines clock.
164    #[must_use]
165    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
166        self.clock.borrow()
167    }
168
169    /// Returns a read-only reference to the engines cache.
170    #[must_use]
171    pub fn get_cache(&self) -> Ref<'_, Cache> {
172        self.cache.borrow()
173    }
174
175    /// Registers the `catalog` with the engine with an optional specific `name`.
176    ///
177    /// # Panics
178    ///
179    /// Panics if a catalog with the same `name` has already been registered.
180    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
181        let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
182
183        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
184
185        self.catalogs.insert(name, catalog);
186        log::info!("Registered catalog <{name}>");
187    }
188
189    /// Registers the `client` with the engine with an optional venue `routing`.
190    ///
191    ///
192    /// # Panics
193    ///
194    /// Panics if a client with the same client ID has already been registered.
195    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
196        let client_id = client.client_id();
197
198        if let Some(default_client) = &self.default_client {
199            check_predicate_false(
200                default_client.client_id() == client.client_id(),
201                "client_id already registered as default client",
202            )
203            .expect(FAILED);
204        }
205
206        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
207
208        if let Some(routing) = routing {
209            self.routing_map.insert(routing, client_id);
210            log::info!("Set client {client_id} routing for {routing}");
211        }
212
213        if client.venue.is_none() && self.default_client.is_none() {
214            self.default_client = Some(client);
215            log::info!("Registered client {client_id} for default routing");
216        } else {
217            self.clients.insert(client_id, client);
218            log::info!("Registered client {client_id}");
219        }
220    }
221
222    /// Deregisters the client for the `client_id`.
223    ///
224    /// # Panics
225    ///
226    /// Panics if the client ID has not been registered.
227    pub fn deregister_client(&mut self, client_id: &ClientId) {
228        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
229
230        self.clients.shift_remove(client_id);
231        log::info!("Deregistered client {client_id}");
232    }
233
234    /// Registers the data `client` with the engine as the default routing client.
235    ///
236    /// When a specific venue routing cannot be found, this client will receive messages.
237    ///
238    /// # Warnings
239    ///
240    /// Any existing default routing client will be overwritten.
241    ///
242    /// # Panics
243    ///
244    /// Panics if a default client has already been registered.
245    pub fn register_default_client(&mut self, client: DataClientAdapter) {
246        check_predicate_true(
247            self.default_client.is_none(),
248            "default client already registered",
249        )
250        .expect(FAILED);
251
252        let client_id = client.client_id();
253
254        self.default_client = Some(client);
255        log::info!("Registered default client {client_id}");
256    }
257
258    /// Starts all registered data clients.
259    pub fn start(&mut self) {
260        for client in self.get_clients_mut() {
261            if let Err(e) = client.start() {
262                log::error!("{e}");
263            }
264        }
265    }
266
267    /// Stops all registered data clients.
268    pub fn stop(&mut self) {
269        for client in self.get_clients_mut() {
270            if let Err(e) = client.stop() {
271                log::error!("{e}");
272            }
273        }
274    }
275
276    /// Resets all registered data clients to their initial state.
277    pub fn reset(&mut self) {
278        for client in self.get_clients_mut() {
279            if let Err(e) = client.reset() {
280                log::error!("{e}");
281            }
282        }
283    }
284
285    /// Disposes the engine, stopping all clients and canceling any timers.
286    pub fn dispose(&mut self) {
287        for client in self.get_clients_mut() {
288            if let Err(e) = client.dispose() {
289                log::error!("{e}");
290            }
291        }
292
293        self.clock.borrow_mut().cancel_timers();
294    }
295
296    /// Returns `true` if all registered data clients are currently connected.
297    #[must_use]
298    pub fn check_connected(&self) -> bool {
299        self.get_clients()
300            .iter()
301            .all(|client| client.is_connected())
302    }
303
304    /// Returns `true` if all registered data clients are currently disconnected.
305    #[must_use]
306    pub fn check_disconnected(&self) -> bool {
307        self.get_clients()
308            .iter()
309            .all(|client| !client.is_connected())
310    }
311
312    /// Returns a list of all registered client IDs, including the default client if set.
313    #[must_use]
314    pub fn registered_clients(&self) -> Vec<ClientId> {
315        self.get_clients()
316            .into_iter()
317            .map(|client| client.client_id())
318            .collect()
319    }
320
321    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
322
323    fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
324    where
325        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
326        T: Clone,
327    {
328        self.get_clients()
329            .into_iter()
330            .flat_map(get_subs)
331            .cloned()
332            .collect()
333    }
334
335    #[must_use]
336    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
337        let (default_opt, clients_map) = (&self.default_client, &self.clients);
338        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
339
340        if let Some(default) = default_opt {
341            clients.push(default);
342        }
343
344        clients
345    }
346
347    #[must_use]
348    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
349        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
350        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
351
352        if let Some(default) = default_opt {
353            clients.push(default);
354        }
355
356        clients
357    }
358
359    pub fn get_client(
360        &mut self,
361        client_id: Option<&ClientId>,
362        venue: Option<&Venue>,
363    ) -> Option<&mut DataClientAdapter> {
364        if let Some(client_id) = client_id {
365            // Explicit ID: first look in registered clients
366            if let Some(client) = self.clients.get_mut(client_id) {
367                return Some(client);
368            }
369
370            // Then check if it matches the default client
371            if let Some(default) = self.default_client.as_mut()
372                && default.client_id() == *client_id
373            {
374                return Some(default);
375            }
376
377            // Unknown explicit client
378            return None;
379        }
380
381        if let Some(v) = venue {
382            // Route by venue if mapped client still registered
383            if let Some(client_id) = self.routing_map.get(v) {
384                return self.clients.get_mut(client_id);
385            }
386        }
387
388        // Fallback to default client
389        self.get_default_client()
390    }
391
392    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
393        self.default_client.as_mut()
394    }
395
396    /// Returns all custom data types currently subscribed across all clients.
397    #[must_use]
398    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
399        self.collect_subscriptions(|client| &client.subscriptions_custom)
400    }
401
402    /// Returns all instrument IDs currently subscribed across all clients.
403    #[must_use]
404    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
405        self.collect_subscriptions(|client| &client.subscriptions_instrument)
406    }
407
408    /// Returns all instrument IDs for which book delta subscriptions exist.
409    #[must_use]
410    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
411        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
412    }
413
414    /// Returns all instrument IDs for which book snapshot subscriptions exist.
415    #[must_use]
416    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
417        self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
418    }
419
420    /// Returns all instrument IDs for which quote subscriptions exist.
421    #[must_use]
422    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
423        self.collect_subscriptions(|client| &client.subscriptions_quotes)
424    }
425
426    /// Returns all instrument IDs for which trade subscriptions exist.
427    #[must_use]
428    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
429        self.collect_subscriptions(|client| &client.subscriptions_trades)
430    }
431
432    /// Returns all bar types currently subscribed across all clients.
433    #[must_use]
434    pub fn subscribed_bars(&self) -> Vec<BarType> {
435        self.collect_subscriptions(|client| &client.subscriptions_bars)
436    }
437
438    /// Returns all instrument IDs for which mark price subscriptions exist.
439    #[must_use]
440    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
441        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
442    }
443
444    /// Returns all instrument IDs for which index price subscriptions exist.
445    #[must_use]
446    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
447        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
448    }
449
450    /// Returns all instrument IDs for which funding rate subscriptions exist.
451    #[must_use]
452    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
453        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
454    }
455
456    /// Returns all instrument IDs for which status subscriptions exist.
457    #[must_use]
458    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
459        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
460    }
461
462    /// Returns all instrument IDs for which instrument close subscriptions exist.
463    #[must_use]
464    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
465        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
466    }
467
468    #[cfg(feature = "defi")]
469    /// Returns all blockchains for which blocks subscriptions exist.
470    #[must_use]
471    pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
472        self.collect_subscriptions(|client| &client.subscriptions_blocks)
473    }
474
475    #[cfg(feature = "defi")]
476    /// Returns all instrument IDs for which pool subscriptions exist.
477    #[must_use]
478    pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
479        self.collect_subscriptions(|client| &client.subscriptions_pools)
480    }
481
482    #[cfg(feature = "defi")]
483    /// Returns all instrument IDs for which swap subscriptions exist.
484    #[must_use]
485    pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
486        self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
487    }
488
489    #[cfg(feature = "defi")]
490    /// Returns all instrument IDs for which liquidity update subscriptions exist.
491    #[must_use]
492    pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
493        self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
494    }
495
496    // -- COMMANDS --------------------------------------------------------------------------------
497
498    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
499    ///
500    /// Errors during execution are logged.
501    pub fn execute(&mut self, cmd: &DataCommand) {
502        if let Err(e) = match cmd {
503            DataCommand::Subscribe(c) => self.execute_subscribe(c),
504            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
505            DataCommand::Request(c) => self.execute_request(c),
506            #[cfg(feature = "defi")]
507            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
508            #[cfg(feature = "defi")]
509            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
510            _ => {
511                log::warn!("Unhandled DataCommand variant: {cmd:?}");
512                Ok(())
513            }
514        } {
515            log::error!("{e}");
516        }
517    }
518
519    /// Handles a subscribe command, updating internal state and forwarding to the client.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
524    /// or if the underlying client operation fails.
525    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
526        // Update internal engine state
527        match &cmd {
528            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
529            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
530            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
531            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
532            _ => {} // Do nothing else
533        }
534
535        if let Some(client_id) = cmd.client_id()
536            && self.external_clients.contains(client_id)
537        {
538            if self.config.debug {
539                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
540            }
541            return Ok(());
542        }
543
544        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
545            client.execute_subscribe(cmd);
546        } else {
547            log::error!(
548                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
549                cmd.client_id(),
550                cmd.venue(),
551            );
552        }
553
554        Ok(())
555    }
556
557    #[cfg(feature = "defi")]
558    /// Handles a subscribe command, updating internal state and forwarding to the client.
559    ///
560    /// # Errors
561    ///
562    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
563    /// or if the underlying client operation fails.
564    pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) -> anyhow::Result<()> {
565        if let Some(client_id) = cmd.client_id()
566            && self.external_clients.contains(client_id)
567        {
568            if self.config.debug {
569                log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}",);
570            }
571            return Ok(());
572        }
573
574        match cmd {
575            DefiSubscribeCommand::Pool(cmd) => self.setup_pool_updater(&cmd.instrument_id),
576            DefiSubscribeCommand::PoolSwaps(cmd) => self.setup_pool_updater(&cmd.instrument_id),
577            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
578                self.setup_pool_updater(&cmd.instrument_id);
579            }
580            _ => {}
581        }
582
583        // Forward command to client
584        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
585            client.execute_defi_subscribe(cmd);
586        } else {
587            log::error!(
588                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
589                cmd.client_id(),
590                cmd.venue(),
591            );
592        }
593
594        Ok(())
595    }
596
597    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if the underlying client operation fails.
602    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
603        match &cmd {
604            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
605            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
606            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
607            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
608            _ => {} // Do nothing else
609        }
610
611        if let Some(client_id) = cmd.client_id()
612            && self.external_clients.contains(client_id)
613        {
614            if self.config.debug {
615                log::debug!(
616                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
617                );
618            }
619            return Ok(());
620        }
621
622        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
623            client.execute_unsubscribe(cmd);
624        } else {
625            log::error!(
626                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
627                cmd.client_id(),
628                cmd.venue(),
629            );
630        }
631
632        Ok(())
633    }
634
635    #[cfg(feature = "defi")]
636    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
637    ///
638    /// # Errors
639    ///
640    /// Returns an error if the underlying client operation fails.
641    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
642        if let Some(client_id) = cmd.client_id()
643            && self.external_clients.contains(client_id)
644        {
645            if self.config.debug {
646                log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}",);
647            }
648            return Ok(());
649        }
650
651        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
652            client.execute_defi_unsubscribe(cmd);
653        } else {
654            log::error!(
655                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
656                cmd.client_id(),
657                cmd.venue(),
658            );
659        }
660
661        Ok(())
662    }
663
664    /// Sends a [`RequestCommand`] to a suitable data client implementation.
665    ///
666    /// # Errors
667    ///
668    /// Returns an error if no client is found for the given client ID or venue,
669    /// or if the client fails to process the request.
670    pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
671        // Skip requests for external clients
672        if let Some(cid) = req.client_id()
673            && self.external_clients.contains(cid)
674        {
675            if self.config.debug {
676                log::debug!("Skipping data request for external client {cid}: {req:?}");
677            }
678            return Ok(());
679        }
680        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
681            match req {
682                RequestCommand::Data(req) => client.request_data(req),
683                RequestCommand::Instrument(req) => client.request_instrument(req),
684                RequestCommand::Instruments(req) => client.request_instruments(req),
685                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
686                RequestCommand::Quotes(req) => client.request_quotes(req),
687                RequestCommand::Trades(req) => client.request_trades(req),
688                RequestCommand::Bars(req) => client.request_bars(req),
689            }
690        } else {
691            anyhow::bail!(
692                "Cannot handle request: no client found for {:?} {:?}",
693                req.client_id(),
694                req.venue()
695            );
696        }
697    }
698
699    /// Processes a dynamically-typed data message.
700    ///
701    /// Currently supports `InstrumentAny` and `FundingRateUpdate`; unrecognized types are logged as errors.
702    pub fn process(&mut self, data: &dyn Any) {
703        // TODO: Eventually these could be added to the `Data` enum? process here for now
704        if let Some(data) = data.downcast_ref::<Data>() {
705            self.process_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
706            return;
707        }
708
709        #[cfg(feature = "defi")]
710        if let Some(data) = data.downcast_ref::<DefiData>() {
711            self.process_defi_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
712            return;
713        }
714
715        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
716            self.handle_instrument(instrument.clone());
717        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
718            self.handle_funding_rate(*funding_rate);
719        } else {
720            log::error!("Cannot process data {data:?}, type is unrecognized");
721        }
722    }
723
724    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
725    pub fn process_data(&mut self, data: Data) {
726        match data {
727            Data::Delta(delta) => self.handle_delta(delta),
728            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
729            Data::Depth10(depth) => self.handle_depth10(*depth),
730            Data::Quote(quote) => self.handle_quote(quote),
731            Data::Trade(trade) => self.handle_trade(trade),
732            Data::Bar(bar) => self.handle_bar(bar),
733            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
734            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
735            Data::InstrumentClose(close) => self.handle_instrument_close(close),
736        }
737    }
738
739    /// Processes DeFi-specific data events.
740    #[cfg(feature = "defi")]
741    pub fn process_defi_data(&mut self, data: DefiData) {
742        match data {
743            DefiData::Block(block) => {
744                let topic = switchboard::get_defi_blocks_topic(block.chain());
745                msgbus::publish(topic, &block as &dyn Any);
746            }
747            DefiData::Pool(pool) => {
748                if let Err(err) = self.cache.borrow_mut().add_pool(pool.clone()) {
749                    log::error!("Failed to add Pool to cache: {err}");
750                }
751
752                let topic = switchboard::get_defi_pool_topic(pool.instrument_id);
753                msgbus::publish(topic, &pool as &dyn Any);
754            }
755            DefiData::PoolSwap(swap) => {
756                let topic = switchboard::get_defi_pool_swaps_topic(swap.instrument_id);
757                msgbus::publish(topic, &swap as &dyn Any);
758            }
759            DefiData::PoolLiquidityUpdate(update) => {
760                let topic = switchboard::get_defi_liquidity_topic(update.instrument_id);
761                msgbus::publish(topic, &update as &dyn Any);
762            }
763        }
764    }
765
766    /// Processes a `DataResponse`, handling and publishing the response message.
767    pub fn response(&self, resp: DataResponse) {
768        log::debug!("{RECV}{RES} {resp:?}");
769
770        match &resp {
771            DataResponse::Instrument(resp) => {
772                self.handle_instrument_response(resp.data.clone());
773            }
774            DataResponse::Instruments(resp) => {
775                self.handle_instruments(&resp.data);
776            }
777            DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
778            DataResponse::Trades(resp) => self.handle_trades(&resp.data),
779            DataResponse::Bars(resp) => self.handle_bars(&resp.data),
780            _ => todo!(),
781        }
782
783        msgbus::send_response(resp.correlation_id(), &resp);
784    }
785
786    // -- DATA HANDLERS ---------------------------------------------------------------------------
787
788    fn handle_instrument(&mut self, instrument: InstrumentAny) {
789        if let Err(e) = self
790            .cache
791            .as_ref()
792            .borrow_mut()
793            .add_instrument(instrument.clone())
794        {
795            log_error_on_cache_insert(&e);
796        }
797
798        let topic = switchboard::get_instrument_topic(instrument.id());
799        msgbus::publish(topic, &instrument as &dyn Any);
800    }
801
802    fn handle_delta(&mut self, delta: OrderBookDelta) {
803        let deltas = if self.config.buffer_deltas {
804            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
805                buffered_deltas.deltas.push(delta);
806            } else {
807                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
808                self.buffered_deltas_map
809                    .insert(delta.instrument_id, buffered_deltas);
810            }
811
812            if !RecordFlag::F_LAST.matches(delta.flags) {
813                return; // Not the last delta for event
814            }
815
816            // SAFETY: We know the deltas exists already
817            self.buffered_deltas_map
818                .remove(&delta.instrument_id)
819                .unwrap()
820        } else {
821            OrderBookDeltas::new(delta.instrument_id, vec![delta])
822        };
823
824        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
825        msgbus::publish(topic, &deltas as &dyn Any);
826    }
827
828    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
829        let deltas = if self.config.buffer_deltas {
830            let mut is_last_delta = false;
831            for delta in &deltas.deltas {
832                if RecordFlag::F_LAST.matches(delta.flags) {
833                    is_last_delta = true;
834                    break;
835                }
836            }
837
838            let instrument_id = deltas.instrument_id;
839
840            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
841                buffered_deltas.deltas.extend(deltas.deltas);
842            } else {
843                self.buffered_deltas_map.insert(instrument_id, deltas);
844            }
845
846            if !is_last_delta {
847                return;
848            }
849
850            // SAFETY: We know the deltas exists already
851            self.buffered_deltas_map.remove(&instrument_id).unwrap()
852        } else {
853            deltas
854        };
855
856        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
857        msgbus::publish(topic, &deltas as &dyn Any);
858    }
859
860    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
861        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
862        msgbus::publish(topic, &depth as &dyn Any);
863    }
864
865    fn handle_quote(&mut self, quote: QuoteTick) {
866        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
867            log_error_on_cache_insert(&e);
868        }
869
870        // TODO: Handle synthetics
871
872        let topic = switchboard::get_quotes_topic(quote.instrument_id);
873        msgbus::publish(topic, &quote as &dyn Any);
874    }
875
876    fn handle_trade(&mut self, trade: TradeTick) {
877        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
878            log_error_on_cache_insert(&e);
879        }
880
881        // TODO: Handle synthetics
882
883        let topic = switchboard::get_trades_topic(trade.instrument_id);
884        msgbus::publish(topic, &trade as &dyn Any);
885    }
886
887    fn handle_bar(&mut self, bar: Bar) {
888        // TODO: Handle additional bar logic
889        if self.config.validate_data_sequence
890            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
891        {
892            if bar.ts_event < last_bar.ts_event {
893                log::warn!(
894                    "Bar {bar} was prior to last bar `ts_event` {}",
895                    last_bar.ts_event
896                );
897                return; // Bar is out of sequence
898            }
899            if bar.ts_init < last_bar.ts_init {
900                log::warn!(
901                    "Bar {bar} was prior to last bar `ts_init` {}",
902                    last_bar.ts_init
903                );
904                return; // Bar is out of sequence
905            }
906            // TODO: Implement `bar.is_revision` logic
907        }
908
909        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
910            log_error_on_cache_insert(&e);
911        }
912
913        let topic = switchboard::get_bars_topic(bar.bar_type);
914        msgbus::publish(topic, &bar as &dyn Any);
915    }
916
917    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
918        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
919            log_error_on_cache_insert(&e);
920        }
921
922        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
923        msgbus::publish(topic, &mark_price as &dyn Any);
924    }
925
926    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
927        if let Err(e) = self
928            .cache
929            .as_ref()
930            .borrow_mut()
931            .add_index_price(index_price)
932        {
933            log_error_on_cache_insert(&e);
934        }
935
936        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
937        msgbus::publish(topic, &index_price as &dyn Any);
938    }
939
940    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
941    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
942        if let Err(e) = self
943            .cache
944            .as_ref()
945            .borrow_mut()
946            .add_funding_rate(funding_rate)
947        {
948            log_error_on_cache_insert(&e);
949        }
950
951        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
952        msgbus::publish(topic, &funding_rate as &dyn Any);
953    }
954
955    fn handle_instrument_close(&mut self, close: InstrumentClose) {
956        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
957        msgbus::publish(topic, &close as &dyn Any);
958    }
959
960    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
961
962    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
963        if cmd.instrument_id.is_synthetic() {
964            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
965        }
966
967        self.setup_order_book(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
968
969        Ok(())
970    }
971
972    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
973        if cmd.instrument_id.is_synthetic() {
974            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
975        }
976
977        self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
978
979        Ok(())
980    }
981
982    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
983        if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
984            return Ok(());
985        }
986
987        if cmd.instrument_id.is_synthetic() {
988            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
989        }
990
991        // Track snapshot intervals per instrument, and set up timer on first subscription
992        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
993            Entry::Vacant(e) => {
994                let mut set = AHashSet::new();
995                set.insert(cmd.instrument_id);
996                e.insert(set);
997                true
998            }
999            Entry::Occupied(mut e) => {
1000                e.get_mut().insert(cmd.instrument_id);
1001                false
1002            }
1003        };
1004
1005        if first_for_interval {
1006            // Initialize snapshotter and schedule its timer
1007            let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
1008            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1009
1010            let snap_info = BookSnapshotInfo {
1011                instrument_id: cmd.instrument_id,
1012                venue: cmd.instrument_id.venue,
1013                is_composite: cmd.instrument_id.symbol.is_composite(),
1014                root: Ustr::from(cmd.instrument_id.symbol.root()),
1015                topic,
1016                interval_ms: cmd.interval_ms,
1017            };
1018
1019            // Schedule the first snapshot at the next interval boundary
1020            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1021            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1022
1023            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
1024            self.book_snapshotters
1025                .insert(cmd.instrument_id, snapshotter.clone());
1026            let timer_name = snapshotter.timer_name;
1027
1028            let callback =
1029                TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
1030
1031            self.clock
1032                .borrow_mut()
1033                .set_timer_ns(
1034                    &timer_name,
1035                    interval_ns,
1036                    Some(start_time_ns.into()),
1037                    None,
1038                    Some(callback),
1039                    None,
1040                    None,
1041                )
1042                .expect(FAILED);
1043        }
1044
1045        self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, true)?;
1046
1047        Ok(())
1048    }
1049
1050    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1051        match cmd.bar_type.aggregation_source() {
1052            AggregationSource::Internal => {
1053                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1054                    self.start_bar_aggregator(cmd.bar_type)?;
1055                }
1056            }
1057            AggregationSource::External => {
1058                if cmd.bar_type.instrument_id().is_synthetic() {
1059                    anyhow::bail!(
1060                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
1061                    );
1062                }
1063            }
1064        }
1065
1066        Ok(())
1067    }
1068
1069    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1070        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1071            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1072            return Ok(());
1073        }
1074
1075        let topics = vec![
1076            switchboard::get_book_deltas_topic(cmd.instrument_id),
1077            switchboard::get_book_depth10_topic(cmd.instrument_id),
1078            // TODO: Unsubscribe from snapshots?
1079        ];
1080
1081        self.maintain_book_updater(&cmd.instrument_id, &topics);
1082        self.maintain_book_snapshotter(&cmd.instrument_id);
1083
1084        Ok(())
1085    }
1086
1087    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1088        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1089            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1090            return Ok(());
1091        }
1092
1093        let topics = vec![
1094            switchboard::get_book_deltas_topic(cmd.instrument_id),
1095            switchboard::get_book_depth10_topic(cmd.instrument_id),
1096            // TODO: Unsubscribe from snapshots?
1097        ];
1098
1099        self.maintain_book_updater(&cmd.instrument_id, &topics);
1100        self.maintain_book_snapshotter(&cmd.instrument_id);
1101
1102        Ok(())
1103    }
1104
1105    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1106        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1107            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1108            return Ok(());
1109        }
1110
1111        // Remove instrument from interval tracking, and drop empty intervals
1112        let mut to_remove = Vec::new();
1113        for (interval, set) in &mut self.book_intervals {
1114            if set.remove(&cmd.instrument_id) && set.is_empty() {
1115                to_remove.push(*interval);
1116            }
1117        }
1118
1119        for interval in to_remove {
1120            self.book_intervals.remove(&interval);
1121        }
1122
1123        let topics = vec![
1124            switchboard::get_book_deltas_topic(cmd.instrument_id),
1125            switchboard::get_book_depth10_topic(cmd.instrument_id),
1126            // TODO: Unsubscribe from snapshots (add interval_ms to message?)
1127        ];
1128
1129        self.maintain_book_updater(&cmd.instrument_id, &topics);
1130        self.maintain_book_snapshotter(&cmd.instrument_id);
1131
1132        Ok(())
1133    }
1134
1135    /// Unsubscribe internal bar aggregator for the given bar type.
1136    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1137        // If we have an internal aggregator for this bar type, stop and remove it
1138        let bar_type = cmd.bar_type;
1139        if self.bar_aggregators.contains_key(&bar_type.standard()) {
1140            if let Err(err) = self.stop_bar_aggregator(bar_type) {
1141                log::error!("Error stopping bar aggregator for {bar_type}: {err}");
1142            }
1143            self.bar_aggregators.remove(&bar_type.standard());
1144            log::debug!("Removed bar aggregator for {bar_type}");
1145        }
1146        Ok(())
1147    }
1148
1149    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1150        if let Some(updater) = self.book_updaters.get(instrument_id) {
1151            let handler = ShareableMessageHandler(updater.clone());
1152
1153            // Unsubscribe handler if it is the last subscriber
1154            for topic in topics {
1155                if msgbus::subscriptions_count(topic.as_str()) == 1
1156                    && msgbus::is_subscribed(topic.as_str(), handler.clone())
1157                {
1158                    log::debug!("Unsubscribing BookUpdater from {topic}");
1159                    msgbus::unsubscribe_topic(*topic, handler.clone());
1160                }
1161            }
1162
1163            // Check remaining subscriptions, if none then remove updater
1164            let still_subscribed = topics
1165                .iter()
1166                .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1167            if !still_subscribed {
1168                self.book_updaters.remove(instrument_id);
1169                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1170            }
1171        }
1172    }
1173
1174    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1175        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1176            let topic = switchboard::get_book_snapshots_topic(
1177                *instrument_id,
1178                snapshotter.snap_info.interval_ms,
1179            );
1180
1181            // Check remaining snapshot subscriptions, if none then remove snapshotter
1182            if msgbus::subscriptions_count(topic.as_str()) == 0 {
1183                let timer_name = snapshotter.timer_name;
1184                self.book_snapshotters.remove(instrument_id);
1185                let mut clock = self.clock.borrow_mut();
1186                if clock.timer_names().contains(&timer_name.as_str()) {
1187                    clock.cancel_timer(&timer_name);
1188                }
1189                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1190            }
1191        }
1192    }
1193
1194    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1195
1196    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1197        let mut cache = self.cache.as_ref().borrow_mut();
1198        if let Err(e) = cache.add_instrument(instrument) {
1199            log_error_on_cache_insert(&e);
1200        }
1201    }
1202
1203    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1204        // TODO: Improve by adding bulk update methods to cache and database
1205        let mut cache = self.cache.as_ref().borrow_mut();
1206        for instrument in instruments {
1207            if let Err(e) = cache.add_instrument(instrument.clone()) {
1208                log_error_on_cache_insert(&e);
1209            }
1210        }
1211    }
1212
1213    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1214        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1215            log_error_on_cache_insert(&e);
1216        }
1217    }
1218
1219    fn handle_trades(&self, trades: &[TradeTick]) {
1220        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1221            log_error_on_cache_insert(&e);
1222        }
1223    }
1224
1225    fn handle_bars(&self, bars: &[Bar]) {
1226        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1227            log_error_on_cache_insert(&e);
1228        }
1229    }
1230
1231    // -- INTERNAL --------------------------------------------------------------------------------
1232
1233    #[allow(clippy::too_many_arguments)]
1234    fn setup_order_book(
1235        &mut self,
1236        instrument_id: &InstrumentId,
1237        book_type: BookType,
1238        only_deltas: bool,
1239        managed: bool,
1240    ) -> anyhow::Result<()> {
1241        let mut cache = self.cache.borrow_mut();
1242        if managed && !cache.has_order_book(instrument_id) {
1243            let book = OrderBook::new(*instrument_id, book_type);
1244            log::debug!("Created {book}");
1245            cache.add_order_book(book)?;
1246        }
1247
1248        // Set up subscriptions
1249        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1250        self.book_updaters.insert(*instrument_id, updater.clone());
1251
1252        let handler = ShareableMessageHandler(updater);
1253
1254        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1255        if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1256            msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1257        }
1258
1259        let topic = switchboard::get_book_depth10_topic(*instrument_id);
1260        if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1261            msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1262        }
1263
1264        Ok(())
1265    }
1266
1267    #[cfg(feature = "defi")]
1268    fn setup_pool_updater(&mut self, instrument_id: &InstrumentId) {
1269        if self.pool_updaters.contains_key(instrument_id) {
1270            return;
1271        }
1272
1273        let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
1274        let handler = ShareableMessageHandler(updater.clone());
1275
1276        // Subscribe to pool swaps and liquidity updates
1277        let swap_topic = switchboard::get_defi_pool_swaps_topic(*instrument_id);
1278        if !msgbus::is_subscribed(swap_topic.as_str(), handler.clone()) {
1279            msgbus::subscribe(
1280                swap_topic.into(),
1281                handler.clone(),
1282                Some(self.msgbus_priority),
1283            );
1284        }
1285
1286        let liquidity_topic = switchboard::get_defi_liquidity_topic(*instrument_id);
1287        if !msgbus::is_subscribed(liquidity_topic.as_str(), handler.clone()) {
1288            msgbus::subscribe(liquidity_topic.into(), handler, Some(self.msgbus_priority));
1289        }
1290
1291        self.pool_updaters.insert(*instrument_id, updater);
1292        log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
1293    }
1294
1295    fn create_bar_aggregator(
1296        &mut self,
1297        instrument: &InstrumentAny,
1298        bar_type: BarType,
1299    ) -> Box<dyn BarAggregator> {
1300        let cache = self.cache.clone();
1301
1302        let handler = move |bar: Bar| {
1303            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1304                log_error_on_cache_insert(&e);
1305            }
1306
1307            let topic = switchboard::get_bars_topic(bar.bar_type);
1308            msgbus::publish(topic, &bar as &dyn Any);
1309        };
1310
1311        let clock = self.clock.clone();
1312        let config = self.config.clone();
1313
1314        let price_precision = instrument.price_precision();
1315        let size_precision = instrument.size_precision();
1316
1317        if bar_type.spec().is_time_aggregated() {
1318            // Get time_bars_origin_offset from config
1319            let time_bars_origin_offset = config
1320                .time_bars_origins
1321                .get(&bar_type.spec().aggregation)
1322                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1323
1324            Box::new(TimeBarAggregator::new(
1325                bar_type,
1326                price_precision,
1327                size_precision,
1328                clock,
1329                handler,
1330                false, // await_partial
1331                config.time_bars_build_with_no_updates,
1332                config.time_bars_timestamp_on_close,
1333                config.time_bars_interval_type,
1334                time_bars_origin_offset,
1335                20,    // TODO: TBD, composite bar build delay
1336                false, // TODO: skip_first_non_full_bar, make it config dependent
1337            ))
1338        } else {
1339            match bar_type.spec().aggregation {
1340                BarAggregation::Tick => Box::new(TickBarAggregator::new(
1341                    bar_type,
1342                    price_precision,
1343                    size_precision,
1344                    handler,
1345                    false,
1346                )) as Box<dyn BarAggregator>,
1347                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1348                    bar_type,
1349                    price_precision,
1350                    size_precision,
1351                    handler,
1352                    false,
1353                )) as Box<dyn BarAggregator>,
1354                BarAggregation::Value => Box::new(ValueBarAggregator::new(
1355                    bar_type,
1356                    price_precision,
1357                    size_precision,
1358                    handler,
1359                    false,
1360                )) as Box<dyn BarAggregator>,
1361                _ => panic!(
1362                    "Cannot create aggregator: {} aggregation not currently supported",
1363                    bar_type.spec().aggregation
1364                ),
1365            }
1366        }
1367    }
1368
1369    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1370        // Get the instrument for this bar type
1371        let instrument = {
1372            let cache = self.cache.borrow();
1373            cache
1374                .instrument(&bar_type.instrument_id())
1375                .ok_or_else(|| {
1376                    anyhow::anyhow!(
1377                        "Cannot start bar aggregation: no instrument found for {}",
1378                        bar_type.instrument_id(),
1379                    )
1380                })?
1381                .clone()
1382        };
1383
1384        // Use standard form of bar type as key
1385        let bar_key = bar_type.standard();
1386
1387        // Create or retrieve aggregator in Rc<RefCell>
1388        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1389            rc.clone()
1390        } else {
1391            let agg = self.create_bar_aggregator(&instrument, bar_type);
1392            let rc = Rc::new(RefCell::new(agg));
1393            self.bar_aggregators.insert(bar_key, rc.clone());
1394            rc
1395        };
1396
1397        // Subscribe to underlying data topics
1398        let mut handlers = Vec::new();
1399
1400        if bar_type.is_composite() {
1401            let topic = switchboard::get_bars_topic(bar_type.composite());
1402            let handler =
1403                ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1404
1405            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1406                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1407            }
1408
1409            handlers.push((topic, handler));
1410        } else if bar_type.spec().price_type == PriceType::Last {
1411            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1412            let handler =
1413                ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1414
1415            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1416                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1417            }
1418
1419            handlers.push((topic, handler));
1420        } else {
1421            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1422            let handler =
1423                ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1424
1425            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1426                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1427            }
1428
1429            handlers.push((topic, handler));
1430        }
1431
1432        self.bar_aggregator_handlers.insert(bar_key, handlers);
1433        aggregator.borrow_mut().set_is_running(true);
1434
1435        Ok(())
1436    }
1437
1438    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1439        let aggregator = self
1440            .bar_aggregators
1441            .remove(&bar_type.standard())
1442            .ok_or_else(|| {
1443                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1444            })?;
1445
1446        aggregator.borrow_mut().stop();
1447
1448        // Unsubscribe any registered message handlers
1449        let bar_key = bar_type.standard();
1450        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1451            for (topic, handler) in subs {
1452                if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1453                    msgbus::unsubscribe_topic(topic, handler);
1454                }
1455            }
1456        }
1457
1458        Ok(())
1459    }
1460}
1461
1462#[inline(always)]
1463fn log_error_on_cache_insert<T: Display>(e: &T) {
1464    log::error!("Error on cache insert: {e}");
1465}