nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engines primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34#[cfg(feature = "defi")]
35pub mod pool;
36
37use std::{
38    any::Any,
39    cell::{Ref, RefCell},
40    collections::hash_map::Entry,
41    fmt::Display,
42    num::NonZeroUsize,
43    rc::Rc,
44};
45
46use ahash::{AHashMap, AHashSet};
47use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
48use config::DataEngineConfig;
49use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
50use indexmap::IndexMap;
51#[cfg(feature = "defi")]
52use nautilus_common::messages::defi::{DefiSubscribeCommand, DefiUnsubscribeCommand};
53use nautilus_common::{
54    cache::Cache,
55    clock::Clock,
56    logging::{RECV, RES},
57    messages::data::{
58        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61        UnsubscribeCommand,
62    },
63    msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
64    timer::TimeEventCallback,
65};
66use nautilus_core::{
67    correctness::{
68        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
69    },
70    datetime::millis_to_nanos,
71};
72#[cfg(feature = "defi")]
73use nautilus_model::defi::Blockchain;
74#[cfg(feature = "defi")]
75use nautilus_model::defi::DefiData;
76use nautilus_model::{
77    data::{
78        Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
79        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
80    },
81    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
82    identifiers::{ClientId, InstrumentId, Venue},
83    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
84    orderbook::OrderBook,
85};
86use nautilus_persistence::backend::catalog::ParquetDataCatalog;
87use ustr::Ustr;
88
89#[cfg(feature = "defi")]
90use crate::engine::pool::PoolUpdater;
91use crate::{
92    aggregation::{
93        BarAggregator, RenkoBarAggregator, TickBarAggregator, TimeBarAggregator,
94        ValueBarAggregator, VolumeBarAggregator,
95    },
96    client::DataClientAdapter,
97};
98
99/// Provides a high-performance `DataEngine` for all environments.
100#[derive(Debug)]
101pub struct DataEngine {
102    clock: Rc<RefCell<dyn Clock>>,
103    cache: Rc<RefCell<Cache>>,
104    clients: IndexMap<ClientId, DataClientAdapter>,
105    default_client: Option<DataClientAdapter>,
106    external_clients: AHashSet<ClientId>,
107    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
108    routing_map: IndexMap<Venue, ClientId>,
109    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
110    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
111    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
112    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
113    bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
114    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
115    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
116    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
117    msgbus_priority: u8,
118    config: DataEngineConfig,
119    #[cfg(feature = "defi")]
120    pool_updaters: AHashMap<InstrumentId, Rc<crate::engine::pool::PoolUpdater>>,
121}
122
123impl DataEngine {
124    /// Creates a new [`DataEngine`] instance.
125    #[must_use]
126    pub fn new(
127        clock: Rc<RefCell<dyn Clock>>,
128        cache: Rc<RefCell<Cache>>,
129        config: Option<DataEngineConfig>,
130    ) -> Self {
131        let config = config.unwrap_or_default();
132
133        let external_clients: AHashSet<ClientId> = config
134            .external_clients
135            .clone()
136            .unwrap_or_default()
137            .into_iter()
138            .collect();
139
140        Self {
141            clock,
142            cache,
143            clients: IndexMap::new(),
144            default_client: None,
145            external_clients,
146            catalogs: AHashMap::new(),
147            routing_map: IndexMap::new(),
148            book_intervals: AHashMap::new(),
149            book_updaters: AHashMap::new(),
150            book_snapshotters: AHashMap::new(),
151            bar_aggregators: AHashMap::new(),
152            bar_aggregator_handlers: AHashMap::new(),
153            _synthetic_quote_feeds: AHashMap::new(),
154            _synthetic_trade_feeds: AHashMap::new(),
155            buffered_deltas_map: AHashMap::new(),
156            msgbus_priority: 10, // High-priority for built-in component
157            config,
158            #[cfg(feature = "defi")]
159            pool_updaters: AHashMap::new(),
160        }
161    }
162
163    /// Returns a read-only reference to the engines clock.
164    #[must_use]
165    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
166        self.clock.borrow()
167    }
168
169    /// Returns a read-only reference to the engines cache.
170    #[must_use]
171    pub fn get_cache(&self) -> Ref<'_, Cache> {
172        self.cache.borrow()
173    }
174
175    /// Registers the `catalog` with the engine with an optional specific `name`.
176    ///
177    /// # Panics
178    ///
179    /// Panics if a catalog with the same `name` has already been registered.
180    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
181        let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
182
183        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
184
185        self.catalogs.insert(name, catalog);
186        log::info!("Registered catalog <{name}>");
187    }
188
189    /// Registers the `client` with the engine with an optional venue `routing`.
190    ///
191    ///
192    /// # Panics
193    ///
194    /// Panics if a client with the same client ID has already been registered.
195    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
196        let client_id = client.client_id();
197
198        if let Some(default_client) = &self.default_client {
199            check_predicate_false(
200                default_client.client_id() == client.client_id(),
201                "client_id already registered as default client",
202            )
203            .expect(FAILED);
204        }
205
206        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
207
208        if let Some(routing) = routing {
209            self.routing_map.insert(routing, client_id);
210            log::info!("Set client {client_id} routing for {routing}");
211        }
212
213        if client.venue.is_none() && self.default_client.is_none() {
214            self.default_client = Some(client);
215            log::info!("Registered client {client_id} for default routing");
216        } else {
217            self.clients.insert(client_id, client);
218            log::info!("Registered client {client_id}");
219        }
220    }
221
222    /// Deregisters the client for the `client_id`.
223    ///
224    /// # Panics
225    ///
226    /// Panics if the client ID has not been registered.
227    pub fn deregister_client(&mut self, client_id: &ClientId) {
228        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
229
230        self.clients.shift_remove(client_id);
231        log::info!("Deregistered client {client_id}");
232    }
233
234    /// Registers the data `client` with the engine as the default routing client.
235    ///
236    /// When a specific venue routing cannot be found, this client will receive messages.
237    ///
238    /// # Warnings
239    ///
240    /// Any existing default routing client will be overwritten.
241    ///
242    /// # Panics
243    ///
244    /// Panics if a default client has already been registered.
245    pub fn register_default_client(&mut self, client: DataClientAdapter) {
246        check_predicate_true(
247            self.default_client.is_none(),
248            "default client already registered",
249        )
250        .expect(FAILED);
251
252        let client_id = client.client_id();
253
254        self.default_client = Some(client);
255        log::info!("Registered default client {client_id}");
256    }
257
258    /// Starts all registered data clients.
259    pub fn start(&mut self) {
260        for client in self.get_clients_mut() {
261            if let Err(e) = client.start() {
262                log::error!("{e}");
263            }
264        }
265    }
266
267    /// Stops all registered data clients.
268    pub fn stop(&mut self) {
269        for client in self.get_clients_mut() {
270            if let Err(e) = client.stop() {
271                log::error!("{e}");
272            }
273        }
274    }
275
276    /// Resets all registered data clients to their initial state.
277    pub fn reset(&mut self) {
278        for client in self.get_clients_mut() {
279            if let Err(e) = client.reset() {
280                log::error!("{e}");
281            }
282        }
283    }
284
285    /// Disposes the engine, stopping all clients and canceling any timers.
286    pub fn dispose(&mut self) {
287        for client in self.get_clients_mut() {
288            if let Err(e) = client.dispose() {
289                log::error!("{e}");
290            }
291        }
292
293        self.clock.borrow_mut().cancel_timers();
294    }
295
296    /// Returns `true` if all registered data clients are currently connected.
297    #[must_use]
298    pub fn check_connected(&self) -> bool {
299        self.get_clients()
300            .iter()
301            .all(|client| client.is_connected())
302    }
303
304    /// Returns `true` if all registered data clients are currently disconnected.
305    #[must_use]
306    pub fn check_disconnected(&self) -> bool {
307        self.get_clients()
308            .iter()
309            .all(|client| !client.is_connected())
310    }
311
312    /// Returns a list of all registered client IDs, including the default client if set.
313    #[must_use]
314    pub fn registered_clients(&self) -> Vec<ClientId> {
315        self.get_clients()
316            .into_iter()
317            .map(|client| client.client_id())
318            .collect()
319    }
320
321    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
322
323    fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
324    where
325        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
326        T: Clone,
327    {
328        self.get_clients()
329            .into_iter()
330            .flat_map(get_subs)
331            .cloned()
332            .collect()
333    }
334
335    #[must_use]
336    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
337        let (default_opt, clients_map) = (&self.default_client, &self.clients);
338        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
339
340        if let Some(default) = default_opt {
341            clients.push(default);
342        }
343
344        clients
345    }
346
347    #[must_use]
348    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
349        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
350        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
351
352        if let Some(default) = default_opt {
353            clients.push(default);
354        }
355
356        clients
357    }
358
359    pub fn get_client(
360        &mut self,
361        client_id: Option<&ClientId>,
362        venue: Option<&Venue>,
363    ) -> Option<&mut DataClientAdapter> {
364        if let Some(client_id) = client_id {
365            // Explicit ID: first look in registered clients
366            if let Some(client) = self.clients.get_mut(client_id) {
367                return Some(client);
368            }
369
370            // Then check if it matches the default client
371            if let Some(default) = self.default_client.as_mut()
372                && default.client_id() == *client_id
373            {
374                return Some(default);
375            }
376
377            // Unknown explicit client
378            return None;
379        }
380
381        if let Some(v) = venue {
382            // Route by venue if mapped client still registered
383            if let Some(client_id) = self.routing_map.get(v) {
384                return self.clients.get_mut(client_id);
385            }
386        }
387
388        // Fallback to default client
389        self.get_default_client()
390    }
391
392    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
393        self.default_client.as_mut()
394    }
395
396    /// Returns all custom data types currently subscribed across all clients.
397    #[must_use]
398    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
399        self.collect_subscriptions(|client| &client.subscriptions_custom)
400    }
401
402    /// Returns all instrument IDs currently subscribed across all clients.
403    #[must_use]
404    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
405        self.collect_subscriptions(|client| &client.subscriptions_instrument)
406    }
407
408    /// Returns all instrument IDs for which book delta subscriptions exist.
409    #[must_use]
410    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
411        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
412    }
413
414    /// Returns all instrument IDs for which book snapshot subscriptions exist.
415    #[must_use]
416    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
417        self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
418    }
419
420    /// Returns all instrument IDs for which quote subscriptions exist.
421    #[must_use]
422    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
423        self.collect_subscriptions(|client| &client.subscriptions_quotes)
424    }
425
426    /// Returns all instrument IDs for which trade subscriptions exist.
427    #[must_use]
428    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
429        self.collect_subscriptions(|client| &client.subscriptions_trades)
430    }
431
432    /// Returns all bar types currently subscribed across all clients.
433    #[must_use]
434    pub fn subscribed_bars(&self) -> Vec<BarType> {
435        self.collect_subscriptions(|client| &client.subscriptions_bars)
436    }
437
438    /// Returns all instrument IDs for which mark price subscriptions exist.
439    #[must_use]
440    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
441        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
442    }
443
444    /// Returns all instrument IDs for which index price subscriptions exist.
445    #[must_use]
446    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
447        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
448    }
449
450    /// Returns all instrument IDs for which funding rate subscriptions exist.
451    #[must_use]
452    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
453        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
454    }
455
456    /// Returns all instrument IDs for which status subscriptions exist.
457    #[must_use]
458    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
459        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
460    }
461
462    /// Returns all instrument IDs for which instrument close subscriptions exist.
463    #[must_use]
464    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
465        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
466    }
467
468    #[cfg(feature = "defi")]
469    /// Returns all blockchains for which blocks subscriptions exist.
470    #[must_use]
471    pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
472        self.collect_subscriptions(|client| &client.subscriptions_blocks)
473    }
474
475    #[cfg(feature = "defi")]
476    /// Returns all instrument IDs for which pool subscriptions exist.
477    #[must_use]
478    pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
479        self.collect_subscriptions(|client| &client.subscriptions_pools)
480    }
481
482    #[cfg(feature = "defi")]
483    /// Returns all instrument IDs for which swap subscriptions exist.
484    #[must_use]
485    pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
486        self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
487    }
488
489    #[cfg(feature = "defi")]
490    /// Returns all instrument IDs for which liquidity update subscriptions exist.
491    #[must_use]
492    pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
493        self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
494    }
495
496    // -- COMMANDS --------------------------------------------------------------------------------
497
498    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
499    ///
500    /// Errors during execution are logged.
501    pub fn execute(&mut self, cmd: &DataCommand) {
502        if let Err(e) = match cmd {
503            DataCommand::Subscribe(c) => self.execute_subscribe(c),
504            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
505            DataCommand::Request(c) => self.execute_request(c),
506            #[cfg(feature = "defi")]
507            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
508            #[cfg(feature = "defi")]
509            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
510            _ => {
511                log::warn!("Unhandled DataCommand variant: {cmd:?}");
512                Ok(())
513            }
514        } {
515            log::error!("{e}");
516        }
517    }
518
519    /// Handles a subscribe command, updating internal state and forwarding to the client.
520    ///
521    /// # Errors
522    ///
523    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
524    /// or if the underlying client operation fails.
525    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
526        // Update internal engine state
527        match &cmd {
528            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
529            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
530            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
531            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
532            _ => {} // Do nothing else
533        }
534
535        if let Some(client_id) = cmd.client_id()
536            && self.external_clients.contains(client_id)
537        {
538            if self.config.debug {
539                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
540            }
541            return Ok(());
542        }
543
544        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
545            client.execute_subscribe(cmd);
546        } else {
547            log::error!(
548                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
549                cmd.client_id(),
550                cmd.venue(),
551            );
552        }
553
554        Ok(())
555    }
556
557    #[cfg(feature = "defi")]
558    /// Handles a subscribe command, updating internal state and forwarding to the client.
559    ///
560    /// # Errors
561    ///
562    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
563    /// or if the underlying client operation fails.
564    pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) -> anyhow::Result<()> {
565        if let Some(client_id) = cmd.client_id()
566            && self.external_clients.contains(client_id)
567        {
568            if self.config.debug {
569                log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}",);
570            }
571            return Ok(());
572        }
573
574        match cmd {
575            DefiSubscribeCommand::Pool(cmd) => self.setup_pool_updater(&cmd.instrument_id),
576            DefiSubscribeCommand::PoolSwaps(cmd) => self.setup_pool_updater(&cmd.instrument_id),
577            DefiSubscribeCommand::PoolLiquidityUpdates(cmd) => {
578                self.setup_pool_updater(&cmd.instrument_id);
579            }
580            _ => {}
581        }
582
583        // Forward command to client
584        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
585            client.execute_defi_subscribe(cmd);
586        } else {
587            log::error!(
588                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
589                cmd.client_id(),
590                cmd.venue(),
591            );
592        }
593
594        Ok(())
595    }
596
597    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
598    ///
599    /// # Errors
600    ///
601    /// Returns an error if the underlying client operation fails.
602    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
603        match &cmd {
604            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
605            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
606            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
607            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
608            _ => {} // Do nothing else
609        }
610
611        if let Some(client_id) = cmd.client_id()
612            && self.external_clients.contains(client_id)
613        {
614            if self.config.debug {
615                log::debug!(
616                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
617                );
618            }
619            return Ok(());
620        }
621
622        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
623            client.execute_unsubscribe(cmd);
624        } else {
625            log::error!(
626                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
627                cmd.client_id(),
628                cmd.venue(),
629            );
630        }
631
632        Ok(())
633    }
634
635    #[cfg(feature = "defi")]
636    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
637    ///
638    /// # Errors
639    ///
640    /// Returns an error if the underlying client operation fails.
641    pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
642        if let Some(client_id) = cmd.client_id()
643            && self.external_clients.contains(client_id)
644        {
645            if self.config.debug {
646                log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}",);
647            }
648            return Ok(());
649        }
650
651        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
652            client.execute_defi_unsubscribe(cmd);
653        } else {
654            log::error!(
655                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
656                cmd.client_id(),
657                cmd.venue(),
658            );
659        }
660
661        Ok(())
662    }
663
664    /// Sends a [`RequestCommand`] to a suitable data client implementation.
665    ///
666    /// # Errors
667    ///
668    /// Returns an error if no client is found for the given client ID or venue,
669    /// or if the client fails to process the request.
670    pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
671        // Skip requests for external clients
672        if let Some(cid) = req.client_id()
673            && self.external_clients.contains(cid)
674        {
675            if self.config.debug {
676                log::debug!("Skipping data request for external client {cid}: {req:?}");
677            }
678            return Ok(());
679        }
680        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
681            match req {
682                RequestCommand::Data(req) => client.request_data(req),
683                RequestCommand::Instrument(req) => client.request_instrument(req),
684                RequestCommand::Instruments(req) => client.request_instruments(req),
685                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
686                RequestCommand::BookDepth(req) => client.request_book_depth(req),
687                RequestCommand::Quotes(req) => client.request_quotes(req),
688                RequestCommand::Trades(req) => client.request_trades(req),
689                RequestCommand::Bars(req) => client.request_bars(req),
690            }
691        } else {
692            anyhow::bail!(
693                "Cannot handle request: no client found for {:?} {:?}",
694                req.client_id(),
695                req.venue()
696            );
697        }
698    }
699
700    /// Processes a dynamically-typed data message.
701    ///
702    /// Currently supports `InstrumentAny` and `FundingRateUpdate`; unrecognized types are logged as errors.
703    pub fn process(&mut self, data: &dyn Any) {
704        // TODO: Eventually these could be added to the `Data` enum? process here for now
705        if let Some(data) = data.downcast_ref::<Data>() {
706            self.process_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
707            return;
708        }
709
710        #[cfg(feature = "defi")]
711        if let Some(data) = data.downcast_ref::<DefiData>() {
712            self.process_defi_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
713            return;
714        }
715
716        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
717            self.handle_instrument(instrument.clone());
718        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
719            self.handle_funding_rate(*funding_rate);
720        } else {
721            log::error!("Cannot process data {data:?}, type is unrecognized");
722        }
723    }
724
725    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
726    pub fn process_data(&mut self, data: Data) {
727        match data {
728            Data::Delta(delta) => self.handle_delta(delta),
729            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
730            Data::Depth10(depth) => self.handle_depth10(*depth),
731            Data::Quote(quote) => self.handle_quote(quote),
732            Data::Trade(trade) => self.handle_trade(trade),
733            Data::Bar(bar) => self.handle_bar(bar),
734            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
735            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
736            Data::InstrumentClose(close) => self.handle_instrument_close(close),
737        }
738    }
739
740    /// Processes DeFi-specific data events.
741    #[cfg(feature = "defi")]
742    pub fn process_defi_data(&mut self, data: DefiData) {
743        match data {
744            DefiData::Block(block) => {
745                let topic = switchboard::get_defi_blocks_topic(block.chain());
746                msgbus::publish(topic, &block as &dyn Any);
747            }
748            DefiData::Pool(pool) => {
749                if let Err(err) = self.cache.borrow_mut().add_pool(pool.clone()) {
750                    log::error!("Failed to add Pool to cache: {err}");
751                }
752
753                let topic = switchboard::get_defi_pool_topic(pool.instrument_id);
754                msgbus::publish(topic, &pool as &dyn Any);
755            }
756            DefiData::PoolSwap(swap) => {
757                let topic = switchboard::get_defi_pool_swaps_topic(swap.instrument_id);
758                msgbus::publish(topic, &swap as &dyn Any);
759            }
760            DefiData::PoolLiquidityUpdate(update) => {
761                let topic = switchboard::get_defi_liquidity_topic(update.instrument_id);
762                msgbus::publish(topic, &update as &dyn Any);
763            }
764            DefiData::PoolFeeCollect(collect) => {
765                let topic = switchboard::get_defi_collect_topic(collect.instrument_id);
766                msgbus::publish(topic, &collect as &dyn Any);
767            }
768        }
769    }
770
771    /// Processes a `DataResponse`, handling and publishing the response message.
772    pub fn response(&self, resp: DataResponse) {
773        log::debug!("{RECV}{RES} {resp:?}");
774
775        match &resp {
776            DataResponse::Instrument(resp) => {
777                self.handle_instrument_response(resp.data.clone());
778            }
779            DataResponse::Instruments(resp) => {
780                self.handle_instruments(&resp.data);
781            }
782            DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
783            DataResponse::Trades(resp) => self.handle_trades(&resp.data),
784            DataResponse::Bars(resp) => self.handle_bars(&resp.data),
785            _ => todo!(),
786        }
787
788        msgbus::send_response(resp.correlation_id(), &resp);
789    }
790
791    // -- DATA HANDLERS ---------------------------------------------------------------------------
792
793    fn handle_instrument(&mut self, instrument: InstrumentAny) {
794        if let Err(e) = self
795            .cache
796            .as_ref()
797            .borrow_mut()
798            .add_instrument(instrument.clone())
799        {
800            log_error_on_cache_insert(&e);
801        }
802
803        let topic = switchboard::get_instrument_topic(instrument.id());
804        msgbus::publish(topic, &instrument as &dyn Any);
805    }
806
807    fn handle_delta(&mut self, delta: OrderBookDelta) {
808        let deltas = if self.config.buffer_deltas {
809            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
810                buffered_deltas.deltas.push(delta);
811            } else {
812                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
813                self.buffered_deltas_map
814                    .insert(delta.instrument_id, buffered_deltas);
815            }
816
817            if !RecordFlag::F_LAST.matches(delta.flags) {
818                return; // Not the last delta for event
819            }
820
821            // SAFETY: We know the deltas exists already
822            self.buffered_deltas_map
823                .remove(&delta.instrument_id)
824                .unwrap()
825        } else {
826            OrderBookDeltas::new(delta.instrument_id, vec![delta])
827        };
828
829        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
830        msgbus::publish(topic, &deltas as &dyn Any);
831    }
832
833    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
834        let deltas = if self.config.buffer_deltas {
835            let mut is_last_delta = false;
836            for delta in &deltas.deltas {
837                if RecordFlag::F_LAST.matches(delta.flags) {
838                    is_last_delta = true;
839                    break;
840                }
841            }
842
843            let instrument_id = deltas.instrument_id;
844
845            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
846                buffered_deltas.deltas.extend(deltas.deltas);
847            } else {
848                self.buffered_deltas_map.insert(instrument_id, deltas);
849            }
850
851            if !is_last_delta {
852                return;
853            }
854
855            // SAFETY: We know the deltas exists already
856            self.buffered_deltas_map.remove(&instrument_id).unwrap()
857        } else {
858            deltas
859        };
860
861        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
862        msgbus::publish(topic, &deltas as &dyn Any);
863    }
864
865    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
866        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
867        msgbus::publish(topic, &depth as &dyn Any);
868    }
869
870    fn handle_quote(&mut self, quote: QuoteTick) {
871        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
872            log_error_on_cache_insert(&e);
873        }
874
875        // TODO: Handle synthetics
876
877        let topic = switchboard::get_quotes_topic(quote.instrument_id);
878        msgbus::publish(topic, &quote as &dyn Any);
879    }
880
881    fn handle_trade(&mut self, trade: TradeTick) {
882        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
883            log_error_on_cache_insert(&e);
884        }
885
886        // TODO: Handle synthetics
887
888        let topic = switchboard::get_trades_topic(trade.instrument_id);
889        msgbus::publish(topic, &trade as &dyn Any);
890    }
891
892    fn handle_bar(&mut self, bar: Bar) {
893        // TODO: Handle additional bar logic
894        if self.config.validate_data_sequence
895            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
896        {
897            if bar.ts_event < last_bar.ts_event {
898                log::warn!(
899                    "Bar {bar} was prior to last bar `ts_event` {}",
900                    last_bar.ts_event
901                );
902                return; // Bar is out of sequence
903            }
904            if bar.ts_init < last_bar.ts_init {
905                log::warn!(
906                    "Bar {bar} was prior to last bar `ts_init` {}",
907                    last_bar.ts_init
908                );
909                return; // Bar is out of sequence
910            }
911            // TODO: Implement `bar.is_revision` logic
912        }
913
914        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
915            log_error_on_cache_insert(&e);
916        }
917
918        let topic = switchboard::get_bars_topic(bar.bar_type);
919        msgbus::publish(topic, &bar as &dyn Any);
920    }
921
922    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
923        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
924            log_error_on_cache_insert(&e);
925        }
926
927        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
928        msgbus::publish(topic, &mark_price as &dyn Any);
929    }
930
931    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
932        if let Err(e) = self
933            .cache
934            .as_ref()
935            .borrow_mut()
936            .add_index_price(index_price)
937        {
938            log_error_on_cache_insert(&e);
939        }
940
941        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
942        msgbus::publish(topic, &index_price as &dyn Any);
943    }
944
945    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
946    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
947        if let Err(e) = self
948            .cache
949            .as_ref()
950            .borrow_mut()
951            .add_funding_rate(funding_rate)
952        {
953            log_error_on_cache_insert(&e);
954        }
955
956        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
957        msgbus::publish(topic, &funding_rate as &dyn Any);
958    }
959
960    fn handle_instrument_close(&mut self, close: InstrumentClose) {
961        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
962        msgbus::publish(topic, &close as &dyn Any);
963    }
964
965    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
966
967    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
968        if cmd.instrument_id.is_synthetic() {
969            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
970        }
971
972        self.setup_order_book(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
973
974        Ok(())
975    }
976
977    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
978        if cmd.instrument_id.is_synthetic() {
979            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
980        }
981
982        self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
983
984        Ok(())
985    }
986
987    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
988        if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
989            return Ok(());
990        }
991
992        if cmd.instrument_id.is_synthetic() {
993            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
994        }
995
996        // Track snapshot intervals per instrument, and set up timer on first subscription
997        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
998            Entry::Vacant(e) => {
999                let mut set = AHashSet::new();
1000                set.insert(cmd.instrument_id);
1001                e.insert(set);
1002                true
1003            }
1004            Entry::Occupied(mut e) => {
1005                e.get_mut().insert(cmd.instrument_id);
1006                false
1007            }
1008        };
1009
1010        if first_for_interval {
1011            // Initialize snapshotter and schedule its timer
1012            let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
1013            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1014
1015            let snap_info = BookSnapshotInfo {
1016                instrument_id: cmd.instrument_id,
1017                venue: cmd.instrument_id.venue,
1018                is_composite: cmd.instrument_id.symbol.is_composite(),
1019                root: Ustr::from(cmd.instrument_id.symbol.root()),
1020                topic,
1021                interval_ms: cmd.interval_ms,
1022            };
1023
1024            // Schedule the first snapshot at the next interval boundary
1025            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1026            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1027
1028            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
1029            self.book_snapshotters
1030                .insert(cmd.instrument_id, snapshotter.clone());
1031            let timer_name = snapshotter.timer_name;
1032
1033            let callback =
1034                TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
1035
1036            self.clock
1037                .borrow_mut()
1038                .set_timer_ns(
1039                    &timer_name,
1040                    interval_ns,
1041                    Some(start_time_ns.into()),
1042                    None,
1043                    Some(callback),
1044                    None,
1045                    None,
1046                )
1047                .expect(FAILED);
1048        }
1049
1050        self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, true)?;
1051
1052        Ok(())
1053    }
1054
1055    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1056        match cmd.bar_type.aggregation_source() {
1057            AggregationSource::Internal => {
1058                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1059                    self.start_bar_aggregator(cmd.bar_type)?;
1060                }
1061            }
1062            AggregationSource::External => {
1063                if cmd.bar_type.instrument_id().is_synthetic() {
1064                    anyhow::bail!(
1065                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
1066                    );
1067                }
1068            }
1069        }
1070
1071        Ok(())
1072    }
1073
1074    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1075        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1076            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1077            return Ok(());
1078        }
1079
1080        let topics = vec![
1081            switchboard::get_book_deltas_topic(cmd.instrument_id),
1082            switchboard::get_book_depth10_topic(cmd.instrument_id),
1083            // TODO: Unsubscribe from snapshots?
1084        ];
1085
1086        self.maintain_book_updater(&cmd.instrument_id, &topics);
1087        self.maintain_book_snapshotter(&cmd.instrument_id);
1088
1089        Ok(())
1090    }
1091
1092    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1093        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1094            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1095            return Ok(());
1096        }
1097
1098        let topics = vec![
1099            switchboard::get_book_deltas_topic(cmd.instrument_id),
1100            switchboard::get_book_depth10_topic(cmd.instrument_id),
1101            // TODO: Unsubscribe from snapshots?
1102        ];
1103
1104        self.maintain_book_updater(&cmd.instrument_id, &topics);
1105        self.maintain_book_snapshotter(&cmd.instrument_id);
1106
1107        Ok(())
1108    }
1109
1110    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1111        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1112            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1113            return Ok(());
1114        }
1115
1116        // Remove instrument from interval tracking, and drop empty intervals
1117        let mut to_remove = Vec::new();
1118        for (interval, set) in &mut self.book_intervals {
1119            if set.remove(&cmd.instrument_id) && set.is_empty() {
1120                to_remove.push(*interval);
1121            }
1122        }
1123
1124        for interval in to_remove {
1125            self.book_intervals.remove(&interval);
1126        }
1127
1128        let topics = vec![
1129            switchboard::get_book_deltas_topic(cmd.instrument_id),
1130            switchboard::get_book_depth10_topic(cmd.instrument_id),
1131            // TODO: Unsubscribe from snapshots (add interval_ms to message?)
1132        ];
1133
1134        self.maintain_book_updater(&cmd.instrument_id, &topics);
1135        self.maintain_book_snapshotter(&cmd.instrument_id);
1136
1137        Ok(())
1138    }
1139
1140    /// Unsubscribe internal bar aggregator for the given bar type.
1141    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1142        // If we have an internal aggregator for this bar type, stop and remove it
1143        let bar_type = cmd.bar_type;
1144        if self.bar_aggregators.contains_key(&bar_type.standard()) {
1145            if let Err(err) = self.stop_bar_aggregator(bar_type) {
1146                log::error!("Error stopping bar aggregator for {bar_type}: {err}");
1147            }
1148            self.bar_aggregators.remove(&bar_type.standard());
1149            log::debug!("Removed bar aggregator for {bar_type}");
1150        }
1151        Ok(())
1152    }
1153
1154    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1155        if let Some(updater) = self.book_updaters.get(instrument_id) {
1156            let handler = ShareableMessageHandler(updater.clone());
1157
1158            // Unsubscribe handler if it is the last subscriber
1159            for topic in topics {
1160                if msgbus::subscriptions_count(topic.as_str()) == 1
1161                    && msgbus::is_subscribed(topic.as_str(), handler.clone())
1162                {
1163                    log::debug!("Unsubscribing BookUpdater from {topic}");
1164                    msgbus::unsubscribe_topic(*topic, handler.clone());
1165                }
1166            }
1167
1168            // Check remaining subscriptions, if none then remove updater
1169            let still_subscribed = topics
1170                .iter()
1171                .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1172            if !still_subscribed {
1173                self.book_updaters.remove(instrument_id);
1174                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1175            }
1176        }
1177    }
1178
1179    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1180        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1181            let topic = switchboard::get_book_snapshots_topic(
1182                *instrument_id,
1183                snapshotter.snap_info.interval_ms,
1184            );
1185
1186            // Check remaining snapshot subscriptions, if none then remove snapshotter
1187            if msgbus::subscriptions_count(topic.as_str()) == 0 {
1188                let timer_name = snapshotter.timer_name;
1189                self.book_snapshotters.remove(instrument_id);
1190                let mut clock = self.clock.borrow_mut();
1191                if clock.timer_names().contains(&timer_name.as_str()) {
1192                    clock.cancel_timer(&timer_name);
1193                }
1194                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1195            }
1196        }
1197    }
1198
1199    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1200
1201    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1202        let mut cache = self.cache.as_ref().borrow_mut();
1203        if let Err(e) = cache.add_instrument(instrument) {
1204            log_error_on_cache_insert(&e);
1205        }
1206    }
1207
1208    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1209        // TODO: Improve by adding bulk update methods to cache and database
1210        let mut cache = self.cache.as_ref().borrow_mut();
1211        for instrument in instruments {
1212            if let Err(e) = cache.add_instrument(instrument.clone()) {
1213                log_error_on_cache_insert(&e);
1214            }
1215        }
1216    }
1217
1218    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1219        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1220            log_error_on_cache_insert(&e);
1221        }
1222    }
1223
1224    fn handle_trades(&self, trades: &[TradeTick]) {
1225        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1226            log_error_on_cache_insert(&e);
1227        }
1228    }
1229
1230    fn handle_bars(&self, bars: &[Bar]) {
1231        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1232            log_error_on_cache_insert(&e);
1233        }
1234    }
1235
1236    // -- INTERNAL --------------------------------------------------------------------------------
1237
1238    #[allow(clippy::too_many_arguments)]
1239    fn setup_order_book(
1240        &mut self,
1241        instrument_id: &InstrumentId,
1242        book_type: BookType,
1243        only_deltas: bool,
1244        managed: bool,
1245    ) -> anyhow::Result<()> {
1246        let mut cache = self.cache.borrow_mut();
1247        if managed && !cache.has_order_book(instrument_id) {
1248            let book = OrderBook::new(*instrument_id, book_type);
1249            log::debug!("Created {book}");
1250            cache.add_order_book(book)?;
1251        }
1252
1253        // Set up subscriptions
1254        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1255        self.book_updaters.insert(*instrument_id, updater.clone());
1256
1257        let handler = ShareableMessageHandler(updater);
1258
1259        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1260        if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1261            msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1262        }
1263
1264        let topic = switchboard::get_book_depth10_topic(*instrument_id);
1265        if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1266            msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1267        }
1268
1269        Ok(())
1270    }
1271
1272    #[cfg(feature = "defi")]
1273    fn setup_pool_updater(&mut self, instrument_id: &InstrumentId) {
1274        if self.pool_updaters.contains_key(instrument_id) {
1275            return;
1276        }
1277
1278        let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
1279        let handler = ShareableMessageHandler(updater.clone());
1280
1281        // Subscribe to pool swaps and liquidity updates
1282        let swap_topic = switchboard::get_defi_pool_swaps_topic(*instrument_id);
1283        if !msgbus::is_subscribed(swap_topic.as_str(), handler.clone()) {
1284            msgbus::subscribe(
1285                swap_topic.into(),
1286                handler.clone(),
1287                Some(self.msgbus_priority),
1288            );
1289        }
1290
1291        let liquidity_topic = switchboard::get_defi_liquidity_topic(*instrument_id);
1292        if !msgbus::is_subscribed(liquidity_topic.as_str(), handler.clone()) {
1293            msgbus::subscribe(liquidity_topic.into(), handler, Some(self.msgbus_priority));
1294        }
1295
1296        self.pool_updaters.insert(*instrument_id, updater);
1297        log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
1298    }
1299
1300    fn create_bar_aggregator(
1301        &mut self,
1302        instrument: &InstrumentAny,
1303        bar_type: BarType,
1304    ) -> Box<dyn BarAggregator> {
1305        let cache = self.cache.clone();
1306
1307        let handler = move |bar: Bar| {
1308            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1309                log_error_on_cache_insert(&e);
1310            }
1311
1312            let topic = switchboard::get_bars_topic(bar.bar_type);
1313            msgbus::publish(topic, &bar as &dyn Any);
1314        };
1315
1316        let clock = self.clock.clone();
1317        let config = self.config.clone();
1318
1319        let price_precision = instrument.price_precision();
1320        let size_precision = instrument.size_precision();
1321
1322        if bar_type.spec().is_time_aggregated() {
1323            // Get time_bars_origin_offset from config
1324            let time_bars_origin_offset = config
1325                .time_bars_origins
1326                .get(&bar_type.spec().aggregation)
1327                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1328
1329            Box::new(TimeBarAggregator::new(
1330                bar_type,
1331                price_precision,
1332                size_precision,
1333                clock,
1334                handler,
1335                false, // await_partial
1336                config.time_bars_build_with_no_updates,
1337                config.time_bars_timestamp_on_close,
1338                config.time_bars_interval_type,
1339                time_bars_origin_offset,
1340                20,    // TODO: TBD, composite bar build delay
1341                false, // TODO: skip_first_non_full_bar, make it config dependent
1342            ))
1343        } else {
1344            match bar_type.spec().aggregation {
1345                BarAggregation::Tick => Box::new(TickBarAggregator::new(
1346                    bar_type,
1347                    price_precision,
1348                    size_precision,
1349                    handler,
1350                    false,
1351                )) as Box<dyn BarAggregator>,
1352                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1353                    bar_type,
1354                    price_precision,
1355                    size_precision,
1356                    handler,
1357                    false,
1358                )) as Box<dyn BarAggregator>,
1359                BarAggregation::Value => Box::new(ValueBarAggregator::new(
1360                    bar_type,
1361                    price_precision,
1362                    size_precision,
1363                    handler,
1364                    false,
1365                )) as Box<dyn BarAggregator>,
1366                BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1367                    bar_type,
1368                    price_precision,
1369                    size_precision,
1370                    instrument.price_increment(),
1371                    handler,
1372                    false,
1373                )) as Box<dyn BarAggregator>,
1374                _ => panic!(
1375                    "Cannot create aggregator: {} aggregation not currently supported",
1376                    bar_type.spec().aggregation
1377                ),
1378            }
1379        }
1380    }
1381
1382    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1383        // Get the instrument for this bar type
1384        let instrument = {
1385            let cache = self.cache.borrow();
1386            cache
1387                .instrument(&bar_type.instrument_id())
1388                .ok_or_else(|| {
1389                    anyhow::anyhow!(
1390                        "Cannot start bar aggregation: no instrument found for {}",
1391                        bar_type.instrument_id(),
1392                    )
1393                })?
1394                .clone()
1395        };
1396
1397        // Use standard form of bar type as key
1398        let bar_key = bar_type.standard();
1399
1400        // Create or retrieve aggregator in Rc<RefCell>
1401        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1402            rc.clone()
1403        } else {
1404            let agg = self.create_bar_aggregator(&instrument, bar_type);
1405            let rc = Rc::new(RefCell::new(agg));
1406            self.bar_aggregators.insert(bar_key, rc.clone());
1407            rc
1408        };
1409
1410        // Subscribe to underlying data topics
1411        let mut handlers = Vec::new();
1412
1413        if bar_type.is_composite() {
1414            let topic = switchboard::get_bars_topic(bar_type.composite());
1415            let handler =
1416                ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1417
1418            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1419                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1420            }
1421
1422            handlers.push((topic, handler));
1423        } else if bar_type.spec().price_type == PriceType::Last {
1424            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1425            let handler =
1426                ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1427
1428            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1429                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1430            }
1431
1432            handlers.push((topic, handler));
1433        } else {
1434            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1435            let handler =
1436                ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1437
1438            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1439                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1440            }
1441
1442            handlers.push((topic, handler));
1443        }
1444
1445        self.bar_aggregator_handlers.insert(bar_key, handlers);
1446        aggregator.borrow_mut().set_is_running(true);
1447
1448        Ok(())
1449    }
1450
1451    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1452        let aggregator = self
1453            .bar_aggregators
1454            .remove(&bar_type.standard())
1455            .ok_or_else(|| {
1456                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1457            })?;
1458
1459        aggregator.borrow_mut().stop();
1460
1461        // Unsubscribe any registered message handlers
1462        let bar_key = bar_type.standard();
1463        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1464            for (topic, handler) in subs {
1465                if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1466                    msgbus::unsubscribe_topic(topic, handler);
1467                }
1468            }
1469        }
1470
1471        Ok(())
1472    }
1473}
1474
1475#[inline(always)]
1476fn log_error_on_cache_insert<T: Display>(e: &T) {
1477    log::error!("Error on cache insert: {e}");
1478}