nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engines primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39    any::Any,
40    cell::{Ref, RefCell},
41    collections::hash_map::Entry,
42    fmt::Display,
43    num::NonZeroUsize,
44    rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use futures::future::join_all;
51use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
52use indexmap::IndexMap;
53use nautilus_common::{
54    cache::Cache,
55    clock::Clock,
56    logging::{RECV, RES},
57    messages::data::{
58        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61        UnsubscribeCommand,
62    },
63    msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
64    timer::{TimeEvent, TimeEventCallback},
65};
66use nautilus_core::{
67    UUID4,
68    correctness::{
69        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
70    },
71    datetime::millis_to_nanos_unchecked,
72};
73#[cfg(feature = "defi")]
74use nautilus_model::defi::DefiData;
75use nautilus_model::{
76    data::{
77        Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
78        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
79    },
80    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
81    identifiers::{ClientId, InstrumentId, Venue},
82    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
83    orderbook::OrderBook,
84};
85#[cfg(feature = "streaming")]
86use nautilus_persistence::backend::catalog::ParquetDataCatalog;
87use ustr::Ustr;
88
89#[cfg(feature = "defi")]
90#[allow(unused_imports)] // Brings DeFi impl blocks into scope
91use crate::defi::engine as _;
92#[cfg(feature = "defi")]
93use crate::engine::pool::PoolUpdater;
94use crate::{
95    aggregation::{
96        BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
97        TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
98        ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
99        VolumeRunsBarAggregator,
100    },
101    client::DataClientAdapter,
102};
103
104/// Provides a high-performance `DataEngine` for all environments.
105#[derive(Debug)]
106pub struct DataEngine {
107    pub(crate) clock: Rc<RefCell<dyn Clock>>,
108    pub(crate) cache: Rc<RefCell<Cache>>,
109    pub(crate) external_clients: AHashSet<ClientId>,
110    clients: IndexMap<ClientId, DataClientAdapter>,
111    default_client: Option<DataClientAdapter>,
112    #[cfg(feature = "streaming")]
113    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
114    routing_map: IndexMap<Venue, ClientId>,
115    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
116    book_deltas_subs: AHashSet<InstrumentId>,
117    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
118    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
119    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
120    bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
121    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
122    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
123    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
124    pub(crate) msgbus_priority: u8,
125    pub(crate) config: DataEngineConfig,
126    #[cfg(feature = "defi")]
127    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
128    #[cfg(feature = "defi")]
129    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
130    #[cfg(feature = "defi")]
131    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
132    #[cfg(feature = "defi")]
133    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
134}
135
136impl DataEngine {
137    /// Creates a new [`DataEngine`] instance.
138    #[must_use]
139    pub fn new(
140        clock: Rc<RefCell<dyn Clock>>,
141        cache: Rc<RefCell<Cache>>,
142        config: Option<DataEngineConfig>,
143    ) -> Self {
144        let config = config.unwrap_or_default();
145
146        let external_clients: AHashSet<ClientId> = config
147            .external_clients
148            .clone()
149            .unwrap_or_default()
150            .into_iter()
151            .collect();
152
153        Self {
154            clock,
155            cache,
156            external_clients,
157            clients: IndexMap::new(),
158            default_client: None,
159            #[cfg(feature = "streaming")]
160            catalogs: AHashMap::new(),
161            routing_map: IndexMap::new(),
162            book_intervals: AHashMap::new(),
163            book_deltas_subs: AHashSet::new(),
164            book_updaters: AHashMap::new(),
165            book_snapshotters: AHashMap::new(),
166            bar_aggregators: AHashMap::new(),
167            bar_aggregator_handlers: AHashMap::new(),
168            _synthetic_quote_feeds: AHashMap::new(),
169            _synthetic_trade_feeds: AHashMap::new(),
170            buffered_deltas_map: AHashMap::new(),
171            msgbus_priority: 10, // High-priority for built-in component
172            config,
173            #[cfg(feature = "defi")]
174            pool_updaters: AHashMap::new(),
175            #[cfg(feature = "defi")]
176            pool_updaters_pending: AHashSet::new(),
177            #[cfg(feature = "defi")]
178            pool_snapshot_pending: AHashSet::new(),
179            #[cfg(feature = "defi")]
180            pool_event_buffers: AHashMap::new(),
181        }
182    }
183
184    /// Returns a read-only reference to the engines clock.
185    #[must_use]
186    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
187        self.clock.borrow()
188    }
189
190    /// Returns a read-only reference to the engines cache.
191    #[must_use]
192    pub fn get_cache(&self) -> Ref<'_, Cache> {
193        self.cache.borrow()
194    }
195
196    /// Returns the `Rc<RefCell<Cache>>` used by this engine.
197    #[must_use]
198    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
199        Rc::clone(&self.cache)
200    }
201
202    /// Registers the `catalog` with the engine with an optional specific `name`.
203    ///
204    /// # Panics
205    ///
206    /// Panics if a catalog with the same `name` has already been registered.
207    #[cfg(feature = "streaming")]
208    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
209        let name = Ustr::from(name.as_deref().unwrap_or("catalog_0"));
210
211        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
212
213        self.catalogs.insert(name, catalog);
214        log::info!("Registered catalog <{name}>");
215    }
216
217    /// Registers the `client` with the engine with an optional venue `routing`.
218    ///
219    ///
220    /// # Panics
221    ///
222    /// Panics if a client with the same client ID has already been registered.
223    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
224        let client_id = client.client_id();
225
226        if let Some(default_client) = &self.default_client {
227            check_predicate_false(
228                default_client.client_id() == client.client_id(),
229                "client_id already registered as default client",
230            )
231            .expect(FAILED);
232        }
233
234        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
235
236        if let Some(routing) = routing {
237            self.routing_map.insert(routing, client_id);
238            log::debug!("Set client {client_id} routing for {routing}");
239        }
240
241        if client.venue.is_none() && self.default_client.is_none() {
242            self.default_client = Some(client);
243            log::debug!("Registered client {client_id} for default routing");
244        } else {
245            self.clients.insert(client_id, client);
246            log::debug!("Registered client {client_id}");
247        }
248    }
249
250    /// Deregisters the client for the `client_id`.
251    ///
252    /// # Panics
253    ///
254    /// Panics if the client ID has not been registered.
255    pub fn deregister_client(&mut self, client_id: &ClientId) {
256        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
257
258        self.clients.shift_remove(client_id);
259        log::info!("Deregistered client {client_id}");
260    }
261
262    /// Registers the data `client` with the engine as the default routing client.
263    ///
264    /// When a specific venue routing cannot be found, this client will receive messages.
265    ///
266    /// # Warnings
267    ///
268    /// Any existing default routing client will be overwritten.
269    ///
270    /// # Panics
271    ///
272    /// Panics if a default client has already been registered.
273    pub fn register_default_client(&mut self, client: DataClientAdapter) {
274        check_predicate_true(
275            self.default_client.is_none(),
276            "default client already registered",
277        )
278        .expect(FAILED);
279
280        let client_id = client.client_id();
281
282        self.default_client = Some(client);
283        log::debug!("Registered default client {client_id}");
284    }
285
286    /// Starts all registered data clients.
287    pub fn start(&mut self) {
288        for client in self.get_clients_mut() {
289            if let Err(e) = client.start() {
290                log::error!("{e}");
291            }
292        }
293    }
294
295    /// Stops all registered data clients.
296    pub fn stop(&mut self) {
297        for client in self.get_clients_mut() {
298            if let Err(e) = client.stop() {
299                log::error!("{e}");
300            }
301        }
302    }
303
304    /// Resets all registered data clients to their initial state.
305    pub fn reset(&mut self) {
306        for client in self.get_clients_mut() {
307            if let Err(e) = client.reset() {
308                log::error!("{e}");
309            }
310        }
311    }
312
313    /// Disposes the engine, stopping all clients and canceling any timers.
314    pub fn dispose(&mut self) {
315        for client in self.get_clients_mut() {
316            if let Err(e) = client.dispose() {
317                log::error!("{e}");
318            }
319        }
320
321        self.clock.borrow_mut().cancel_timers();
322    }
323
324    /// Connects all registered data clients concurrently.
325    ///
326    /// # Errors
327    ///
328    /// Returns an error if any client fails to connect.
329    pub async fn connect(&mut self) -> anyhow::Result<()> {
330        let futures: Vec<_> = self
331            .get_clients_mut()
332            .into_iter()
333            .map(|client| client.connect())
334            .collect();
335
336        let results = join_all(futures).await;
337        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
338
339        if errors.is_empty() {
340            Ok(())
341        } else {
342            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
343            anyhow::bail!("Failed to connect data clients: {}", error_msgs.join("; "))
344        }
345    }
346
347    /// Disconnects all registered data clients concurrently.
348    ///
349    /// # Errors
350    ///
351    /// Returns an error if any client fails to disconnect.
352    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
353        let futures: Vec<_> = self
354            .get_clients_mut()
355            .into_iter()
356            .map(|client| client.disconnect())
357            .collect();
358
359        let results = join_all(futures).await;
360        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
361
362        if errors.is_empty() {
363            Ok(())
364        } else {
365            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
366            anyhow::bail!(
367                "Failed to disconnect data clients: {}",
368                error_msgs.join("; ")
369            )
370        }
371    }
372
373    /// Returns `true` if all registered data clients are currently connected.
374    #[must_use]
375    pub fn check_connected(&self) -> bool {
376        self.get_clients()
377            .iter()
378            .all(|client| client.is_connected())
379    }
380
381    /// Returns `true` if all registered data clients are currently disconnected.
382    #[must_use]
383    pub fn check_disconnected(&self) -> bool {
384        self.get_clients()
385            .iter()
386            .all(|client| !client.is_connected())
387    }
388
389    /// Returns a list of all registered client IDs, including the default client if set.
390    #[must_use]
391    pub fn registered_clients(&self) -> Vec<ClientId> {
392        self.get_clients()
393            .into_iter()
394            .map(|client| client.client_id())
395            .collect()
396    }
397
398    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
399
400    pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
401    where
402        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
403        T: Clone,
404    {
405        self.get_clients()
406            .into_iter()
407            .flat_map(get_subs)
408            .cloned()
409            .collect()
410    }
411
412    #[must_use]
413    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
414        let (default_opt, clients_map) = (&self.default_client, &self.clients);
415        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
416
417        if let Some(default) = default_opt {
418            clients.push(default);
419        }
420
421        clients
422    }
423
424    #[must_use]
425    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
426        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
427        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
428
429        if let Some(default) = default_opt {
430            clients.push(default);
431        }
432
433        clients
434    }
435
436    pub fn get_client(
437        &mut self,
438        client_id: Option<&ClientId>,
439        venue: Option<&Venue>,
440    ) -> Option<&mut DataClientAdapter> {
441        if let Some(client_id) = client_id {
442            // Explicit ID: first look in registered clients
443            if let Some(client) = self.clients.get_mut(client_id) {
444                return Some(client);
445            }
446
447            // Then check if it matches the default client
448            if let Some(default) = self.default_client.as_mut()
449                && default.client_id() == *client_id
450            {
451                return Some(default);
452            }
453
454            // Unknown explicit client
455            return None;
456        }
457
458        if let Some(v) = venue {
459            // Route by venue if mapped client still registered
460            if let Some(client_id) = self.routing_map.get(v) {
461                return self.clients.get_mut(client_id);
462            }
463        }
464
465        // Fallback to default client
466        self.get_default_client()
467    }
468
469    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
470        self.default_client.as_mut()
471    }
472
473    /// Returns all custom data types currently subscribed across all clients.
474    #[must_use]
475    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
476        self.collect_subscriptions(|client| &client.subscriptions_custom)
477    }
478
479    /// Returns all instrument IDs currently subscribed across all clients.
480    #[must_use]
481    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
482        self.collect_subscriptions(|client| &client.subscriptions_instrument)
483    }
484
485    /// Returns all instrument IDs for which book delta subscriptions exist.
486    #[must_use]
487    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
488        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
489    }
490
491    /// Returns all instrument IDs for which book snapshot subscriptions exist.
492    #[must_use]
493    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
494        self.book_intervals
495            .values()
496            .flat_map(|set| set.iter().copied())
497            .collect()
498    }
499
500    /// Returns all instrument IDs for which quote subscriptions exist.
501    #[must_use]
502    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
503        self.collect_subscriptions(|client| &client.subscriptions_quotes)
504    }
505
506    /// Returns all instrument IDs for which trade subscriptions exist.
507    #[must_use]
508    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
509        self.collect_subscriptions(|client| &client.subscriptions_trades)
510    }
511
512    /// Returns all bar types currently subscribed across all clients.
513    #[must_use]
514    pub fn subscribed_bars(&self) -> Vec<BarType> {
515        self.collect_subscriptions(|client| &client.subscriptions_bars)
516    }
517
518    /// Returns all instrument IDs for which mark price subscriptions exist.
519    #[must_use]
520    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
521        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
522    }
523
524    /// Returns all instrument IDs for which index price subscriptions exist.
525    #[must_use]
526    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
527        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
528    }
529
530    /// Returns all instrument IDs for which funding rate subscriptions exist.
531    #[must_use]
532    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
533        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
534    }
535
536    /// Returns all instrument IDs for which status subscriptions exist.
537    #[must_use]
538    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
539        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
540    }
541
542    /// Returns all instrument IDs for which instrument close subscriptions exist.
543    #[must_use]
544    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
545        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
546    }
547
548    // -- COMMANDS --------------------------------------------------------------------------------
549
550    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
551    ///
552    /// Errors during execution are logged.
553    pub fn execute(&mut self, cmd: &DataCommand) {
554        if let Err(e) = match cmd {
555            DataCommand::Subscribe(c) => self.execute_subscribe(c),
556            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
557            DataCommand::Request(c) => self.execute_request(c),
558            #[cfg(feature = "defi")]
559            DataCommand::DefiRequest(c) => self.execute_defi_request(c),
560            #[cfg(feature = "defi")]
561            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
562            #[cfg(feature = "defi")]
563            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
564            _ => {
565                log::warn!("Unhandled DataCommand variant: {cmd:?}");
566                Ok(())
567            }
568        } {
569            log::error!("{e}");
570        }
571    }
572
573    /// Handles a subscribe command, updating internal state and forwarding to the client.
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
578    /// or if the underlying client operation fails.
579    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
580        // Update internal engine state
581        match &cmd {
582            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
583            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
584            SubscribeCommand::BookSnapshots(cmd) => {
585                // Handles client forwarding internally (forwards as BookDeltas)
586                return self.subscribe_book_snapshots(cmd);
587            }
588            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
589            _ => {} // Do nothing else
590        }
591
592        if let Some(client_id) = cmd.client_id()
593            && self.external_clients.contains(client_id)
594        {
595            if self.config.debug {
596                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
597            }
598            return Ok(());
599        }
600
601        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
602            client.execute_subscribe(cmd);
603        } else {
604            log::error!(
605                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
606                cmd.client_id(),
607                cmd.venue(),
608            );
609        }
610
611        Ok(())
612    }
613
614    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
615    ///
616    /// # Errors
617    ///
618    /// Returns an error if the underlying client operation fails.
619    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
620        match &cmd {
621            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
622            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
623            UnsubscribeCommand::BookSnapshots(cmd) => {
624                // Handles client forwarding internally (forwards as BookDeltas)
625                return self.unsubscribe_book_snapshots(cmd);
626            }
627            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
628            _ => {} // Do nothing else
629        }
630
631        if let Some(client_id) = cmd.client_id()
632            && self.external_clients.contains(client_id)
633        {
634            if self.config.debug {
635                log::debug!(
636                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
637                );
638            }
639            return Ok(());
640        }
641
642        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
643            client.execute_unsubscribe(cmd);
644        } else {
645            log::error!(
646                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
647                cmd.client_id(),
648                cmd.venue(),
649            );
650        }
651
652        Ok(())
653    }
654
655    /// Sends a [`RequestCommand`] to a suitable data client implementation.
656    ///
657    /// # Errors
658    ///
659    /// Returns an error if no client is found for the given client ID or venue,
660    /// or if the client fails to process the request.
661    pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
662        // Skip requests for external clients
663        if let Some(cid) = req.client_id()
664            && self.external_clients.contains(cid)
665        {
666            if self.config.debug {
667                log::debug!("Skipping data request for external client {cid}: {req:?}");
668            }
669            return Ok(());
670        }
671        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
672            match req {
673                RequestCommand::Data(req) => client.request_data(req),
674                RequestCommand::Instrument(req) => client.request_instrument(req),
675                RequestCommand::Instruments(req) => client.request_instruments(req),
676                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
677                RequestCommand::BookDepth(req) => client.request_book_depth(req),
678                RequestCommand::Quotes(req) => client.request_quotes(req),
679                RequestCommand::Trades(req) => client.request_trades(req),
680                RequestCommand::Bars(req) => client.request_bars(req),
681            }
682        } else {
683            anyhow::bail!(
684                "Cannot handle request: no client found for {:?} {:?}",
685                req.client_id(),
686                req.venue()
687            );
688        }
689    }
690
691    /// Processes a dynamically-typed data message.
692    ///
693    /// Currently supports `InstrumentAny` and `FundingRateUpdate`; unrecognized types are logged as errors.
694    pub fn process(&mut self, data: &dyn Any) {
695        // TODO: Eventually these could be added to the `Data` enum? process here for now
696        if let Some(data) = data.downcast_ref::<Data>() {
697            self.process_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
698            return;
699        }
700
701        #[cfg(feature = "defi")]
702        if let Some(data) = data.downcast_ref::<DefiData>() {
703            self.process_defi_data(data.clone()); // TODO: Optimize (not necessary if we change handler)
704            return;
705        }
706
707        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
708            self.handle_instrument(instrument.clone());
709        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
710            self.handle_funding_rate(*funding_rate);
711        } else {
712            log::error!("Cannot process data {data:?}, type is unrecognized");
713        }
714    }
715
716    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
717    pub fn process_data(&mut self, data: Data) {
718        match data {
719            Data::Delta(delta) => self.handle_delta(delta),
720            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
721            Data::Depth10(depth) => self.handle_depth10(*depth),
722            Data::Quote(quote) => self.handle_quote(quote),
723            Data::Trade(trade) => self.handle_trade(trade),
724            Data::Bar(bar) => self.handle_bar(bar),
725            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
726            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
727            Data::InstrumentClose(close) => self.handle_instrument_close(close),
728        }
729    }
730
731    /// Processes a `DataResponse`, handling and publishing the response message.
732    pub fn response(&self, resp: DataResponse) {
733        log::debug!("{RECV}{RES} {resp:?}");
734
735        match &resp {
736            DataResponse::Instrument(resp) => {
737                self.handle_instrument_response(resp.data.clone());
738            }
739            DataResponse::Instruments(resp) => {
740                self.handle_instruments(&resp.data);
741            }
742            DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
743            DataResponse::Trades(resp) => self.handle_trades(&resp.data),
744            DataResponse::Bars(resp) => self.handle_bars(&resp.data),
745            DataResponse::Book(resp) => self.handle_book_response(&resp.data),
746            _ => todo!("Handle other response types"),
747        }
748
749        msgbus::send_response(resp.correlation_id(), &resp);
750    }
751
752    // -- DATA HANDLERS ---------------------------------------------------------------------------
753
754    fn handle_instrument(&mut self, instrument: InstrumentAny) {
755        if let Err(e) = self
756            .cache
757            .as_ref()
758            .borrow_mut()
759            .add_instrument(instrument.clone())
760        {
761            log_error_on_cache_insert(&e);
762        }
763
764        let topic = switchboard::get_instrument_topic(instrument.id());
765        msgbus::publish(topic, &instrument as &dyn Any);
766    }
767
768    fn handle_delta(&mut self, delta: OrderBookDelta) {
769        let deltas = if self.config.buffer_deltas {
770            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
771                buffered_deltas.deltas.push(delta);
772                buffered_deltas.flags = delta.flags;
773                buffered_deltas.sequence = delta.sequence;
774                buffered_deltas.ts_event = delta.ts_event;
775                buffered_deltas.ts_init = delta.ts_init;
776            } else {
777                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
778                self.buffered_deltas_map
779                    .insert(delta.instrument_id, buffered_deltas);
780            }
781
782            if !RecordFlag::F_LAST.matches(delta.flags) {
783                return; // Not the last delta for event
784            }
785
786            // SAFETY: We know the deltas exists already
787            self.buffered_deltas_map
788                .remove(&delta.instrument_id)
789                .unwrap()
790        } else {
791            OrderBookDeltas::new(delta.instrument_id, vec![delta])
792        };
793
794        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
795        msgbus::publish(topic, &deltas as &dyn Any);
796    }
797
798    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
799        let deltas = if self.config.buffer_deltas {
800            let mut is_last_delta = false;
801            for delta in &deltas.deltas {
802                if RecordFlag::F_LAST.matches(delta.flags) {
803                    is_last_delta = true;
804                    break;
805                }
806            }
807
808            let instrument_id = deltas.instrument_id;
809
810            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
811                buffered_deltas.deltas.extend(deltas.deltas);
812
813                if let Some(last_delta) = buffered_deltas.deltas.last() {
814                    buffered_deltas.flags = last_delta.flags;
815                    buffered_deltas.sequence = last_delta.sequence;
816                    buffered_deltas.ts_event = last_delta.ts_event;
817                    buffered_deltas.ts_init = last_delta.ts_init;
818                }
819            } else {
820                self.buffered_deltas_map.insert(instrument_id, deltas);
821            }
822
823            if !is_last_delta {
824                return;
825            }
826
827            // SAFETY: We know the deltas exists already
828            self.buffered_deltas_map.remove(&instrument_id).unwrap()
829        } else {
830            deltas
831        };
832
833        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
834        msgbus::publish(topic, &deltas as &dyn Any);
835    }
836
837    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
838        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
839        msgbus::publish(topic, &depth as &dyn Any);
840    }
841
842    fn handle_quote(&mut self, quote: QuoteTick) {
843        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
844            log_error_on_cache_insert(&e);
845        }
846
847        // TODO: Handle synthetics
848
849        let topic = switchboard::get_quotes_topic(quote.instrument_id);
850        msgbus::publish(topic, &quote as &dyn Any);
851    }
852
853    fn handle_trade(&mut self, trade: TradeTick) {
854        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
855            log_error_on_cache_insert(&e);
856        }
857
858        // TODO: Handle synthetics
859
860        let topic = switchboard::get_trades_topic(trade.instrument_id);
861        msgbus::publish(topic, &trade as &dyn Any);
862    }
863
864    fn handle_bar(&mut self, bar: Bar) {
865        // TODO: Handle additional bar logic
866        if self.config.validate_data_sequence
867            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
868        {
869            if bar.ts_event < last_bar.ts_event {
870                log::warn!(
871                    "Bar {bar} was prior to last bar `ts_event` {}",
872                    last_bar.ts_event
873                );
874                return; // Bar is out of sequence
875            }
876            if bar.ts_init < last_bar.ts_init {
877                log::warn!(
878                    "Bar {bar} was prior to last bar `ts_init` {}",
879                    last_bar.ts_init
880                );
881                return; // Bar is out of sequence
882            }
883            // TODO: Implement `bar.is_revision` logic
884        }
885
886        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
887            log_error_on_cache_insert(&e);
888        }
889
890        let topic = switchboard::get_bars_topic(bar.bar_type);
891        msgbus::publish(topic, &bar as &dyn Any);
892    }
893
894    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
895        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
896            log_error_on_cache_insert(&e);
897        }
898
899        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
900        msgbus::publish(topic, &mark_price as &dyn Any);
901    }
902
903    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
904        if let Err(e) = self
905            .cache
906            .as_ref()
907            .borrow_mut()
908            .add_index_price(index_price)
909        {
910            log_error_on_cache_insert(&e);
911        }
912
913        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
914        msgbus::publish(topic, &index_price as &dyn Any);
915    }
916
917    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
918    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
919        if let Err(e) = self
920            .cache
921            .as_ref()
922            .borrow_mut()
923            .add_funding_rate(funding_rate)
924        {
925            log_error_on_cache_insert(&e);
926        }
927
928        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
929        msgbus::publish(topic, &funding_rate as &dyn Any);
930    }
931
932    fn handle_instrument_close(&mut self, close: InstrumentClose) {
933        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
934        msgbus::publish(topic, &close as &dyn Any);
935    }
936
937    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
938
939    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
940        if cmd.instrument_id.is_synthetic() {
941            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
942        }
943
944        self.book_deltas_subs.insert(cmd.instrument_id);
945        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
946
947        Ok(())
948    }
949
950    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
951        if cmd.instrument_id.is_synthetic() {
952            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
953        }
954
955        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
956
957        Ok(())
958    }
959
960    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
961        if cmd.instrument_id.is_synthetic() {
962            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
963        }
964
965        // Track snapshot intervals per instrument, and set up timer on first subscription
966        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
967            Entry::Vacant(e) => {
968                let mut set = AHashSet::new();
969                set.insert(cmd.instrument_id);
970                e.insert(set);
971                true
972            }
973            Entry::Occupied(mut e) => {
974                e.get_mut().insert(cmd.instrument_id);
975                false
976            }
977        };
978
979        if first_for_interval {
980            // Initialize snapshotter and schedule its timer
981            let interval_ns = millis_to_nanos_unchecked(cmd.interval_ms.get() as f64);
982            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
983
984            let snap_info = BookSnapshotInfo {
985                instrument_id: cmd.instrument_id,
986                venue: cmd.instrument_id.venue,
987                is_composite: cmd.instrument_id.symbol.is_composite(),
988                root: Ustr::from(cmd.instrument_id.symbol.root()),
989                topic,
990                interval_ms: cmd.interval_ms,
991            };
992
993            // Schedule the first snapshot at the next interval boundary
994            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
995            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
996
997            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
998            self.book_snapshotters
999                .insert(cmd.instrument_id, snapshotter.clone());
1000            let timer_name = snapshotter.timer_name;
1001
1002            let callback_fn: Rc<dyn Fn(TimeEvent)> =
1003                Rc::new(move |event| snapshotter.snapshot(event));
1004            let callback = TimeEventCallback::from(callback_fn);
1005
1006            self.clock
1007                .borrow_mut()
1008                .set_timer_ns(
1009                    &timer_name,
1010                    interval_ns,
1011                    Some(start_time_ns.into()),
1012                    None,
1013                    Some(callback),
1014                    None,
1015                    None,
1016                )
1017                .expect(FAILED);
1018        }
1019
1020        // Only set up book updater if not already subscribed to deltas
1021        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1022            self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1023        }
1024
1025        if let Some(client_id) = cmd.client_id.as_ref()
1026            && self.external_clients.contains(client_id)
1027        {
1028            if self.config.debug {
1029                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
1030            }
1031            return Ok(());
1032        }
1033
1034        log::debug!(
1035            "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
1036            cmd.instrument_id,
1037            cmd.client_id,
1038            cmd.venue,
1039        );
1040
1041        if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1042            let deltas_cmd = SubscribeBookDeltas::new(
1043                cmd.instrument_id,
1044                cmd.book_type,
1045                cmd.client_id,
1046                cmd.venue,
1047                UUID4::new(),
1048                cmd.ts_init,
1049                cmd.depth,
1050                true, // managed
1051                Some(cmd.command_id),
1052                cmd.params.clone(),
1053            );
1054            log::debug!(
1055                "Calling client.execute_subscribe for BookDeltas: {}",
1056                cmd.instrument_id
1057            );
1058            client.execute_subscribe(&SubscribeCommand::BookDeltas(deltas_cmd));
1059        } else {
1060            log::error!(
1061                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1062                cmd.client_id,
1063                cmd.venue,
1064            );
1065        }
1066
1067        Ok(())
1068    }
1069
1070    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1071        match cmd.bar_type.aggregation_source() {
1072            AggregationSource::Internal => {
1073                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1074                    self.start_bar_aggregator(cmd.bar_type)?;
1075                }
1076            }
1077            AggregationSource::External => {
1078                if cmd.bar_type.instrument_id().is_synthetic() {
1079                    anyhow::bail!(
1080                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
1081                    );
1082                }
1083            }
1084        }
1085
1086        Ok(())
1087    }
1088
1089    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1090        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1091            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1092            return Ok(());
1093        }
1094
1095        self.book_deltas_subs.remove(&cmd.instrument_id);
1096
1097        let topics = vec![
1098            switchboard::get_book_deltas_topic(cmd.instrument_id),
1099            switchboard::get_book_depth10_topic(cmd.instrument_id),
1100            // TODO: Unsubscribe from snapshots?
1101        ];
1102
1103        self.maintain_book_updater(&cmd.instrument_id, &topics);
1104        self.maintain_book_snapshotter(&cmd.instrument_id);
1105
1106        Ok(())
1107    }
1108
1109    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1110        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1111            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1112            return Ok(());
1113        }
1114
1115        let topics = vec![
1116            switchboard::get_book_deltas_topic(cmd.instrument_id),
1117            switchboard::get_book_depth10_topic(cmd.instrument_id),
1118            // TODO: Unsubscribe from snapshots?
1119        ];
1120
1121        self.maintain_book_updater(&cmd.instrument_id, &topics);
1122        self.maintain_book_snapshotter(&cmd.instrument_id);
1123
1124        Ok(())
1125    }
1126
1127    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1128        let is_subscribed = self
1129            .book_intervals
1130            .values()
1131            .any(|set| set.contains(&cmd.instrument_id));
1132
1133        if !is_subscribed {
1134            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1135            return Ok(());
1136        }
1137
1138        // Remove instrument from interval tracking, and drop empty intervals
1139        let mut to_remove = Vec::new();
1140        for (interval, set) in &mut self.book_intervals {
1141            if set.remove(&cmd.instrument_id) && set.is_empty() {
1142                to_remove.push(*interval);
1143            }
1144        }
1145
1146        for interval in to_remove {
1147            self.book_intervals.remove(&interval);
1148        }
1149
1150        let topics = vec![
1151            switchboard::get_book_deltas_topic(cmd.instrument_id),
1152            switchboard::get_book_depth10_topic(cmd.instrument_id),
1153        ];
1154
1155        self.maintain_book_updater(&cmd.instrument_id, &topics);
1156        self.maintain_book_snapshotter(&cmd.instrument_id);
1157
1158        let still_in_intervals = self
1159            .book_intervals
1160            .values()
1161            .any(|set| set.contains(&cmd.instrument_id));
1162
1163        if !still_in_intervals && !self.book_deltas_subs.contains(&cmd.instrument_id) {
1164            if let Some(client_id) = cmd.client_id.as_ref()
1165                && self.external_clients.contains(client_id)
1166            {
1167                return Ok(());
1168            }
1169
1170            if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1171                let deltas_cmd = UnsubscribeBookDeltas::new(
1172                    cmd.instrument_id,
1173                    cmd.client_id,
1174                    cmd.venue,
1175                    UUID4::new(),
1176                    cmd.ts_init,
1177                    Some(cmd.command_id),
1178                    cmd.params.clone(),
1179                );
1180                client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
1181            }
1182        }
1183
1184        Ok(())
1185    }
1186
1187    /// Unsubscribe internal bar aggregator for the given bar type.
1188    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1189        // If we have an internal aggregator for this bar type, stop and remove it
1190        let bar_type = cmd.bar_type;
1191        if self.bar_aggregators.contains_key(&bar_type.standard()) {
1192            if let Err(e) = self.stop_bar_aggregator(bar_type) {
1193                log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1194            }
1195            self.bar_aggregators.remove(&bar_type.standard());
1196            log::debug!("Removed bar aggregator for {bar_type}");
1197        }
1198        Ok(())
1199    }
1200
1201    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1202        if let Some(updater) = self.book_updaters.get(instrument_id) {
1203            let handler = ShareableMessageHandler(updater.clone());
1204
1205            // Unsubscribe handler if it is the last subscriber
1206            for topic in topics {
1207                if msgbus::subscriptions_count(topic.as_str()) == 1
1208                    && msgbus::is_subscribed(topic.as_str(), handler.clone())
1209                {
1210                    log::debug!("Unsubscribing BookUpdater from {topic}");
1211                    msgbus::unsubscribe_topic(*topic, handler.clone());
1212                }
1213            }
1214
1215            // Check remaining subscriptions, if none then remove updater
1216            let still_subscribed = topics
1217                .iter()
1218                .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1219            if !still_subscribed {
1220                self.book_updaters.remove(instrument_id);
1221                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1222            }
1223        }
1224    }
1225
1226    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1227        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1228            let topic = switchboard::get_book_snapshots_topic(
1229                *instrument_id,
1230                snapshotter.snap_info.interval_ms,
1231            );
1232
1233            // Check remaining snapshot subscriptions, if none then remove snapshotter
1234            if msgbus::subscriptions_count(topic.as_str()) == 0 {
1235                let timer_name = snapshotter.timer_name;
1236                self.book_snapshotters.remove(instrument_id);
1237                let mut clock = self.clock.borrow_mut();
1238                if clock.timer_exists(&timer_name) {
1239                    clock.cancel_timer(&timer_name);
1240                }
1241                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1242            }
1243        }
1244    }
1245
1246    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1247
1248    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1249        let mut cache = self.cache.as_ref().borrow_mut();
1250        if let Err(e) = cache.add_instrument(instrument) {
1251            log_error_on_cache_insert(&e);
1252        }
1253    }
1254
1255    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1256        // TODO: Improve by adding bulk update methods to cache and database
1257        let mut cache = self.cache.as_ref().borrow_mut();
1258        for instrument in instruments {
1259            if let Err(e) = cache.add_instrument(instrument.clone()) {
1260                log_error_on_cache_insert(&e);
1261            }
1262        }
1263    }
1264
1265    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1266        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1267            log_error_on_cache_insert(&e);
1268        }
1269    }
1270
1271    fn handle_trades(&self, trades: &[TradeTick]) {
1272        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1273            log_error_on_cache_insert(&e);
1274        }
1275    }
1276
1277    fn handle_bars(&self, bars: &[Bar]) {
1278        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1279            log_error_on_cache_insert(&e);
1280        }
1281    }
1282
1283    fn handle_book_response(&self, book: &OrderBook) {
1284        log::debug!("Adding order book {} to cache", book.instrument_id);
1285        if let Err(e) = self
1286            .cache
1287            .as_ref()
1288            .borrow_mut()
1289            .add_order_book(book.clone())
1290        {
1291            log_error_on_cache_insert(&e);
1292        }
1293    }
1294
1295    // -- INTERNAL --------------------------------------------------------------------------------
1296
1297    #[allow(clippy::too_many_arguments)]
1298    fn setup_book_updater(
1299        &mut self,
1300        instrument_id: &InstrumentId,
1301        book_type: BookType,
1302        only_deltas: bool,
1303        managed: bool,
1304    ) -> anyhow::Result<()> {
1305        let mut cache = self.cache.borrow_mut();
1306        if managed && !cache.has_order_book(instrument_id) {
1307            let book = OrderBook::new(*instrument_id, book_type);
1308            log::debug!("Created {book}");
1309            cache.add_order_book(book)?;
1310        }
1311
1312        // Set up subscriptions
1313        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1314        self.book_updaters.insert(*instrument_id, updater.clone());
1315
1316        let handler = ShareableMessageHandler(updater);
1317
1318        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1319        if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1320            msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1321        }
1322
1323        let topic = switchboard::get_book_depth10_topic(*instrument_id);
1324        if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1325            msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1326        }
1327
1328        Ok(())
1329    }
1330
1331    fn create_bar_aggregator(
1332        &mut self,
1333        instrument: &InstrumentAny,
1334        bar_type: BarType,
1335    ) -> Box<dyn BarAggregator> {
1336        let cache = self.cache.clone();
1337
1338        let handler = move |bar: Bar| {
1339            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1340                log_error_on_cache_insert(&e);
1341            }
1342
1343            let topic = switchboard::get_bars_topic(bar.bar_type);
1344            msgbus::publish(topic, &bar as &dyn Any);
1345        };
1346
1347        let clock = self.clock.clone();
1348        let config = self.config.clone();
1349
1350        let price_precision = instrument.price_precision();
1351        let size_precision = instrument.size_precision();
1352
1353        if bar_type.spec().is_time_aggregated() {
1354            // Get time_bars_origin_offset from config
1355            let time_bars_origin_offset = config
1356                .time_bars_origins
1357                .get(&bar_type.spec().aggregation)
1358                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1359
1360            Box::new(TimeBarAggregator::new(
1361                bar_type,
1362                price_precision,
1363                size_precision,
1364                clock,
1365                handler,
1366                config.time_bars_build_with_no_updates,
1367                config.time_bars_timestamp_on_close,
1368                config.time_bars_interval_type,
1369                time_bars_origin_offset,
1370                20,    // TODO: TBD, composite bar build delay
1371                false, // TODO: skip_first_non_full_bar, make it config dependent
1372            ))
1373        } else {
1374            match bar_type.spec().aggregation {
1375                BarAggregation::Tick => Box::new(TickBarAggregator::new(
1376                    bar_type,
1377                    price_precision,
1378                    size_precision,
1379                    handler,
1380                )) as Box<dyn BarAggregator>,
1381                BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
1382                    bar_type,
1383                    price_precision,
1384                    size_precision,
1385                    handler,
1386                )) as Box<dyn BarAggregator>,
1387                BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
1388                    bar_type,
1389                    price_precision,
1390                    size_precision,
1391                    handler,
1392                )) as Box<dyn BarAggregator>,
1393                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1394                    bar_type,
1395                    price_precision,
1396                    size_precision,
1397                    handler,
1398                )) as Box<dyn BarAggregator>,
1399                BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
1400                    bar_type,
1401                    price_precision,
1402                    size_precision,
1403                    handler,
1404                )) as Box<dyn BarAggregator>,
1405                BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
1406                    bar_type,
1407                    price_precision,
1408                    size_precision,
1409                    handler,
1410                )) as Box<dyn BarAggregator>,
1411                BarAggregation::Value => Box::new(ValueBarAggregator::new(
1412                    bar_type,
1413                    price_precision,
1414                    size_precision,
1415                    handler,
1416                )) as Box<dyn BarAggregator>,
1417                BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
1418                    bar_type,
1419                    price_precision,
1420                    size_precision,
1421                    handler,
1422                )) as Box<dyn BarAggregator>,
1423                BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
1424                    bar_type,
1425                    price_precision,
1426                    size_precision,
1427                    handler,
1428                )) as Box<dyn BarAggregator>,
1429                BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1430                    bar_type,
1431                    price_precision,
1432                    size_precision,
1433                    instrument.price_increment(),
1434                    handler,
1435                )) as Box<dyn BarAggregator>,
1436                _ => panic!(
1437                    "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
1438                    bar_type.spec().aggregation
1439                ),
1440            }
1441        }
1442    }
1443
1444    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1445        // Get the instrument for this bar type
1446        let instrument = {
1447            let cache = self.cache.borrow();
1448            cache
1449                .instrument(&bar_type.instrument_id())
1450                .ok_or_else(|| {
1451                    anyhow::anyhow!(
1452                        "Cannot start bar aggregation: no instrument found for {}",
1453                        bar_type.instrument_id(),
1454                    )
1455                })?
1456                .clone()
1457        };
1458
1459        // Use standard form of bar type as key
1460        let bar_key = bar_type.standard();
1461
1462        // Create or retrieve aggregator in Rc<RefCell>
1463        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1464            rc.clone()
1465        } else {
1466            let agg = self.create_bar_aggregator(&instrument, bar_type);
1467            let rc = Rc::new(RefCell::new(agg));
1468            self.bar_aggregators.insert(bar_key, rc.clone());
1469            rc
1470        };
1471
1472        // Subscribe to underlying data topics
1473        let mut handlers = Vec::new();
1474
1475        if bar_type.is_composite() {
1476            let topic = switchboard::get_bars_topic(bar_type.composite());
1477            let handler =
1478                ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1479
1480            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1481                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1482            }
1483
1484            handlers.push((topic, handler));
1485        } else if bar_type.spec().price_type == PriceType::Last {
1486            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1487            let handler =
1488                ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1489
1490            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1491                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1492            }
1493
1494            handlers.push((topic, handler));
1495        } else {
1496            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1497            let handler =
1498                ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1499
1500            if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1501                msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1502            }
1503
1504            handlers.push((topic, handler));
1505        }
1506
1507        self.bar_aggregator_handlers.insert(bar_key, handlers);
1508
1509        // Setup time bar aggregator if needed (matches Cython _setup_bar_aggregator)
1510        self.setup_bar_aggregator(bar_type, false)?;
1511
1512        aggregator.borrow_mut().set_is_running(true);
1513
1514        Ok(())
1515    }
1516
1517    /// Sets up a bar aggregator, matching Cython _setup_bar_aggregator logic.
1518    ///
1519    /// This method handles historical mode, message bus subscriptions, and time bar aggregator setup.
1520    fn setup_bar_aggregator(&mut self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
1521        let bar_key = bar_type.standard();
1522        let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
1523            anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
1524        })?;
1525
1526        // Set historical mode and handler
1527        let handler: Box<dyn FnMut(Bar)> = if historical {
1528            // Historical handler - process_historical equivalent
1529            let cache = self.cache.clone();
1530            Box::new(move |bar: Bar| {
1531                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1532                    log_error_on_cache_insert(&e);
1533                }
1534                // In historical mode, bars are processed but not published to message bus
1535            })
1536        } else {
1537            // Regular handler - process equivalent
1538            let cache = self.cache.clone();
1539            Box::new(move |bar: Bar| {
1540                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1541                    log_error_on_cache_insert(&e);
1542                }
1543                let topic = switchboard::get_bars_topic(bar.bar_type);
1544                msgbus::publish(topic, &bar as &dyn Any);
1545            })
1546        };
1547
1548        aggregator
1549            .borrow_mut()
1550            .set_historical_mode(historical, handler);
1551
1552        // For TimeBarAggregator, set clock and start timer
1553        if bar_type.spec().is_time_aggregated() {
1554            use nautilus_common::clock::TestClock;
1555
1556            if historical {
1557                // Each aggregator gets its own independent clock
1558                let test_clock = Rc::new(RefCell::new(TestClock::new()));
1559                aggregator.borrow_mut().set_clock(test_clock);
1560                // Set weak reference for historical mode (start_timer called later from preprocess_historical_events)
1561                // Store weak reference so start_timer can use it when called later
1562                let aggregator_weak = Rc::downgrade(aggregator);
1563                aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
1564            } else {
1565                aggregator.borrow_mut().set_clock(self.clock.clone());
1566                aggregator
1567                    .borrow_mut()
1568                    .start_timer(Some(aggregator.clone()));
1569            }
1570        }
1571
1572        Ok(())
1573    }
1574
1575    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1576        let aggregator = self
1577            .bar_aggregators
1578            .remove(&bar_type.standard())
1579            .ok_or_else(|| {
1580                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1581            })?;
1582
1583        aggregator.borrow_mut().stop();
1584
1585        // Unsubscribe any registered message handlers
1586        let bar_key = bar_type.standard();
1587        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1588            for (topic, handler) in subs {
1589                if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1590                    msgbus::unsubscribe_topic(topic, handler);
1591                }
1592            }
1593        }
1594
1595        Ok(())
1596    }
1597}
1598
1599#[inline(always)]
1600fn log_error_on_cache_insert<T: Display>(e: &T) {
1601    log::error!("Error on cache insert: {e}");
1602}