nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engines primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39    any::Any,
40    cell::{Ref, RefCell},
41    collections::hash_map::Entry,
42    fmt::Display,
43    num::NonZeroUsize,
44    rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
51use indexmap::IndexMap;
52use nautilus_common::{
53    cache::Cache,
54    clock::Clock,
55    logging::{RECV, RES},
56    messages::data::{
57        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
58        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
59        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
60        UnsubscribeCommand,
61    },
62    msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
63    timer::{TimeEvent, TimeEventCallback},
64};
65use nautilus_core::{
66    correctness::{
67        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
68    },
69    datetime::millis_to_nanos,
70};
71#[cfg(feature = "defi")]
72use nautilus_model::defi::DefiData;
73use nautilus_model::{
74    data::{
75        Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
76        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
77    },
78    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
79    identifiers::{ClientId, InstrumentId, Venue},
80    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
81    orderbook::OrderBook,
82};
83use nautilus_persistence::backend::catalog::ParquetDataCatalog;
84use ustr::Ustr;
85
86#[cfg(feature = "defi")]
87#[allow(unused_imports)] // Brings DeFi impl blocks into scope
88use crate::defi::engine as _;
89#[cfg(feature = "defi")]
90use crate::engine::pool::PoolUpdater;
91use crate::{
92    aggregation::{
93        BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
94        TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
95        ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
96        VolumeRunsBarAggregator,
97    },
98    client::DataClientAdapter,
99};
100
101/// Provides a high-performance `DataEngine` for all environments.
102#[derive(Debug)]
103pub struct DataEngine {
104    pub(crate) clock: Rc<RefCell<dyn Clock>>,
105    pub(crate) cache: Rc<RefCell<Cache>>,
106    pub(crate) external_clients: AHashSet<ClientId>,
107    clients: IndexMap<ClientId, DataClientAdapter>,
108    default_client: Option<DataClientAdapter>,
109    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
110    routing_map: IndexMap<Venue, ClientId>,
111    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
112    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
113    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
114    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
115    bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
116    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
117    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
118    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
119    pub(crate) msgbus_priority: u8,
120    pub(crate) config: DataEngineConfig,
121    #[cfg(feature = "defi")]
122    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
123    #[cfg(feature = "defi")]
124    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
125    #[cfg(feature = "defi")]
126    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
127    #[cfg(feature = "defi")]
128    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
129}
130
131impl DataEngine {
132    /// Creates a new [`DataEngine`] instance.
133    #[must_use]
134    pub fn new(
135        clock: Rc<RefCell<dyn Clock>>,
136        cache: Rc<RefCell<Cache>>,
137        config: Option<DataEngineConfig>,
138    ) -> Self {
139        let config = config.unwrap_or_default();
140
141        let external_clients: AHashSet<ClientId> = config
142            .external_clients
143            .clone()
144            .unwrap_or_default()
145            .into_iter()
146            .collect();
147
148        Self {
149            clock,
150            cache,
151            external_clients,
152            clients: IndexMap::new(),
153            default_client: None,
154            catalogs: AHashMap::new(),
155            routing_map: IndexMap::new(),
156            book_intervals: AHashMap::new(),
157            book_updaters: AHashMap::new(),
158            book_snapshotters: AHashMap::new(),
159            bar_aggregators: AHashMap::new(),
160            bar_aggregator_handlers: AHashMap::new(),
161            _synthetic_quote_feeds: AHashMap::new(),
162            _synthetic_trade_feeds: AHashMap::new(),
163            buffered_deltas_map: AHashMap::new(),
164            msgbus_priority: 10, // High-priority for built-in component
165            config,
166            #[cfg(feature = "defi")]
167            pool_updaters: AHashMap::new(),
168            #[cfg(feature = "defi")]
169            pool_updaters_pending: AHashSet::new(),
170            #[cfg(feature = "defi")]
171            pool_snapshot_pending: AHashSet::new(),
172            #[cfg(feature = "defi")]
173            pool_event_buffers: AHashMap::new(),
174        }
175    }
176
177    /// Returns a read-only reference to the engines clock.
178    #[must_use]
179    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
180        self.clock.borrow()
181    }
182
183    /// Returns a read-only reference to the engines cache.
184    #[must_use]
185    pub fn get_cache(&self) -> Ref<'_, Cache> {
186        self.cache.borrow()
187    }
188
189    /// Returns the `Rc<RefCell<Cache>>` used by this engine.
190    #[must_use]
191    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
192        Rc::clone(&self.cache)
193    }
194
195    /// Registers the `catalog` with the engine with an optional specific `name`.
196    ///
197    /// # Panics
198    ///
199    /// Panics if a catalog with the same `name` has already been registered.
200    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
201        let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
202
203        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
204
205        self.catalogs.insert(name, catalog);
206        log::info!("Registered catalog <{name}>");
207    }
208
209    /// Registers the `client` with the engine with an optional venue `routing`.
210    ///
211    ///
212    /// # Panics
213    ///
214    /// Panics if a client with the same client ID has already been registered.
215    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
216        let client_id = client.client_id();
217
218        if let Some(default_client) = &self.default_client {
219            check_predicate_false(
220                default_client.client_id() == client.client_id(),
221                "client_id already registered as default client",
222            )
223            .expect(FAILED);
224        }
225
226        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
227
228        if let Some(routing) = routing {
229            self.routing_map.insert(routing, client_id);
230            log::info!("Set client {client_id} routing for {routing}");
231        }
232
233        if client.venue.is_none() && self.default_client.is_none() {
234            self.default_client = Some(client);
235            log::info!("Registered client {client_id} for default routing");
236        } else {
237            self.clients.insert(client_id, client);
238            log::info!("Registered client {client_id}");
239        }
240    }
241
242    /// Deregisters the client for the `client_id`.
243    ///
244    /// # Panics
245    ///
246    /// Panics if the client ID has not been registered.
247    pub fn deregister_client(&mut self, client_id: &ClientId) {
248        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
249
250        self.clients.shift_remove(client_id);
251        log::info!("Deregistered client {client_id}");
252    }
253
254    /// Registers the data `client` with the engine as the default routing client.
255    ///
256    /// When a specific venue routing cannot be found, this client will receive messages.
257    ///
258    /// # Warnings
259    ///
260    /// Any existing default routing client will be overwritten.
261    ///
262    /// # Panics
263    ///
264    /// Panics if a default client has already been registered.
265    pub fn register_default_client(&mut self, client: DataClientAdapter) {
266        check_predicate_true(
267            self.default_client.is_none(),
268            "default client already registered",
269        )
270        .expect(FAILED);
271
272        let client_id = client.client_id();
273
274        self.default_client = Some(client);
275        log::info!("Registered default client {client_id}");
276    }
277
278    /// Starts all registered data clients.
279    pub fn start(&mut self) {
280        for client in self.get_clients_mut() {
281            if let Err(e) = client.start() {
282                log::error!("{e}");
283            }
284        }
285    }
286
287    /// Stops all registered data clients.
288    pub fn stop(&mut self) {
289        for client in self.get_clients_mut() {
290            if let Err(e) = client.stop() {
291                log::error!("{e}");
292            }
293        }
294    }
295
296    /// Resets all registered data clients to their initial state.
297    pub fn reset(&mut self) {
298        for client in self.get_clients_mut() {
299            if let Err(e) = client.reset() {
300                log::error!("{e}");
301            }
302        }
303    }
304
305    /// Disposes the engine, stopping all clients and canceling any timers.
306    pub fn dispose(&mut self) {
307        for client in self.get_clients_mut() {
308            if let Err(e) = client.dispose() {
309                log::error!("{e}");
310            }
311        }
312
313        self.clock.borrow_mut().cancel_timers();
314    }
315
316    /// Returns `true` if all registered data clients are currently connected.
317    #[must_use]
318    pub fn check_connected(&self) -> bool {
319        self.get_clients()
320            .iter()
321            .all(|client| client.is_connected())
322    }
323
324    /// Returns `true` if all registered data clients are currently disconnected.
325    #[must_use]
326    pub fn check_disconnected(&self) -> bool {
327        self.get_clients()
328            .iter()
329            .all(|client| !client.is_connected())
330    }
331
332    /// Returns a list of all registered client IDs, including the default client if set.
333    #[must_use]
334    pub fn registered_clients(&self) -> Vec<ClientId> {
335        self.get_clients()
336            .into_iter()
337            .map(|client| client.client_id())
338            .collect()
339    }
340
341    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
342
343    pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
344    where
345        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
346        T: Clone,
347    {
348        self.get_clients()
349            .into_iter()
350            .flat_map(get_subs)
351            .cloned()
352            .collect()
353    }
354
355    #[must_use]
356    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
357        let (default_opt, clients_map) = (&self.default_client, &self.clients);
358        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
359
360        if let Some(default) = default_opt {
361            clients.push(default);
362        }
363
364        clients
365    }
366
367    #[must_use]
368    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
369        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
370        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
371
372        if let Some(default) = default_opt {
373            clients.push(default);
374        }
375
376        clients
377    }
378
379    pub fn get_client(
380        &mut self,
381        client_id: Option<&ClientId>,
382        venue: Option<&Venue>,
383    ) -> Option<&mut DataClientAdapter> {
384        if let Some(client_id) = client_id {
385            // Explicit ID: first look in registered clients
386            if let Some(client) = self.clients.get_mut(client_id) {
387                return Some(client);
388            }
389
390            // Then check if it matches the default client
391            if let Some(default) = self.default_client.as_mut()
392                && default.client_id() == *client_id
393            {
394                return Some(default);
395            }
396
397            // Unknown explicit client
398            return None;
399        }
400
401        if let Some(v) = venue {
402            // Route by venue if mapped client still registered
403            if let Some(client_id) = self.routing_map.get(v) {
404                return self.clients.get_mut(client_id);
405            }
406        }
407
408        // Fallback to default client
409        self.get_default_client()
410    }
411
412    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
413        self.default_client.as_mut()
414    }
415
416    /// Returns all custom data types currently subscribed across all clients.
417    #[must_use]
418    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
419        self.collect_subscriptions(|client| &client.subscriptions_custom)
420    }
421
422    /// Returns all instrument IDs currently subscribed across all clients.
423    #[must_use]
424    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
425        self.collect_subscriptions(|client| &client.subscriptions_instrument)
426    }
427
428    /// Returns all instrument IDs for which book delta subscriptions exist.
429    #[must_use]
430    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
431        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
432    }
433
434    /// Returns all instrument IDs for which book snapshot subscriptions exist.
435    #[must_use]
436    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
437        self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
438    }
439
440    /// Returns all instrument IDs for which quote subscriptions exist.
441    #[must_use]
442    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
443        self.collect_subscriptions(|client| &client.subscriptions_quotes)
444    }
445
446    /// Returns all instrument IDs for which trade subscriptions exist.
447    #[must_use]
448    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
449        self.collect_subscriptions(|client| &client.subscriptions_trades)
450    }
451
452    /// Returns all bar types currently subscribed across all clients.
453    #[must_use]
454    pub fn subscribed_bars(&self) -> Vec<BarType> {
455        self.collect_subscriptions(|client| &client.subscriptions_bars)
456    }
457
458    /// Returns all instrument IDs for which mark price subscriptions exist.
459    #[must_use]
460    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
461        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
462    }
463
464    /// Returns all instrument IDs for which index price subscriptions exist.
465    #[must_use]
466    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
467        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
468    }
469
470    /// Returns all instrument IDs for which funding rate subscriptions exist.
471    #[must_use]
472    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
473        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
474    }
475
476    /// Returns all instrument IDs for which status subscriptions exist.
477    #[must_use]
478    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
479        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
480    }
481
482    /// Returns all instrument IDs for which instrument close subscriptions exist.
483    #[must_use]
484    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
485        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
486    }
487
488    // -- COMMANDS --------------------------------------------------------------------------------
489
490    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
491    ///
492    /// Errors during execution are logged.
493    pub fn execute(&mut self, cmd: &DataCommand) {
494        if let Err(e) = match cmd {
495            DataCommand::Subscribe(c) => self.execute_subscribe(c),
496            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
497            DataCommand::Request(c) => self.execute_request(c),
498            #[cfg(feature = "defi")]
499            DataCommand::DefiRequest(c) => self.execute_defi_request(c),
500            #[cfg(feature = "defi")]
501            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
502            #[cfg(feature = "defi")]
503            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
504            _ => {
505                log::warn!("Unhandled DataCommand variant: {cmd:?}");
506                Ok(())
507            }
508        } {
509            log::error!("{e}");
510        }
511    }
512
513    /// Handles a subscribe command, updating internal state and forwarding to the client.
514    ///
515    /// # Errors
516    ///
517    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
518    /// or if the underlying client operation fails.
519    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
520        // Update internal engine state
521        match &cmd {
522            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
523            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
524            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
525            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
526            _ => {} // Do nothing else
527        }
528
529        if let Some(client_id) = cmd.client_id()
530            && self.external_clients.contains(client_id)
531        {
532            if self.config.debug {
533                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
534            }
535            return Ok(());
536        }
537
538        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
539            client.execute_subscribe(cmd);
540        } else {
541            log::error!(
542                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
543                cmd.client_id(),
544                cmd.venue(),
545            );
546        }
547
548        Ok(())
549    }
550
551    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
552    ///
553    /// # Errors
554    ///
555    /// Returns an error if the underlying client operation fails.
556    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
557        match &cmd {
558            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
559            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
560            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
561            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
562            _ => {} // Do nothing else
563        }
564
565        if let Some(client_id) = cmd.client_id()
566            && self.external_clients.contains(client_id)
567        {
568            if self.config.debug {
569                log::debug!(
570                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
571                );
572            }
573            return Ok(());
574        }
575
576        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
577            client.execute_unsubscribe(cmd);
578        } else {
579            log::error!(
580                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
581                cmd.client_id(),
582                cmd.venue(),
583            );
584        }
585
586        Ok(())
587    }
588
589    /// Sends a [`RequestCommand`] to a suitable data client implementation.
590    ///
591    /// # Errors
592    ///
593    /// Returns an error if no client is found for the given client ID or venue,
594    /// or if the client fails to process the request.
595    pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
596        // Skip requests for external clients
597        if let Some(cid) = req.client_id()
598            && self.external_clients.contains(cid)
599        {
600            if self.config.debug {
601                log::debug!("Skipping data request for external client {cid}: {req:?}");
602            }
603            return Ok(());
604        }
605        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
606            match req {
607                RequestCommand::Data(req) => client.request_data(req),
608                RequestCommand::Instrument(req) => client.request_instrument(req),
609                RequestCommand::Instruments(req) => client.request_instruments(req),
610                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
611                RequestCommand::BookDepth(req) => client.request_book_depth(req),
612                RequestCommand::Quotes(req) => client.request_quotes(req),
613                RequestCommand::Trades(req) => client.request_trades(req),
614                RequestCommand::Bars(req) => client.request_bars(req),
615            }
616        } else {
617            anyhow::bail!(
618                "Cannot handle request: no client found for {:?} {:?}",
619                req.client_id(),
620                req.venue()
621            );
622        }
623    }
624
625    /// Processes a dynamically-typed data message.
626    ///
627    /// Currently supports `InstrumentAny` and `FundingRateUpdate`; unrecognized types are logged as errors.
628    pub fn process(&mut self, data: &dyn Any) {
629        // TODO: Eventually these could be added to the `Data` enum? process here for now
630        if let Some(data) = data.downcast_ref::<Data>() {
631            self.process_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
632            return;
633        }
634
635        #[cfg(feature = "defi")]
636        if let Some(data) = data.downcast_ref::<DefiData>() {
637            self.process_defi_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
638            return;
639        }
640
641        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
642            self.handle_instrument(instrument.clone());
643        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
644            self.handle_funding_rate(*funding_rate);
645        } else {
646            log::error!("Cannot process data {data:?}, type is unrecognized");
647        }
648    }
649
650    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
651    pub fn process_data(&mut self, data: Data) {
652        match data {
653            Data::Delta(delta) => self.handle_delta(delta),
654            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
655            Data::Depth10(depth) => self.handle_depth10(*depth),
656            Data::Quote(quote) => self.handle_quote(quote),
657            Data::Trade(trade) => self.handle_trade(trade),
658            Data::Bar(bar) => self.handle_bar(bar),
659            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
660            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
661            Data::InstrumentClose(close) => self.handle_instrument_close(close),
662        }
663    }
664
665    /// Processes a `DataResponse`, handling and publishing the response message.
666    pub fn response(&self, resp: DataResponse) {
667        log::debug!("{RECV}{RES} {resp:?}");
668
669        match &resp {
670            DataResponse::Instrument(resp) => {
671                self.handle_instrument_response(resp.data.clone());
672            }
673            DataResponse::Instruments(resp) => {
674                self.handle_instruments(&resp.data);
675            }
676            DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
677            DataResponse::Trades(resp) => self.handle_trades(&resp.data),
678            DataResponse::Bars(resp) => self.handle_bars(&resp.data),
679            _ => todo!(),
680        }
681
682        msgbus::send_response(resp.correlation_id(), &resp);
683    }
684
685    // -- DATA HANDLERS ---------------------------------------------------------------------------
686
687    fn handle_instrument(&mut self, instrument: InstrumentAny) {
688        if let Err(e) = self
689            .cache
690            .as_ref()
691            .borrow_mut()
692            .add_instrument(instrument.clone())
693        {
694            log_error_on_cache_insert(&e);
695        }
696
697        let topic = switchboard::get_instrument_topic(instrument.id());
698        msgbus::publish(topic, &instrument as &dyn Any);
699    }
700
701    fn handle_delta(&mut self, delta: OrderBookDelta) {
702        let deltas = if self.config.buffer_deltas {
703            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
704                buffered_deltas.deltas.push(delta);
705                buffered_deltas.flags = delta.flags;
706                buffered_deltas.sequence = delta.sequence;
707                buffered_deltas.ts_event = delta.ts_event;
708                buffered_deltas.ts_init = delta.ts_init;
709            } else {
710                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
711                self.buffered_deltas_map
712                    .insert(delta.instrument_id, buffered_deltas);
713            }
714
715            if !RecordFlag::F_LAST.matches(delta.flags) {
716                return; // Not the last delta for event
717            }
718
719            // SAFETY: We know the deltas exists already
720            self.buffered_deltas_map
721                .remove(&delta.instrument_id)
722                .unwrap()
723        } else {
724            OrderBookDeltas::new(delta.instrument_id, vec![delta])
725        };
726
727        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
728        msgbus::publish(topic, &deltas as &dyn Any);
729    }
730
731    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
732        let deltas = if self.config.buffer_deltas {
733            let mut is_last_delta = false;
734            for delta in &deltas.deltas {
735                if RecordFlag::F_LAST.matches(delta.flags) {
736                    is_last_delta = true;
737                    break;
738                }
739            }
740
741            let instrument_id = deltas.instrument_id;
742
743            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
744                buffered_deltas.deltas.extend(deltas.deltas);
745
746                if let Some(last_delta) = buffered_deltas.deltas.last() {
747                    buffered_deltas.flags = last_delta.flags;
748                    buffered_deltas.sequence = last_delta.sequence;
749                    buffered_deltas.ts_event = last_delta.ts_event;
750                    buffered_deltas.ts_init = last_delta.ts_init;
751                }
752            } else {
753                self.buffered_deltas_map.insert(instrument_id, deltas);
754            }
755
756            if !is_last_delta {
757                return;
758            }
759
760            // SAFETY: We know the deltas exists already
761            self.buffered_deltas_map.remove(&instrument_id).unwrap()
762        } else {
763            deltas
764        };
765
766        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
767        msgbus::publish(topic, &deltas as &dyn Any);
768    }
769
770    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
771        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
772        msgbus::publish(topic, &depth as &dyn Any);
773    }
774
775    fn handle_quote(&mut self, quote: QuoteTick) {
776        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
777            log_error_on_cache_insert(&e);
778        }
779
780        // TODO: Handle synthetics
781
782        let topic = switchboard::get_quotes_topic(quote.instrument_id);
783        msgbus::publish(topic, &quote as &dyn Any);
784    }
785
786    fn handle_trade(&mut self, trade: TradeTick) {
787        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
788            log_error_on_cache_insert(&e);
789        }
790
791        // TODO: Handle synthetics
792
793        let topic = switchboard::get_trades_topic(trade.instrument_id);
794        msgbus::publish(topic, &trade as &dyn Any);
795    }
796
797    fn handle_bar(&mut self, bar: Bar) {
798        // TODO: Handle additional bar logic
799        if self.config.validate_data_sequence
800            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
801        {
802            if bar.ts_event < last_bar.ts_event {
803                log::warn!(
804                    "Bar {bar} was prior to last bar `ts_event` {}",
805                    last_bar.ts_event
806                );
807                return; // Bar is out of sequence
808            }
809            if bar.ts_init < last_bar.ts_init {
810                log::warn!(
811                    "Bar {bar} was prior to last bar `ts_init` {}",
812                    last_bar.ts_init
813                );
814                return; // Bar is out of sequence
815            }
816            // TODO: Implement `bar.is_revision` logic
817        }
818
819        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
820            log_error_on_cache_insert(&e);
821        }
822
823        let topic = switchboard::get_bars_topic(bar.bar_type);
824        msgbus::publish(topic, &bar as &dyn Any);
825    }
826
827    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
828        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
829            log_error_on_cache_insert(&e);
830        }
831
832        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
833        msgbus::publish(topic, &mark_price as &dyn Any);
834    }
835
836    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
837        if let Err(e) = self
838            .cache
839            .as_ref()
840            .borrow_mut()
841            .add_index_price(index_price)
842        {
843            log_error_on_cache_insert(&e);
844        }
845
846        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
847        msgbus::publish(topic, &index_price as &dyn Any);
848    }
849
850    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
851    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
852        if let Err(e) = self
853            .cache
854            .as_ref()
855            .borrow_mut()
856            .add_funding_rate(funding_rate)
857        {
858            log_error_on_cache_insert(&e);
859        }
860
861        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
862        msgbus::publish(topic, &funding_rate as &dyn Any);
863    }
864
865    fn handle_instrument_close(&mut self, close: InstrumentClose) {
866        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
867        msgbus::publish(topic, &close as &dyn Any);
868    }
869
870    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
871
872    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
873        if cmd.instrument_id.is_synthetic() {
874            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
875        }
876
877        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
878
879        Ok(())
880    }
881
882    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
883        if cmd.instrument_id.is_synthetic() {
884            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
885        }
886
887        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
888
889        Ok(())
890    }
891
892    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
893        if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
894            return Ok(());
895        }
896
897        if cmd.instrument_id.is_synthetic() {
898            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
899        }
900
901        // Track snapshot intervals per instrument, and set up timer on first subscription
902        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
903            Entry::Vacant(e) => {
904                let mut set = AHashSet::new();
905                set.insert(cmd.instrument_id);
906                e.insert(set);
907                true
908            }
909            Entry::Occupied(mut e) => {
910                e.get_mut().insert(cmd.instrument_id);
911                false
912            }
913        };
914
915        if first_for_interval {
916            // Initialize snapshotter and schedule its timer
917            let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
918            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
919
920            let snap_info = BookSnapshotInfo {
921                instrument_id: cmd.instrument_id,
922                venue: cmd.instrument_id.venue,
923                is_composite: cmd.instrument_id.symbol.is_composite(),
924                root: Ustr::from(cmd.instrument_id.symbol.root()),
925                topic,
926                interval_ms: cmd.interval_ms,
927            };
928
929            // Schedule the first snapshot at the next interval boundary
930            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
931            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
932
933            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
934            self.book_snapshotters
935                .insert(cmd.instrument_id, snapshotter.clone());
936            let timer_name = snapshotter.timer_name;
937
938            let callback_fn: Rc<dyn Fn(TimeEvent)> =
939                Rc::new(move |event| snapshotter.snapshot(event));
940            let callback = TimeEventCallback::from(callback_fn);
941
942            self.clock
943                .borrow_mut()
944                .set_timer_ns(
945                    &timer_name,
946                    interval_ns,
947                    Some(start_time_ns.into()),
948                    None,
949                    Some(callback),
950                    None,
951                    None,
952                )
953                .expect(FAILED);
954        }
955
956        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
957
958        Ok(())
959    }
960
961    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
962        match cmd.bar_type.aggregation_source() {
963            AggregationSource::Internal => {
964                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
965                    self.start_bar_aggregator(cmd.bar_type)?;
966                }
967            }
968            AggregationSource::External => {
969                if cmd.bar_type.instrument_id().is_synthetic() {
970                    anyhow::bail!(
971                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
972                    );
973                }
974            }
975        }
976
977        Ok(())
978    }
979
980    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
981        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
982            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
983            return Ok(());
984        }
985
986        let topics = vec![
987            switchboard::get_book_deltas_topic(cmd.instrument_id),
988            switchboard::get_book_depth10_topic(cmd.instrument_id),
989            // TODO: Unsubscribe from snapshots?
990        ];
991
992        self.maintain_book_updater(&cmd.instrument_id, &topics);
993        self.maintain_book_snapshotter(&cmd.instrument_id);
994
995        Ok(())
996    }
997
998    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
999        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1000            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1001            return Ok(());
1002        }
1003
1004        let topics = vec![
1005            switchboard::get_book_deltas_topic(cmd.instrument_id),
1006            switchboard::get_book_depth10_topic(cmd.instrument_id),
1007            // TODO: Unsubscribe from snapshots?
1008        ];
1009
1010        self.maintain_book_updater(&cmd.instrument_id, &topics);
1011        self.maintain_book_snapshotter(&cmd.instrument_id);
1012
1013        Ok(())
1014    }
1015
1016    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1017        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1018            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1019            return Ok(());
1020        }
1021
1022        // Remove instrument from interval tracking, and drop empty intervals
1023        let mut to_remove = Vec::new();
1024        for (interval, set) in &mut self.book_intervals {
1025            if set.remove(&cmd.instrument_id) && set.is_empty() {
1026                to_remove.push(*interval);
1027            }
1028        }
1029
1030        for interval in to_remove {
1031            self.book_intervals.remove(&interval);
1032        }
1033
1034        let topics = vec![
1035            switchboard::get_book_deltas_topic(cmd.instrument_id),
1036            switchboard::get_book_depth10_topic(cmd.instrument_id),
1037            // TODO: Unsubscribe from snapshots (add interval_ms to message?)
1038        ];
1039
1040        self.maintain_book_updater(&cmd.instrument_id, &topics);
1041        self.maintain_book_snapshotter(&cmd.instrument_id);
1042
1043        Ok(())
1044    }
1045
1046    /// Unsubscribe internal bar aggregator for the given bar type.
1047    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1048        // If we have an internal aggregator for this bar type, stop and remove it
1049        let bar_type = cmd.bar_type;
1050        if self.bar_aggregators.contains_key(&bar_type.standard()) {
1051            if let Err(e) = self.stop_bar_aggregator(bar_type) {
1052                log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1053            }
1054            self.bar_aggregators.remove(&bar_type.standard());
1055            log::debug!("Removed bar aggregator for {bar_type}");
1056        }
1057        Ok(())
1058    }
1059
1060    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1061        if let Some(updater) = self.book_updaters.get(instrument_id) {
1062            let handler = ShareableMessageHandler(updater.clone());
1063
1064            // Unsubscribe handler if it is the last subscriber
1065            for topic in topics {
1066                if msgbus::subscriptions_count(topic.as_str()) == 1
1067                    && msgbus::is_subscribed(topic.as_str(), handler.clone())
1068                {
1069                    log::debug!("Unsubscribing BookUpdater from {topic}");
1070                    msgbus::unsubscribe_topic(*topic, handler.clone());
1071                }
1072            }
1073
1074            // Check remaining subscriptions, if none then remove updater
1075            let still_subscribed = topics
1076                .iter()
1077                .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1078            if !still_subscribed {
1079                self.book_updaters.remove(instrument_id);
1080                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1081            }
1082        }
1083    }
1084
1085    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1086        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1087            let topic = switchboard::get_book_snapshots_topic(
1088                *instrument_id,
1089                snapshotter.snap_info.interval_ms,
1090            );
1091
1092            // Check remaining snapshot subscriptions, if none then remove snapshotter
1093            if msgbus::subscriptions_count(topic.as_str()) == 0 {
1094                let timer_name = snapshotter.timer_name;
1095                self.book_snapshotters.remove(instrument_id);
1096                let mut clock = self.clock.borrow_mut();
1097                if clock.timer_exists(&timer_name) {
1098                    clock.cancel_timer(&timer_name);
1099                }
1100                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1101            }
1102        }
1103    }
1104
1105    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1106
1107    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1108        let mut cache = self.cache.as_ref().borrow_mut();
1109        if let Err(e) = cache.add_instrument(instrument) {
1110            log_error_on_cache_insert(&e);
1111        }
1112    }
1113
1114    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1115        // TODO: Improve by adding bulk update methods to cache and database
1116        let mut cache = self.cache.as_ref().borrow_mut();
1117        for instrument in instruments {
1118            if let Err(e) = cache.add_instrument(instrument.clone()) {
1119                log_error_on_cache_insert(&e);
1120            }
1121        }
1122    }
1123
1124    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1125        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1126            log_error_on_cache_insert(&e);
1127        }
1128    }
1129
1130    fn handle_trades(&self, trades: &[TradeTick]) {
1131        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1132            log_error_on_cache_insert(&e);
1133        }
1134    }
1135
1136    fn handle_bars(&self, bars: &[Bar]) {
1137        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1138            log_error_on_cache_insert(&e);
1139        }
1140    }
1141
1142    // -- INTERNAL --------------------------------------------------------------------------------
1143
1144    #[allow(clippy::too_many_arguments)]
1145    fn setup_book_updater(
1146        &mut self,
1147        instrument_id: &InstrumentId,
1148        book_type: BookType,
1149        only_deltas: bool,
1150        managed: bool,
1151    ) -> anyhow::Result<()> {
1152        let mut cache = self.cache.borrow_mut();
1153        if managed && !cache.has_order_book(instrument_id) {
1154            let book = OrderBook::new(*instrument_id, book_type);
1155            log::debug!("Created {book}");
1156            cache.add_order_book(book)?;
1157        }
1158
1159        // Set up subscriptions
1160        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1161        self.book_updaters.insert(*instrument_id, updater.clone());
1162
1163        let handler = ShareableMessageHandler(updater);
1164
1165        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1166        if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1167            msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1168        }
1169
1170        let topic = switchboard::get_book_depth10_topic(*instrument_id);
1171        if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1172            msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1173        }
1174
1175        Ok(())
1176    }
1177
1178    fn create_bar_aggregator(
1179        &mut self,
1180        instrument: &InstrumentAny,
1181        bar_type: BarType,
1182    ) -> Box<dyn BarAggregator> {
1183        let cache = self.cache.clone();
1184
1185        let handler = move |bar: Bar| {
1186            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1187                log_error_on_cache_insert(&e);
1188            }
1189
1190            let topic = switchboard::get_bars_topic(bar.bar_type);
1191            msgbus::publish(topic, &bar as &dyn Any);
1192        };
1193
1194        let clock = self.clock.clone();
1195        let config = self.config.clone();
1196
1197        let price_precision = instrument.price_precision();
1198        let size_precision = instrument.size_precision();
1199
1200        if bar_type.spec().is_time_aggregated() {
1201            // Get time_bars_origin_offset from config
1202            let time_bars_origin_offset = config
1203                .time_bars_origins
1204                .get(&bar_type.spec().aggregation)
1205                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1206
1207            Box::new(TimeBarAggregator::new(
1208                bar_type,
1209                price_precision,
1210                size_precision,
1211                clock,
1212                handler,
1213                config.time_bars_build_with_no_updates,
1214                config.time_bars_timestamp_on_close,
1215                config.time_bars_interval_type,
1216                time_bars_origin_offset,
1217                20,    // TODO: TBD, composite bar build delay
1218                false, // TODO: skip_first_non_full_bar, make it config dependent
1219            ))
1220        } else {
1221            match bar_type.spec().aggregation {
1222                BarAggregation::Tick => Box::new(TickBarAggregator::new(
1223                    bar_type,
1224                    price_precision,
1225                    size_precision,
1226                    handler,
1227                )) as Box<dyn BarAggregator>,
1228                BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
1229                    bar_type,
1230                    price_precision,
1231                    size_precision,
1232                    handler,
1233                )) as Box<dyn BarAggregator>,
1234                BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
1235                    bar_type,
1236                    price_precision,
1237                    size_precision,
1238                    handler,
1239                )) as Box<dyn BarAggregator>,
1240                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1241                    bar_type,
1242                    price_precision,
1243                    size_precision,
1244                    handler,
1245                )) as Box<dyn BarAggregator>,
1246                BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
1247                    bar_type,
1248                    price_precision,
1249                    size_precision,
1250                    handler,
1251                )) as Box<dyn BarAggregator>,
1252                BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
1253                    bar_type,
1254                    price_precision,
1255                    size_precision,
1256                    handler,
1257                )) as Box<dyn BarAggregator>,
1258                BarAggregation::Value => Box::new(ValueBarAggregator::new(
1259                    bar_type,
1260                    price_precision,
1261                    size_precision,
1262                    handler,
1263                )) as Box<dyn BarAggregator>,
1264                BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
1265                    bar_type,
1266                    price_precision,
1267                    size_precision,
1268                    handler,
1269                )) as Box<dyn BarAggregator>,
1270                BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
1271                    bar_type,
1272                    price_precision,
1273                    size_precision,
1274                    handler,
1275                )) as Box<dyn BarAggregator>,
1276                BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1277                    bar_type,
1278                    price_precision,
1279                    size_precision,
1280                    instrument.price_increment(),
1281                    handler,
1282                )) as Box<dyn BarAggregator>,
1283                _ => panic!(
1284                    "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
1285                    bar_type.spec().aggregation
1286                ),
1287            }
1288        }
1289    }
1290
1291    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1292        // Get the instrument for this bar type
1293        let instrument = {
1294            let cache = self.cache.borrow();
1295            cache
1296                .instrument(&bar_type.instrument_id())
1297                .ok_or_else(|| {
1298                    anyhow::anyhow!(
1299                        "Cannot start bar aggregation: no instrument found for {}",
1300                        bar_type.instrument_id(),
1301                    )
1302                })?
1303                .clone()
1304        };
1305
1306        // Use standard form of bar type as key
1307        let bar_key = bar_type.standard();
1308
1309        // Create or retrieve aggregator in Rc<RefCell>
1310        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1311            rc.clone()
1312        } else {
1313            let agg = self.create_bar_aggregator(&instrument, bar_type);
1314            let rc = Rc::new(RefCell::new(agg));
1315            self.bar_aggregators.insert(bar_key, rc.clone());
1316            rc
1317        };
1318
1319        // Subscribe to underlying data topics
1320        let mut handlers = Vec::new();
1321
1322        if bar_type.is_composite() {
1323            let topic = switchboard::get_bars_topic(bar_type.composite());
1324            let handler =
1325                ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1326
1327            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1328                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1329            }
1330
1331            handlers.push((topic, handler));
1332        } else if bar_type.spec().price_type == PriceType::Last {
1333            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1334            let handler =
1335                ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1336
1337            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1338                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1339            }
1340
1341            handlers.push((topic, handler));
1342        } else {
1343            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1344            let handler =
1345                ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1346
1347            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1348                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1349            }
1350
1351            handlers.push((topic, handler));
1352        }
1353
1354        self.bar_aggregator_handlers.insert(bar_key, handlers);
1355
1356        // Setup time bar aggregator if needed (matches Cython _setup_bar_aggregator)
1357        self.setup_bar_aggregator(bar_type, false)?;
1358
1359        aggregator.borrow_mut().set_is_running(true);
1360
1361        Ok(())
1362    }
1363
1364    /// Sets up a bar aggregator, matching Cython _setup_bar_aggregator logic.
1365    ///
1366    /// This method handles historical mode, message bus subscriptions, and time bar aggregator setup.
1367    fn setup_bar_aggregator(&mut self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
1368        let bar_key = bar_type.standard();
1369        let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
1370            anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
1371        })?;
1372
1373        // Set historical mode and handler
1374        let handler: Box<dyn FnMut(Bar)> = if historical {
1375            // Historical handler - process_historical equivalent
1376            let cache = self.cache.clone();
1377            Box::new(move |bar: Bar| {
1378                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1379                    log_error_on_cache_insert(&e);
1380                }
1381                // In historical mode, bars are processed but not published to message bus
1382            })
1383        } else {
1384            // Regular handler - process equivalent
1385            let cache = self.cache.clone();
1386            Box::new(move |bar: Bar| {
1387                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1388                    log_error_on_cache_insert(&e);
1389                }
1390                let topic = switchboard::get_bars_topic(bar.bar_type);
1391                msgbus::publish(topic, &bar as &dyn Any);
1392            })
1393        };
1394
1395        aggregator
1396            .borrow_mut()
1397            .set_historical_mode(historical, handler);
1398
1399        // For TimeBarAggregator, set clock and start timer
1400        if bar_type.spec().is_time_aggregated() {
1401            use nautilus_common::clock::TestClock;
1402
1403            if historical {
1404                // Each aggregator gets its own independent clock
1405                let test_clock = Rc::new(RefCell::new(TestClock::new()));
1406                aggregator.borrow_mut().set_clock(test_clock);
1407                // Set weak reference for historical mode (start_timer called later from preprocess_historical_events)
1408                // Store weak reference so start_timer can use it when called later
1409                let aggregator_weak = Rc::downgrade(aggregator);
1410                aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
1411            } else {
1412                aggregator.borrow_mut().set_clock(self.clock.clone());
1413                aggregator
1414                    .borrow_mut()
1415                    .start_timer(Some(aggregator.clone()));
1416            }
1417        }
1418
1419        Ok(())
1420    }
1421
1422    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1423        let aggregator = self
1424            .bar_aggregators
1425            .remove(&bar_type.standard())
1426            .ok_or_else(|| {
1427                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1428            })?;
1429
1430        aggregator.borrow_mut().stop();
1431
1432        // Unsubscribe any registered message handlers
1433        let bar_key = bar_type.standard();
1434        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1435            for (topic, handler) in subs {
1436                if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1437                    msgbus::unsubscribe_topic(topic, handler);
1438                }
1439            }
1440        }
1441
1442        Ok(())
1443    }
1444}
1445
1446#[inline(always)]
1447fn log_error_on_cache_insert<T: Display>(e: &T) {
1448    log::error!("Error on cache insert: {e}");
1449}