nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engines primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39    any::Any,
40    cell::{Ref, RefCell},
41    collections::hash_map::Entry,
42    fmt::Display,
43    num::NonZeroUsize,
44    rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use futures::future::join_all;
51use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
52use indexmap::IndexMap;
53use nautilus_common::{
54    cache::Cache,
55    clock::Clock,
56    logging::{RECV, RES},
57    messages::data::{
58        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61        UnsubscribeCommand,
62    },
63    msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
64    timer::{TimeEvent, TimeEventCallback},
65};
66use nautilus_core::{
67    correctness::{
68        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
69    },
70    datetime::millis_to_nanos_unchecked,
71};
72#[cfg(feature = "defi")]
73use nautilus_model::defi::DefiData;
74use nautilus_model::{
75    data::{
76        Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
77        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
78    },
79    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
80    identifiers::{ClientId, InstrumentId, Venue},
81    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
82    orderbook::OrderBook,
83};
84#[cfg(feature = "streaming")]
85use nautilus_persistence::backend::catalog::ParquetDataCatalog;
86use ustr::Ustr;
87
88#[cfg(feature = "defi")]
89#[allow(unused_imports)] // Brings DeFi impl blocks into scope
90use crate::defi::engine as _;
91#[cfg(feature = "defi")]
92use crate::engine::pool::PoolUpdater;
93use crate::{
94    aggregation::{
95        BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
96        TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
97        ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
98        VolumeRunsBarAggregator,
99    },
100    client::DataClientAdapter,
101};
102
103/// Provides a high-performance `DataEngine` for all environments.
104#[derive(Debug)]
105pub struct DataEngine {
106    pub(crate) clock: Rc<RefCell<dyn Clock>>,
107    pub(crate) cache: Rc<RefCell<Cache>>,
108    pub(crate) external_clients: AHashSet<ClientId>,
109    clients: IndexMap<ClientId, DataClientAdapter>,
110    default_client: Option<DataClientAdapter>,
111    #[cfg(feature = "streaming")]
112    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
113    routing_map: IndexMap<Venue, ClientId>,
114    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
115    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
116    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
117    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
118    bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
119    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
120    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
121    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
122    pub(crate) msgbus_priority: u8,
123    pub(crate) config: DataEngineConfig,
124    #[cfg(feature = "defi")]
125    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
126    #[cfg(feature = "defi")]
127    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
128    #[cfg(feature = "defi")]
129    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
130    #[cfg(feature = "defi")]
131    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
132}
133
134impl DataEngine {
135    /// Creates a new [`DataEngine`] instance.
136    #[must_use]
137    pub fn new(
138        clock: Rc<RefCell<dyn Clock>>,
139        cache: Rc<RefCell<Cache>>,
140        config: Option<DataEngineConfig>,
141    ) -> Self {
142        let config = config.unwrap_or_default();
143
144        let external_clients: AHashSet<ClientId> = config
145            .external_clients
146            .clone()
147            .unwrap_or_default()
148            .into_iter()
149            .collect();
150
151        Self {
152            clock,
153            cache,
154            external_clients,
155            clients: IndexMap::new(),
156            default_client: None,
157            #[cfg(feature = "streaming")]
158            catalogs: AHashMap::new(),
159            routing_map: IndexMap::new(),
160            book_intervals: AHashMap::new(),
161            book_updaters: AHashMap::new(),
162            book_snapshotters: AHashMap::new(),
163            bar_aggregators: AHashMap::new(),
164            bar_aggregator_handlers: AHashMap::new(),
165            _synthetic_quote_feeds: AHashMap::new(),
166            _synthetic_trade_feeds: AHashMap::new(),
167            buffered_deltas_map: AHashMap::new(),
168            msgbus_priority: 10, // High-priority for built-in component
169            config,
170            #[cfg(feature = "defi")]
171            pool_updaters: AHashMap::new(),
172            #[cfg(feature = "defi")]
173            pool_updaters_pending: AHashSet::new(),
174            #[cfg(feature = "defi")]
175            pool_snapshot_pending: AHashSet::new(),
176            #[cfg(feature = "defi")]
177            pool_event_buffers: AHashMap::new(),
178        }
179    }
180
181    /// Returns a read-only reference to the engines clock.
182    #[must_use]
183    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
184        self.clock.borrow()
185    }
186
187    /// Returns a read-only reference to the engines cache.
188    #[must_use]
189    pub fn get_cache(&self) -> Ref<'_, Cache> {
190        self.cache.borrow()
191    }
192
193    /// Returns the `Rc<RefCell<Cache>>` used by this engine.
194    #[must_use]
195    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
196        Rc::clone(&self.cache)
197    }
198
199    /// Registers the `catalog` with the engine with an optional specific `name`.
200    ///
201    /// # Panics
202    ///
203    /// Panics if a catalog with the same `name` has already been registered.
204    #[cfg(feature = "streaming")]
205    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
206        let name = Ustr::from(name.as_deref().unwrap_or("catalog_0"));
207
208        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
209
210        self.catalogs.insert(name, catalog);
211        log::info!("Registered catalog <{name}>");
212    }
213
214    /// Registers the `client` with the engine with an optional venue `routing`.
215    ///
216    ///
217    /// # Panics
218    ///
219    /// Panics if a client with the same client ID has already been registered.
220    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
221        let client_id = client.client_id();
222
223        if let Some(default_client) = &self.default_client {
224            check_predicate_false(
225                default_client.client_id() == client.client_id(),
226                "client_id already registered as default client",
227            )
228            .expect(FAILED);
229        }
230
231        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
232
233        if let Some(routing) = routing {
234            self.routing_map.insert(routing, client_id);
235            log::info!("Set client {client_id} routing for {routing}");
236        }
237
238        if client.venue.is_none() && self.default_client.is_none() {
239            self.default_client = Some(client);
240            log::info!("Registered client {client_id} for default routing");
241        } else {
242            self.clients.insert(client_id, client);
243            log::info!("Registered client {client_id}");
244        }
245    }
246
247    /// Deregisters the client for the `client_id`.
248    ///
249    /// # Panics
250    ///
251    /// Panics if the client ID has not been registered.
252    pub fn deregister_client(&mut self, client_id: &ClientId) {
253        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
254
255        self.clients.shift_remove(client_id);
256        log::info!("Deregistered client {client_id}");
257    }
258
259    /// Registers the data `client` with the engine as the default routing client.
260    ///
261    /// When a specific venue routing cannot be found, this client will receive messages.
262    ///
263    /// # Warnings
264    ///
265    /// Any existing default routing client will be overwritten.
266    ///
267    /// # Panics
268    ///
269    /// Panics if a default client has already been registered.
270    pub fn register_default_client(&mut self, client: DataClientAdapter) {
271        check_predicate_true(
272            self.default_client.is_none(),
273            "default client already registered",
274        )
275        .expect(FAILED);
276
277        let client_id = client.client_id();
278
279        self.default_client = Some(client);
280        log::info!("Registered default client {client_id}");
281    }
282
283    /// Starts all registered data clients.
284    pub fn start(&mut self) {
285        for client in self.get_clients_mut() {
286            if let Err(e) = client.start() {
287                log::error!("{e}");
288            }
289        }
290    }
291
292    /// Stops all registered data clients.
293    pub fn stop(&mut self) {
294        for client in self.get_clients_mut() {
295            if let Err(e) = client.stop() {
296                log::error!("{e}");
297            }
298        }
299    }
300
301    /// Resets all registered data clients to their initial state.
302    pub fn reset(&mut self) {
303        for client in self.get_clients_mut() {
304            if let Err(e) = client.reset() {
305                log::error!("{e}");
306            }
307        }
308    }
309
310    /// Disposes the engine, stopping all clients and canceling any timers.
311    pub fn dispose(&mut self) {
312        for client in self.get_clients_mut() {
313            if let Err(e) = client.dispose() {
314                log::error!("{e}");
315            }
316        }
317
318        self.clock.borrow_mut().cancel_timers();
319    }
320
321    /// Connects all registered data clients concurrently.
322    ///
323    /// # Errors
324    ///
325    /// Returns an error if any client fails to connect.
326    pub async fn connect(&mut self) -> anyhow::Result<()> {
327        let futures: Vec<_> = self
328            .get_clients_mut()
329            .into_iter()
330            .map(|client| client.connect())
331            .collect();
332
333        let results = join_all(futures).await;
334        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
335
336        if errors.is_empty() {
337            Ok(())
338        } else {
339            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
340            anyhow::bail!("Failed to connect data clients: {}", error_msgs.join("; "))
341        }
342    }
343
344    /// Disconnects all registered data clients concurrently.
345    ///
346    /// # Errors
347    ///
348    /// Returns an error if any client fails to disconnect.
349    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
350        let futures: Vec<_> = self
351            .get_clients_mut()
352            .into_iter()
353            .map(|client| client.disconnect())
354            .collect();
355
356        let results = join_all(futures).await;
357        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
358
359        if errors.is_empty() {
360            Ok(())
361        } else {
362            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
363            anyhow::bail!(
364                "Failed to disconnect data clients: {}",
365                error_msgs.join("; ")
366            )
367        }
368    }
369
370    /// Returns `true` if all registered data clients are currently connected.
371    #[must_use]
372    pub fn check_connected(&self) -> bool {
373        self.get_clients()
374            .iter()
375            .all(|client| client.is_connected())
376    }
377
378    /// Returns `true` if all registered data clients are currently disconnected.
379    #[must_use]
380    pub fn check_disconnected(&self) -> bool {
381        self.get_clients()
382            .iter()
383            .all(|client| !client.is_connected())
384    }
385
386    /// Returns a list of all registered client IDs, including the default client if set.
387    #[must_use]
388    pub fn registered_clients(&self) -> Vec<ClientId> {
389        self.get_clients()
390            .into_iter()
391            .map(|client| client.client_id())
392            .collect()
393    }
394
395    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
396
397    pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
398    where
399        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
400        T: Clone,
401    {
402        self.get_clients()
403            .into_iter()
404            .flat_map(get_subs)
405            .cloned()
406            .collect()
407    }
408
409    #[must_use]
410    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
411        let (default_opt, clients_map) = (&self.default_client, &self.clients);
412        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
413
414        if let Some(default) = default_opt {
415            clients.push(default);
416        }
417
418        clients
419    }
420
421    #[must_use]
422    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
423        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
424        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
425
426        if let Some(default) = default_opt {
427            clients.push(default);
428        }
429
430        clients
431    }
432
433    pub fn get_client(
434        &mut self,
435        client_id: Option<&ClientId>,
436        venue: Option<&Venue>,
437    ) -> Option<&mut DataClientAdapter> {
438        if let Some(client_id) = client_id {
439            // Explicit ID: first look in registered clients
440            if let Some(client) = self.clients.get_mut(client_id) {
441                return Some(client);
442            }
443
444            // Then check if it matches the default client
445            if let Some(default) = self.default_client.as_mut()
446                && default.client_id() == *client_id
447            {
448                return Some(default);
449            }
450
451            // Unknown explicit client
452            return None;
453        }
454
455        if let Some(v) = venue {
456            // Route by venue if mapped client still registered
457            if let Some(client_id) = self.routing_map.get(v) {
458                return self.clients.get_mut(client_id);
459            }
460        }
461
462        // Fallback to default client
463        self.get_default_client()
464    }
465
466    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
467        self.default_client.as_mut()
468    }
469
470    /// Returns all custom data types currently subscribed across all clients.
471    #[must_use]
472    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
473        self.collect_subscriptions(|client| &client.subscriptions_custom)
474    }
475
476    /// Returns all instrument IDs currently subscribed across all clients.
477    #[must_use]
478    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
479        self.collect_subscriptions(|client| &client.subscriptions_instrument)
480    }
481
482    /// Returns all instrument IDs for which book delta subscriptions exist.
483    #[must_use]
484    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
485        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
486    }
487
488    /// Returns all instrument IDs for which book snapshot subscriptions exist.
489    #[must_use]
490    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
491        self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
492    }
493
494    /// Returns all instrument IDs for which quote subscriptions exist.
495    #[must_use]
496    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
497        self.collect_subscriptions(|client| &client.subscriptions_quotes)
498    }
499
500    /// Returns all instrument IDs for which trade subscriptions exist.
501    #[must_use]
502    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
503        self.collect_subscriptions(|client| &client.subscriptions_trades)
504    }
505
506    /// Returns all bar types currently subscribed across all clients.
507    #[must_use]
508    pub fn subscribed_bars(&self) -> Vec<BarType> {
509        self.collect_subscriptions(|client| &client.subscriptions_bars)
510    }
511
512    /// Returns all instrument IDs for which mark price subscriptions exist.
513    #[must_use]
514    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
515        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
516    }
517
518    /// Returns all instrument IDs for which index price subscriptions exist.
519    #[must_use]
520    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
521        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
522    }
523
524    /// Returns all instrument IDs for which funding rate subscriptions exist.
525    #[must_use]
526    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
527        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
528    }
529
530    /// Returns all instrument IDs for which status subscriptions exist.
531    #[must_use]
532    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
533        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
534    }
535
536    /// Returns all instrument IDs for which instrument close subscriptions exist.
537    #[must_use]
538    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
539        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
540    }
541
542    // -- COMMANDS --------------------------------------------------------------------------------
543
544    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
545    ///
546    /// Errors during execution are logged.
547    pub fn execute(&mut self, cmd: &DataCommand) {
548        if let Err(e) = match cmd {
549            DataCommand::Subscribe(c) => self.execute_subscribe(c),
550            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
551            DataCommand::Request(c) => self.execute_request(c),
552            #[cfg(feature = "defi")]
553            DataCommand::DefiRequest(c) => self.execute_defi_request(c),
554            #[cfg(feature = "defi")]
555            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
556            #[cfg(feature = "defi")]
557            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
558            _ => {
559                log::warn!("Unhandled DataCommand variant: {cmd:?}");
560                Ok(())
561            }
562        } {
563            log::error!("{e}");
564        }
565    }
566
567    /// Handles a subscribe command, updating internal state and forwarding to the client.
568    ///
569    /// # Errors
570    ///
571    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
572    /// or if the underlying client operation fails.
573    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
574        // Update internal engine state
575        match &cmd {
576            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
577            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
578            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
579            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
580            _ => {} // Do nothing else
581        }
582
583        if let Some(client_id) = cmd.client_id()
584            && self.external_clients.contains(client_id)
585        {
586            if self.config.debug {
587                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
588            }
589            return Ok(());
590        }
591
592        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
593            client.execute_subscribe(cmd);
594        } else {
595            log::error!(
596                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
597                cmd.client_id(),
598                cmd.venue(),
599            );
600        }
601
602        Ok(())
603    }
604
605    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
606    ///
607    /// # Errors
608    ///
609    /// Returns an error if the underlying client operation fails.
610    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
611        match &cmd {
612            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
613            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
614            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
615            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
616            _ => {} // Do nothing else
617        }
618
619        if let Some(client_id) = cmd.client_id()
620            && self.external_clients.contains(client_id)
621        {
622            if self.config.debug {
623                log::debug!(
624                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
625                );
626            }
627            return Ok(());
628        }
629
630        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
631            client.execute_unsubscribe(cmd);
632        } else {
633            log::error!(
634                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
635                cmd.client_id(),
636                cmd.venue(),
637            );
638        }
639
640        Ok(())
641    }
642
643    /// Sends a [`RequestCommand`] to a suitable data client implementation.
644    ///
645    /// # Errors
646    ///
647    /// Returns an error if no client is found for the given client ID or venue,
648    /// or if the client fails to process the request.
649    pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
650        // Skip requests for external clients
651        if let Some(cid) = req.client_id()
652            && self.external_clients.contains(cid)
653        {
654            if self.config.debug {
655                log::debug!("Skipping data request for external client {cid}: {req:?}");
656            }
657            return Ok(());
658        }
659        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
660            match req {
661                RequestCommand::Data(req) => client.request_data(req),
662                RequestCommand::Instrument(req) => client.request_instrument(req),
663                RequestCommand::Instruments(req) => client.request_instruments(req),
664                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
665                RequestCommand::BookDepth(req) => client.request_book_depth(req),
666                RequestCommand::Quotes(req) => client.request_quotes(req),
667                RequestCommand::Trades(req) => client.request_trades(req),
668                RequestCommand::Bars(req) => client.request_bars(req),
669            }
670        } else {
671            anyhow::bail!(
672                "Cannot handle request: no client found for {:?} {:?}",
673                req.client_id(),
674                req.venue()
675            );
676        }
677    }
678
679    /// Processes a dynamically-typed data message.
680    ///
681    /// Currently supports `InstrumentAny` and `FundingRateUpdate`; unrecognized types are logged as errors.
682    pub fn process(&mut self, data: &dyn Any) {
683        // TODO: Eventually these could be added to the `Data` enum? process here for now
684        if let Some(data) = data.downcast_ref::<Data>() {
685            self.process_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
686            return;
687        }
688
689        #[cfg(feature = "defi")]
690        if let Some(data) = data.downcast_ref::<DefiData>() {
691            self.process_defi_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
692            return;
693        }
694
695        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
696            self.handle_instrument(instrument.clone());
697        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
698            self.handle_funding_rate(*funding_rate);
699        } else {
700            log::error!("Cannot process data {data:?}, type is unrecognized");
701        }
702    }
703
704    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
705    pub fn process_data(&mut self, data: Data) {
706        match data {
707            Data::Delta(delta) => self.handle_delta(delta),
708            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
709            Data::Depth10(depth) => self.handle_depth10(*depth),
710            Data::Quote(quote) => self.handle_quote(quote),
711            Data::Trade(trade) => self.handle_trade(trade),
712            Data::Bar(bar) => self.handle_bar(bar),
713            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
714            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
715            Data::InstrumentClose(close) => self.handle_instrument_close(close),
716        }
717    }
718
719    /// Processes a `DataResponse`, handling and publishing the response message.
720    pub fn response(&self, resp: DataResponse) {
721        log::debug!("{RECV}{RES} {resp:?}");
722
723        match &resp {
724            DataResponse::Instrument(resp) => {
725                self.handle_instrument_response(resp.data.clone());
726            }
727            DataResponse::Instruments(resp) => {
728                self.handle_instruments(&resp.data);
729            }
730            DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
731            DataResponse::Trades(resp) => self.handle_trades(&resp.data),
732            DataResponse::Bars(resp) => self.handle_bars(&resp.data),
733            _ => todo!(),
734        }
735
736        msgbus::send_response(resp.correlation_id(), &resp);
737    }
738
739    // -- DATA HANDLERS ---------------------------------------------------------------------------
740
741    fn handle_instrument(&mut self, instrument: InstrumentAny) {
742        if let Err(e) = self
743            .cache
744            .as_ref()
745            .borrow_mut()
746            .add_instrument(instrument.clone())
747        {
748            log_error_on_cache_insert(&e);
749        }
750
751        let topic = switchboard::get_instrument_topic(instrument.id());
752        msgbus::publish(topic, &instrument as &dyn Any);
753    }
754
755    fn handle_delta(&mut self, delta: OrderBookDelta) {
756        let deltas = if self.config.buffer_deltas {
757            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
758                buffered_deltas.deltas.push(delta);
759                buffered_deltas.flags = delta.flags;
760                buffered_deltas.sequence = delta.sequence;
761                buffered_deltas.ts_event = delta.ts_event;
762                buffered_deltas.ts_init = delta.ts_init;
763            } else {
764                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
765                self.buffered_deltas_map
766                    .insert(delta.instrument_id, buffered_deltas);
767            }
768
769            if !RecordFlag::F_LAST.matches(delta.flags) {
770                return; // Not the last delta for event
771            }
772
773            // SAFETY: We know the deltas exists already
774            self.buffered_deltas_map
775                .remove(&delta.instrument_id)
776                .unwrap()
777        } else {
778            OrderBookDeltas::new(delta.instrument_id, vec![delta])
779        };
780
781        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
782        msgbus::publish(topic, &deltas as &dyn Any);
783    }
784
785    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
786        let deltas = if self.config.buffer_deltas {
787            let mut is_last_delta = false;
788            for delta in &deltas.deltas {
789                if RecordFlag::F_LAST.matches(delta.flags) {
790                    is_last_delta = true;
791                    break;
792                }
793            }
794
795            let instrument_id = deltas.instrument_id;
796
797            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
798                buffered_deltas.deltas.extend(deltas.deltas);
799
800                if let Some(last_delta) = buffered_deltas.deltas.last() {
801                    buffered_deltas.flags = last_delta.flags;
802                    buffered_deltas.sequence = last_delta.sequence;
803                    buffered_deltas.ts_event = last_delta.ts_event;
804                    buffered_deltas.ts_init = last_delta.ts_init;
805                }
806            } else {
807                self.buffered_deltas_map.insert(instrument_id, deltas);
808            }
809
810            if !is_last_delta {
811                return;
812            }
813
814            // SAFETY: We know the deltas exists already
815            self.buffered_deltas_map.remove(&instrument_id).unwrap()
816        } else {
817            deltas
818        };
819
820        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
821        msgbus::publish(topic, &deltas as &dyn Any);
822    }
823
824    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
825        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
826        msgbus::publish(topic, &depth as &dyn Any);
827    }
828
829    fn handle_quote(&mut self, quote: QuoteTick) {
830        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
831            log_error_on_cache_insert(&e);
832        }
833
834        // TODO: Handle synthetics
835
836        let topic = switchboard::get_quotes_topic(quote.instrument_id);
837        msgbus::publish(topic, &quote as &dyn Any);
838    }
839
840    fn handle_trade(&mut self, trade: TradeTick) {
841        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
842            log_error_on_cache_insert(&e);
843        }
844
845        // TODO: Handle synthetics
846
847        let topic = switchboard::get_trades_topic(trade.instrument_id);
848        msgbus::publish(topic, &trade as &dyn Any);
849    }
850
851    fn handle_bar(&mut self, bar: Bar) {
852        // TODO: Handle additional bar logic
853        if self.config.validate_data_sequence
854            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
855        {
856            if bar.ts_event < last_bar.ts_event {
857                log::warn!(
858                    "Bar {bar} was prior to last bar `ts_event` {}",
859                    last_bar.ts_event
860                );
861                return; // Bar is out of sequence
862            }
863            if bar.ts_init < last_bar.ts_init {
864                log::warn!(
865                    "Bar {bar} was prior to last bar `ts_init` {}",
866                    last_bar.ts_init
867                );
868                return; // Bar is out of sequence
869            }
870            // TODO: Implement `bar.is_revision` logic
871        }
872
873        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
874            log_error_on_cache_insert(&e);
875        }
876
877        let topic = switchboard::get_bars_topic(bar.bar_type);
878        msgbus::publish(topic, &bar as &dyn Any);
879    }
880
881    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
882        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
883            log_error_on_cache_insert(&e);
884        }
885
886        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
887        msgbus::publish(topic, &mark_price as &dyn Any);
888    }
889
890    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
891        if let Err(e) = self
892            .cache
893            .as_ref()
894            .borrow_mut()
895            .add_index_price(index_price)
896        {
897            log_error_on_cache_insert(&e);
898        }
899
900        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
901        msgbus::publish(topic, &index_price as &dyn Any);
902    }
903
904    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
905    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
906        if let Err(e) = self
907            .cache
908            .as_ref()
909            .borrow_mut()
910            .add_funding_rate(funding_rate)
911        {
912            log_error_on_cache_insert(&e);
913        }
914
915        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
916        msgbus::publish(topic, &funding_rate as &dyn Any);
917    }
918
919    fn handle_instrument_close(&mut self, close: InstrumentClose) {
920        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
921        msgbus::publish(topic, &close as &dyn Any);
922    }
923
924    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
925
926    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
927        if cmd.instrument_id.is_synthetic() {
928            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
929        }
930
931        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
932
933        Ok(())
934    }
935
936    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
937        if cmd.instrument_id.is_synthetic() {
938            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
939        }
940
941        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
942
943        Ok(())
944    }
945
946    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
947        if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
948            return Ok(());
949        }
950
951        if cmd.instrument_id.is_synthetic() {
952            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
953        }
954
955        // Track snapshot intervals per instrument, and set up timer on first subscription
956        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
957            Entry::Vacant(e) => {
958                let mut set = AHashSet::new();
959                set.insert(cmd.instrument_id);
960                e.insert(set);
961                true
962            }
963            Entry::Occupied(mut e) => {
964                e.get_mut().insert(cmd.instrument_id);
965                false
966            }
967        };
968
969        if first_for_interval {
970            // Initialize snapshotter and schedule its timer
971            let interval_ns = millis_to_nanos_unchecked(cmd.interval_ms.get() as f64);
972            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
973
974            let snap_info = BookSnapshotInfo {
975                instrument_id: cmd.instrument_id,
976                venue: cmd.instrument_id.venue,
977                is_composite: cmd.instrument_id.symbol.is_composite(),
978                root: Ustr::from(cmd.instrument_id.symbol.root()),
979                topic,
980                interval_ms: cmd.interval_ms,
981            };
982
983            // Schedule the first snapshot at the next interval boundary
984            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
985            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
986
987            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
988            self.book_snapshotters
989                .insert(cmd.instrument_id, snapshotter.clone());
990            let timer_name = snapshotter.timer_name;
991
992            let callback_fn: Rc<dyn Fn(TimeEvent)> =
993                Rc::new(move |event| snapshotter.snapshot(event));
994            let callback = TimeEventCallback::from(callback_fn);
995
996            self.clock
997                .borrow_mut()
998                .set_timer_ns(
999                    &timer_name,
1000                    interval_ns,
1001                    Some(start_time_ns.into()),
1002                    None,
1003                    Some(callback),
1004                    None,
1005                    None,
1006                )
1007                .expect(FAILED);
1008        }
1009
1010        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1011
1012        Ok(())
1013    }
1014
1015    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1016        match cmd.bar_type.aggregation_source() {
1017            AggregationSource::Internal => {
1018                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1019                    self.start_bar_aggregator(cmd.bar_type)?;
1020                }
1021            }
1022            AggregationSource::External => {
1023                if cmd.bar_type.instrument_id().is_synthetic() {
1024                    anyhow::bail!(
1025                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
1026                    );
1027                }
1028            }
1029        }
1030
1031        Ok(())
1032    }
1033
1034    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1035        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1036            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1037            return Ok(());
1038        }
1039
1040        let topics = vec![
1041            switchboard::get_book_deltas_topic(cmd.instrument_id),
1042            switchboard::get_book_depth10_topic(cmd.instrument_id),
1043            // TODO: Unsubscribe from snapshots?
1044        ];
1045
1046        self.maintain_book_updater(&cmd.instrument_id, &topics);
1047        self.maintain_book_snapshotter(&cmd.instrument_id);
1048
1049        Ok(())
1050    }
1051
1052    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1053        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1054            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1055            return Ok(());
1056        }
1057
1058        let topics = vec![
1059            switchboard::get_book_deltas_topic(cmd.instrument_id),
1060            switchboard::get_book_depth10_topic(cmd.instrument_id),
1061            // TODO: Unsubscribe from snapshots?
1062        ];
1063
1064        self.maintain_book_updater(&cmd.instrument_id, &topics);
1065        self.maintain_book_snapshotter(&cmd.instrument_id);
1066
1067        Ok(())
1068    }
1069
1070    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1071        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1072            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1073            return Ok(());
1074        }
1075
1076        // Remove instrument from interval tracking, and drop empty intervals
1077        let mut to_remove = Vec::new();
1078        for (interval, set) in &mut self.book_intervals {
1079            if set.remove(&cmd.instrument_id) && set.is_empty() {
1080                to_remove.push(*interval);
1081            }
1082        }
1083
1084        for interval in to_remove {
1085            self.book_intervals.remove(&interval);
1086        }
1087
1088        let topics = vec![
1089            switchboard::get_book_deltas_topic(cmd.instrument_id),
1090            switchboard::get_book_depth10_topic(cmd.instrument_id),
1091            // TODO: Unsubscribe from snapshots (add interval_ms to message?)
1092        ];
1093
1094        self.maintain_book_updater(&cmd.instrument_id, &topics);
1095        self.maintain_book_snapshotter(&cmd.instrument_id);
1096
1097        Ok(())
1098    }
1099
1100    /// Unsubscribe internal bar aggregator for the given bar type.
1101    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1102        // If we have an internal aggregator for this bar type, stop and remove it
1103        let bar_type = cmd.bar_type;
1104        if self.bar_aggregators.contains_key(&bar_type.standard()) {
1105            if let Err(e) = self.stop_bar_aggregator(bar_type) {
1106                log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1107            }
1108            self.bar_aggregators.remove(&bar_type.standard());
1109            log::debug!("Removed bar aggregator for {bar_type}");
1110        }
1111        Ok(())
1112    }
1113
1114    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1115        if let Some(updater) = self.book_updaters.get(instrument_id) {
1116            let handler = ShareableMessageHandler(updater.clone());
1117
1118            // Unsubscribe handler if it is the last subscriber
1119            for topic in topics {
1120                if msgbus::subscriptions_count(topic.as_str()) == 1
1121                    && msgbus::is_subscribed(topic.as_str(), handler.clone())
1122                {
1123                    log::debug!("Unsubscribing BookUpdater from {topic}");
1124                    msgbus::unsubscribe_topic(*topic, handler.clone());
1125                }
1126            }
1127
1128            // Check remaining subscriptions, if none then remove updater
1129            let still_subscribed = topics
1130                .iter()
1131                .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1132            if !still_subscribed {
1133                self.book_updaters.remove(instrument_id);
1134                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1135            }
1136        }
1137    }
1138
1139    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1140        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1141            let topic = switchboard::get_book_snapshots_topic(
1142                *instrument_id,
1143                snapshotter.snap_info.interval_ms,
1144            );
1145
1146            // Check remaining snapshot subscriptions, if none then remove snapshotter
1147            if msgbus::subscriptions_count(topic.as_str()) == 0 {
1148                let timer_name = snapshotter.timer_name;
1149                self.book_snapshotters.remove(instrument_id);
1150                let mut clock = self.clock.borrow_mut();
1151                if clock.timer_exists(&timer_name) {
1152                    clock.cancel_timer(&timer_name);
1153                }
1154                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1155            }
1156        }
1157    }
1158
1159    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1160
1161    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1162        let mut cache = self.cache.as_ref().borrow_mut();
1163        if let Err(e) = cache.add_instrument(instrument) {
1164            log_error_on_cache_insert(&e);
1165        }
1166    }
1167
1168    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1169        // TODO: Improve by adding bulk update methods to cache and database
1170        let mut cache = self.cache.as_ref().borrow_mut();
1171        for instrument in instruments {
1172            if let Err(e) = cache.add_instrument(instrument.clone()) {
1173                log_error_on_cache_insert(&e);
1174            }
1175        }
1176    }
1177
1178    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1179        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1180            log_error_on_cache_insert(&e);
1181        }
1182    }
1183
1184    fn handle_trades(&self, trades: &[TradeTick]) {
1185        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1186            log_error_on_cache_insert(&e);
1187        }
1188    }
1189
1190    fn handle_bars(&self, bars: &[Bar]) {
1191        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1192            log_error_on_cache_insert(&e);
1193        }
1194    }
1195
1196    // -- INTERNAL --------------------------------------------------------------------------------
1197
1198    #[allow(clippy::too_many_arguments)]
1199    fn setup_book_updater(
1200        &mut self,
1201        instrument_id: &InstrumentId,
1202        book_type: BookType,
1203        only_deltas: bool,
1204        managed: bool,
1205    ) -> anyhow::Result<()> {
1206        let mut cache = self.cache.borrow_mut();
1207        if managed && !cache.has_order_book(instrument_id) {
1208            let book = OrderBook::new(*instrument_id, book_type);
1209            log::debug!("Created {book}");
1210            cache.add_order_book(book)?;
1211        }
1212
1213        // Set up subscriptions
1214        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1215        self.book_updaters.insert(*instrument_id, updater.clone());
1216
1217        let handler = ShareableMessageHandler(updater);
1218
1219        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1220        if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1221            msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1222        }
1223
1224        let topic = switchboard::get_book_depth10_topic(*instrument_id);
1225        if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1226            msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1227        }
1228
1229        Ok(())
1230    }
1231
1232    fn create_bar_aggregator(
1233        &mut self,
1234        instrument: &InstrumentAny,
1235        bar_type: BarType,
1236    ) -> Box<dyn BarAggregator> {
1237        let cache = self.cache.clone();
1238
1239        let handler = move |bar: Bar| {
1240            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1241                log_error_on_cache_insert(&e);
1242            }
1243
1244            let topic = switchboard::get_bars_topic(bar.bar_type);
1245            msgbus::publish(topic, &bar as &dyn Any);
1246        };
1247
1248        let clock = self.clock.clone();
1249        let config = self.config.clone();
1250
1251        let price_precision = instrument.price_precision();
1252        let size_precision = instrument.size_precision();
1253
1254        if bar_type.spec().is_time_aggregated() {
1255            // Get time_bars_origin_offset from config
1256            let time_bars_origin_offset = config
1257                .time_bars_origins
1258                .get(&bar_type.spec().aggregation)
1259                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1260
1261            Box::new(TimeBarAggregator::new(
1262                bar_type,
1263                price_precision,
1264                size_precision,
1265                clock,
1266                handler,
1267                config.time_bars_build_with_no_updates,
1268                config.time_bars_timestamp_on_close,
1269                config.time_bars_interval_type,
1270                time_bars_origin_offset,
1271                20,    // TODO: TBD, composite bar build delay
1272                false, // TODO: skip_first_non_full_bar, make it config dependent
1273            ))
1274        } else {
1275            match bar_type.spec().aggregation {
1276                BarAggregation::Tick => Box::new(TickBarAggregator::new(
1277                    bar_type,
1278                    price_precision,
1279                    size_precision,
1280                    handler,
1281                )) as Box<dyn BarAggregator>,
1282                BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
1283                    bar_type,
1284                    price_precision,
1285                    size_precision,
1286                    handler,
1287                )) as Box<dyn BarAggregator>,
1288                BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
1289                    bar_type,
1290                    price_precision,
1291                    size_precision,
1292                    handler,
1293                )) as Box<dyn BarAggregator>,
1294                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1295                    bar_type,
1296                    price_precision,
1297                    size_precision,
1298                    handler,
1299                )) as Box<dyn BarAggregator>,
1300                BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
1301                    bar_type,
1302                    price_precision,
1303                    size_precision,
1304                    handler,
1305                )) as Box<dyn BarAggregator>,
1306                BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
1307                    bar_type,
1308                    price_precision,
1309                    size_precision,
1310                    handler,
1311                )) as Box<dyn BarAggregator>,
1312                BarAggregation::Value => Box::new(ValueBarAggregator::new(
1313                    bar_type,
1314                    price_precision,
1315                    size_precision,
1316                    handler,
1317                )) as Box<dyn BarAggregator>,
1318                BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
1319                    bar_type,
1320                    price_precision,
1321                    size_precision,
1322                    handler,
1323                )) as Box<dyn BarAggregator>,
1324                BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
1325                    bar_type,
1326                    price_precision,
1327                    size_precision,
1328                    handler,
1329                )) as Box<dyn BarAggregator>,
1330                BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1331                    bar_type,
1332                    price_precision,
1333                    size_precision,
1334                    instrument.price_increment(),
1335                    handler,
1336                )) as Box<dyn BarAggregator>,
1337                _ => panic!(
1338                    "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
1339                    bar_type.spec().aggregation
1340                ),
1341            }
1342        }
1343    }
1344
1345    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1346        // Get the instrument for this bar type
1347        let instrument = {
1348            let cache = self.cache.borrow();
1349            cache
1350                .instrument(&bar_type.instrument_id())
1351                .ok_or_else(|| {
1352                    anyhow::anyhow!(
1353                        "Cannot start bar aggregation: no instrument found for {}",
1354                        bar_type.instrument_id(),
1355                    )
1356                })?
1357                .clone()
1358        };
1359
1360        // Use standard form of bar type as key
1361        let bar_key = bar_type.standard();
1362
1363        // Create or retrieve aggregator in Rc<RefCell>
1364        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1365            rc.clone()
1366        } else {
1367            let agg = self.create_bar_aggregator(&instrument, bar_type);
1368            let rc = Rc::new(RefCell::new(agg));
1369            self.bar_aggregators.insert(bar_key, rc.clone());
1370            rc
1371        };
1372
1373        // Subscribe to underlying data topics
1374        let mut handlers = Vec::new();
1375
1376        if bar_type.is_composite() {
1377            let topic = switchboard::get_bars_topic(bar_type.composite());
1378            let handler =
1379                ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1380
1381            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1382                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1383            }
1384
1385            handlers.push((topic, handler));
1386        } else if bar_type.spec().price_type == PriceType::Last {
1387            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1388            let handler =
1389                ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1390
1391            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1392                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1393            }
1394
1395            handlers.push((topic, handler));
1396        } else {
1397            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1398            let handler =
1399                ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1400
1401            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1402                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1403            }
1404
1405            handlers.push((topic, handler));
1406        }
1407
1408        self.bar_aggregator_handlers.insert(bar_key, handlers);
1409
1410        // Setup time bar aggregator if needed (matches Cython _setup_bar_aggregator)
1411        self.setup_bar_aggregator(bar_type, false)?;
1412
1413        aggregator.borrow_mut().set_is_running(true);
1414
1415        Ok(())
1416    }
1417
1418    /// Sets up a bar aggregator, matching Cython _setup_bar_aggregator logic.
1419    ///
1420    /// This method handles historical mode, message bus subscriptions, and time bar aggregator setup.
1421    fn setup_bar_aggregator(&mut self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
1422        let bar_key = bar_type.standard();
1423        let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
1424            anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
1425        })?;
1426
1427        // Set historical mode and handler
1428        let handler: Box<dyn FnMut(Bar)> = if historical {
1429            // Historical handler - process_historical equivalent
1430            let cache = self.cache.clone();
1431            Box::new(move |bar: Bar| {
1432                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1433                    log_error_on_cache_insert(&e);
1434                }
1435                // In historical mode, bars are processed but not published to message bus
1436            })
1437        } else {
1438            // Regular handler - process equivalent
1439            let cache = self.cache.clone();
1440            Box::new(move |bar: Bar| {
1441                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1442                    log_error_on_cache_insert(&e);
1443                }
1444                let topic = switchboard::get_bars_topic(bar.bar_type);
1445                msgbus::publish(topic, &bar as &dyn Any);
1446            })
1447        };
1448
1449        aggregator
1450            .borrow_mut()
1451            .set_historical_mode(historical, handler);
1452
1453        // For TimeBarAggregator, set clock and start timer
1454        if bar_type.spec().is_time_aggregated() {
1455            use nautilus_common::clock::TestClock;
1456
1457            if historical {
1458                // Each aggregator gets its own independent clock
1459                let test_clock = Rc::new(RefCell::new(TestClock::new()));
1460                aggregator.borrow_mut().set_clock(test_clock);
1461                // Set weak reference for historical mode (start_timer called later from preprocess_historical_events)
1462                // Store weak reference so start_timer can use it when called later
1463                let aggregator_weak = Rc::downgrade(aggregator);
1464                aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
1465            } else {
1466                aggregator.borrow_mut().set_clock(self.clock.clone());
1467                aggregator
1468                    .borrow_mut()
1469                    .start_timer(Some(aggregator.clone()));
1470            }
1471        }
1472
1473        Ok(())
1474    }
1475
1476    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1477        let aggregator = self
1478            .bar_aggregators
1479            .remove(&bar_type.standard())
1480            .ok_or_else(|| {
1481                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1482            })?;
1483
1484        aggregator.borrow_mut().stop();
1485
1486        // Unsubscribe any registered message handlers
1487        let bar_key = bar_type.standard();
1488        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1489            for (topic, handler) in subs {
1490                if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1491                    msgbus::unsubscribe_topic(topic, handler);
1492                }
1493            }
1494        }
1495
1496        Ok(())
1497    }
1498}
1499
1500#[inline(always)]
1501fn log_error_on_cache_insert<T: Display>(e: &T) {
1502    log::error!("Error on cache insert: {e}");
1503}