Skip to main content

nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
//! The `DataEngine` is the central component of the entire data stack.
//! The data engine's primary responsibility is to orchestrate interactions between
//! the `DataClient` instances and the rest of the platform. This includes sending
//! requests to, and receiving responses from, data endpoints via its registered
//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
//! Alternative implementations can be written on top of the generic engine; they
//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39    any::{Any, type_name},
40    cell::{Ref, RefCell},
41    collections::hash_map::Entry,
42    fmt::{Debug, Display},
43    num::NonZeroUsize,
44    rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use futures::future::join_all;
51use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
52use indexmap::IndexMap;
53use nautilus_common::{
54    cache::Cache,
55    clock::Clock,
56    logging::{RECV, RES},
57    messages::data::{
58        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61        UnsubscribeCommand,
62    },
63    msgbus::{
64        self, MStr, ShareableMessageHandler, Topic, TypedHandler, TypedIntoHandler,
65        switchboard::{self, MessagingSwitchboard},
66    },
67    runner::get_data_cmd_sender,
68    timer::{TimeEvent, TimeEventCallback},
69};
70use nautilus_core::{
71    UUID4, WeakCell,
72    correctness::{
73        FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
74    },
75    datetime::millis_to_nanos_unchecked,
76};
77#[cfg(feature = "defi")]
78use nautilus_model::defi::DefiData;
79use nautilus_model::{
80    data::{
81        Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
82        MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
83    },
84    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
85    identifiers::{ClientId, InstrumentId, Venue},
86    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
87    orderbook::OrderBook,
88};
89#[cfg(feature = "streaming")]
90use nautilus_persistence::backend::catalog::ParquetDataCatalog;
91use ustr::Ustr;
92
93#[cfg(feature = "defi")]
94#[allow(unused_imports)] // Brings DeFi impl blocks into scope
95use crate::defi::engine as _;
96#[cfg(feature = "defi")]
97use crate::engine::pool::PoolUpdater;
98use crate::{
99    aggregation::{
100        BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
101        TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
102        ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
103        VolumeRunsBarAggregator,
104    },
105    client::DataClientAdapter,
106};
107
108/// Typed subscription for bar aggregator handlers.
109///
110/// Stores the topic and handler for each data type so we can properly
111/// unsubscribe from the typed routers.
112#[derive(Clone)]
113pub enum BarAggregatorSubscription {
114    Bar {
115        topic: MStr<Topic>,
116        handler: TypedHandler<Bar>,
117    },
118    Trade {
119        topic: MStr<Topic>,
120        handler: TypedHandler<TradeTick>,
121    },
122    Quote {
123        topic: MStr<Topic>,
124        handler: TypedHandler<QuoteTick>,
125    },
126}
127
128impl Debug for BarAggregatorSubscription {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        match self {
131            Self::Bar { topic, handler } => f
132                .debug_struct(stringify!(Bar))
133                .field("topic", topic)
134                .field("handler_id", &handler.id())
135                .finish(),
136            Self::Trade { topic, handler } => f
137                .debug_struct(stringify!(Trade))
138                .field("topic", topic)
139                .field("handler_id", &handler.id())
140                .finish(),
141            Self::Quote { topic, handler } => f
142                .debug_struct(stringify!(Quote))
143                .field("topic", topic)
144                .field("handler_id", &handler.id())
145                .finish(),
146        }
147    }
148}
149
/// Provides a high-performance `DataEngine` for all environments.
///
/// Holds the registered data clients, the venue routing table, and the book / bar
/// subscription state consulted when executing commands and processing data.
#[derive(Debug)]
pub struct DataEngine {
    pub(crate) clock: Rc<RefCell<dyn Clock>>,
    pub(crate) cache: Rc<RefCell<Cache>>,
    /// Client IDs collected from `config.external_clients`; subscribe, unsubscribe and
    /// request commands addressed to one of these IDs are not forwarded to a client.
    pub(crate) external_clients: AHashSet<ClientId>,
    /// Registered clients keyed by client ID (the default client is held separately).
    clients: IndexMap<ClientId, DataClientAdapter>,
    /// Fallback client used when neither a client ID nor a venue selects a client.
    default_client: Option<DataClientAdapter>,
    /// Catalogs registered by name.
    #[cfg(feature = "streaming")]
    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
    /// Venue to client ID routing, populated when a client is registered with a `routing` venue.
    routing_map: IndexMap<Venue, ClientId>,
    /// Instruments with book snapshot subscriptions, grouped by key.
    // NOTE(review): the key looks like the snapshot interval - confirm units against the subscribe path.
    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
    book_deltas_subs: AHashSet<InstrumentId>,
    book_depth10_subs: AHashSet<InstrumentId>,
    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
    /// Bar aggregators keyed by the bar type they produce.
    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
    /// Topic/handler pairs registered for each bar aggregator, kept so they can be unsubscribed.
    bar_aggregator_handlers: AHashMap<BarType, Vec<BarAggregatorSubscription>>,
    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
    /// Message bus priority; initialized to `10` (high priority for a built-in component).
    pub(crate) msgbus_priority: u8,
    pub(crate) config: DataEngineConfig,
    #[cfg(feature = "defi")]
    pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
    #[cfg(feature = "defi")]
    pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
    #[cfg(feature = "defi")]
    pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
    #[cfg(feature = "defi")]
    pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
}
182
183impl DataEngine {
184    /// Creates a new [`DataEngine`] instance.
185    #[must_use]
186    pub fn new(
187        clock: Rc<RefCell<dyn Clock>>,
188        cache: Rc<RefCell<Cache>>,
189        config: Option<DataEngineConfig>,
190    ) -> Self {
191        let config = config.unwrap_or_default();
192
193        let external_clients: AHashSet<ClientId> = config
194            .external_clients
195            .clone()
196            .unwrap_or_default()
197            .into_iter()
198            .collect();
199
200        Self {
201            clock,
202            cache,
203            external_clients,
204            clients: IndexMap::new(),
205            default_client: None,
206            #[cfg(feature = "streaming")]
207            catalogs: AHashMap::new(),
208            routing_map: IndexMap::new(),
209            book_intervals: AHashMap::new(),
210            book_deltas_subs: AHashSet::new(),
211            book_depth10_subs: AHashSet::new(),
212            book_updaters: AHashMap::new(),
213            book_snapshotters: AHashMap::new(),
214            bar_aggregators: AHashMap::new(),
215            bar_aggregator_handlers: AHashMap::new(),
216            _synthetic_quote_feeds: AHashMap::new(),
217            _synthetic_trade_feeds: AHashMap::new(),
218            buffered_deltas_map: AHashMap::new(),
219            msgbus_priority: 10, // High-priority for built-in component
220            config,
221            #[cfg(feature = "defi")]
222            pool_updaters: AHashMap::new(),
223            #[cfg(feature = "defi")]
224            pool_updaters_pending: AHashSet::new(),
225            #[cfg(feature = "defi")]
226            pool_snapshot_pending: AHashSet::new(),
227            #[cfg(feature = "defi")]
228            pool_event_buffers: AHashMap::new(),
229        }
230    }
231
232    /// Registers all message bus handlers for the data engine.
233    pub fn register_msgbus_handlers(engine: Rc<RefCell<Self>>) {
234        let weak = WeakCell::from(Rc::downgrade(&engine));
235
236        let weak1 = weak.clone();
237        msgbus::register_data_command_endpoint(
238            MessagingSwitchboard::data_engine_execute(),
239            TypedIntoHandler::from(move |cmd: DataCommand| {
240                if let Some(rc) = weak1.upgrade() {
241                    rc.borrow_mut().execute(cmd);
242                }
243            }),
244        );
245
246        msgbus::register_data_command_endpoint(
247            MessagingSwitchboard::data_engine_queue_execute(),
248            TypedIntoHandler::from(move |cmd: DataCommand| {
249                get_data_cmd_sender().clone().execute(cmd);
250            }),
251        );
252
253        // Register process handler (polymorphic - uses Any)
254        let weak2 = weak.clone();
255        msgbus::register_any(
256            MessagingSwitchboard::data_engine_process(),
257            ShareableMessageHandler::from_any(move |data: &dyn Any| {
258                if let Some(rc) = weak2.upgrade() {
259                    rc.borrow_mut().process(data);
260                }
261            }),
262        );
263
264        // Register process_data handler (typed - takes ownership)
265        let weak3 = weak.clone();
266        msgbus::register_data_endpoint(
267            MessagingSwitchboard::data_engine_process_data(),
268            TypedIntoHandler::from(move |data: Data| {
269                if let Some(rc) = weak3.upgrade() {
270                    rc.borrow_mut().process_data(data);
271                }
272            }),
273        );
274
275        // Register process_defi_data handler (typed - takes ownership)
276        #[cfg(feature = "defi")]
277        {
278            let weak4 = weak.clone();
279            msgbus::register_defi_data_endpoint(
280                MessagingSwitchboard::data_engine_process_defi_data(),
281                TypedIntoHandler::from(move |data: DefiData| {
282                    if let Some(rc) = weak4.upgrade() {
283                        rc.borrow_mut().process_defi_data(data);
284                    }
285                }),
286            );
287        }
288
289        let weak5 = weak;
290        msgbus::register_data_response_endpoint(
291            MessagingSwitchboard::data_engine_response(),
292            TypedIntoHandler::from(move |resp: DataResponse| {
293                if let Some(rc) = weak5.upgrade() {
294                    rc.borrow_mut().response(resp);
295                }
296            }),
297        );
298    }
299
300    /// Returns a read-only reference to the engines clock.
301    #[must_use]
302    pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
303        self.clock.borrow()
304    }
305
306    /// Returns a read-only reference to the engines cache.
307    #[must_use]
308    pub fn get_cache(&self) -> Ref<'_, Cache> {
309        self.cache.borrow()
310    }
311
312    /// Returns the `Rc<RefCell<Cache>>` used by this engine.
313    #[must_use]
314    pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
315        Rc::clone(&self.cache)
316    }
317
318    /// Registers the `catalog` with the engine with an optional specific `name`.
319    ///
320    /// # Panics
321    ///
322    /// Panics if a catalog with the same `name` has already been registered.
323    #[cfg(feature = "streaming")]
324    pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
325        let name = Ustr::from(name.as_deref().unwrap_or("catalog_0"));
326
327        check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
328
329        self.catalogs.insert(name, catalog);
330        log::info!("Registered catalog <{name}>");
331    }
332
333    /// Registers the `client` with the engine with an optional venue `routing`.
334    ///
335    ///
336    /// # Panics
337    ///
338    /// Panics if a client with the same client ID has already been registered.
339    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
340        let client_id = client.client_id();
341
342        if let Some(default_client) = &self.default_client {
343            check_predicate_false(
344                default_client.client_id() == client.client_id(),
345                "client_id already registered as default client",
346            )
347            .expect(FAILED);
348        }
349
350        check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
351
352        if let Some(routing) = routing {
353            self.routing_map.insert(routing, client_id);
354            log::debug!("Set client {client_id} routing for {routing}");
355        }
356
357        if client.venue.is_none() && self.default_client.is_none() {
358            self.default_client = Some(client);
359            log::debug!("Registered client {client_id} for default routing");
360        } else {
361            self.clients.insert(client_id, client);
362            log::debug!("Registered client {client_id}");
363        }
364    }
365
366    /// Deregisters the client for the `client_id`.
367    ///
368    /// # Panics
369    ///
370    /// Panics if the client ID has not been registered.
371    pub fn deregister_client(&mut self, client_id: &ClientId) {
372        check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
373
374        self.clients.shift_remove(client_id);
375        log::info!("Deregistered client {client_id}");
376    }
377
378    /// Registers the data `client` with the engine as the default routing client.
379    ///
380    /// When a specific venue routing cannot be found, this client will receive messages.
381    ///
382    /// # Warnings
383    ///
384    /// Any existing default routing client will be overwritten.
385    ///
386    /// # Panics
387    ///
388    /// Panics if a default client has already been registered.
389    pub fn register_default_client(&mut self, client: DataClientAdapter) {
390        check_predicate_true(
391            self.default_client.is_none(),
392            "default client already registered",
393        )
394        .expect(FAILED);
395
396        let client_id = client.client_id();
397
398        self.default_client = Some(client);
399        log::debug!("Registered default client {client_id}");
400    }
401
402    /// Starts all registered data clients and re-arms bar aggregator timers.
403    pub fn start(&mut self) {
404        for client in self.get_clients_mut() {
405            if let Err(e) = client.start() {
406                log::error!("{e}");
407            }
408        }
409
410        for aggregator in self.bar_aggregators.values() {
411            if aggregator.borrow().bar_type().spec().is_time_aggregated() {
412                aggregator
413                    .borrow_mut()
414                    .start_timer(Some(aggregator.clone()));
415            }
416        }
417    }
418
419    /// Stops all registered data clients and bar aggregator timers.
420    pub fn stop(&mut self) {
421        for client in self.get_clients_mut() {
422            if let Err(e) = client.stop() {
423                log::error!("{e}");
424            }
425        }
426
427        for aggregator in self.bar_aggregators.values() {
428            aggregator.borrow_mut().stop();
429        }
430    }
431
432    /// Resets all registered data clients and clears bar aggregator state.
433    pub fn reset(&mut self) {
434        for client in self.get_clients_mut() {
435            if let Err(e) = client.reset() {
436                log::error!("{e}");
437            }
438        }
439
440        let bar_types: Vec<BarType> = self.bar_aggregators.keys().copied().collect();
441        for bar_type in bar_types {
442            if let Err(e) = self.stop_bar_aggregator(bar_type) {
443                log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
444            }
445        }
446    }
447
448    /// Disposes the engine, stopping all clients and canceling any timers.
449    pub fn dispose(&mut self) {
450        for client in self.get_clients_mut() {
451            if let Err(e) = client.dispose() {
452                log::error!("{e}");
453            }
454        }
455
456        self.clock.borrow_mut().cancel_timers();
457    }
458
459    /// Connects all registered data clients concurrently.
460    ///
461    /// Connection failures are logged but do not prevent the node from running.
462    pub async fn connect(&mut self) {
463        let futures: Vec<_> = self
464            .get_clients_mut()
465            .into_iter()
466            .map(|client| client.connect())
467            .collect();
468
469        let results = join_all(futures).await;
470
471        for error in results.into_iter().filter_map(Result::err) {
472            log::error!("Failed to connect data client: {error}");
473        }
474    }
475
476    /// Disconnects all registered data clients concurrently.
477    ///
478    /// # Errors
479    ///
480    /// Returns an error if any client fails to disconnect.
481    pub async fn disconnect(&mut self) -> anyhow::Result<()> {
482        let futures: Vec<_> = self
483            .get_clients_mut()
484            .into_iter()
485            .map(|client| client.disconnect())
486            .collect();
487
488        let results = join_all(futures).await;
489        let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
490
491        if errors.is_empty() {
492            Ok(())
493        } else {
494            let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
495            anyhow::bail!(
496                "Failed to disconnect data clients: {}",
497                error_msgs.join("; ")
498            )
499        }
500    }
501
502    /// Returns `true` if all registered data clients are currently connected.
503    #[must_use]
504    pub fn check_connected(&self) -> bool {
505        self.get_clients()
506            .iter()
507            .all(|client| client.is_connected())
508    }
509
510    /// Returns `true` if all registered data clients are currently disconnected.
511    #[must_use]
512    pub fn check_disconnected(&self) -> bool {
513        self.get_clients()
514            .iter()
515            .all(|client| !client.is_connected())
516    }
517
518    /// Returns connection status for each registered client.
519    #[must_use]
520    pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
521        self.get_clients()
522            .into_iter()
523            .map(|client| (client.client_id(), client.is_connected()))
524            .collect()
525    }
526
527    /// Returns a list of all registered client IDs, including the default client if set.
528    #[must_use]
529    pub fn registered_clients(&self) -> Vec<ClientId> {
530        self.get_clients()
531            .into_iter()
532            .map(|client| client.client_id())
533            .collect()
534    }
535
536    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
537
538    pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
539    where
540        F: Fn(&DataClientAdapter) -> &AHashSet<T>,
541        T: Clone,
542    {
543        self.get_clients()
544            .into_iter()
545            .flat_map(get_subs)
546            .cloned()
547            .collect()
548    }
549
550    #[must_use]
551    pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
552        let (default_opt, clients_map) = (&self.default_client, &self.clients);
553        let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
554
555        if let Some(default) = default_opt {
556            clients.push(default);
557        }
558
559        clients
560    }
561
562    #[must_use]
563    pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
564        let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
565        let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
566
567        if let Some(default) = default_opt {
568            clients.push(default);
569        }
570
571        clients
572    }
573
574    pub fn get_client(
575        &mut self,
576        client_id: Option<&ClientId>,
577        venue: Option<&Venue>,
578    ) -> Option<&mut DataClientAdapter> {
579        if let Some(client_id) = client_id {
580            // Explicit ID: first look in registered clients
581            if let Some(client) = self.clients.get_mut(client_id) {
582                return Some(client);
583            }
584
585            // Then check if it matches the default client
586            if let Some(default) = self.default_client.as_mut()
587                && default.client_id() == *client_id
588            {
589                return Some(default);
590            }
591
592            // Unknown explicit client
593            return None;
594        }
595
596        if let Some(v) = venue {
597            // Route by venue if mapped client still registered
598            if let Some(client_id) = self.routing_map.get(v) {
599                return self.clients.get_mut(client_id);
600            }
601        }
602
603        // Fallback to default client
604        self.get_default_client()
605    }
606
607    const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
608        self.default_client.as_mut()
609    }
610
611    /// Returns all custom data types currently subscribed across all clients.
612    #[must_use]
613    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
614        self.collect_subscriptions(|client| &client.subscriptions_custom)
615    }
616
617    /// Returns all instrument IDs currently subscribed across all clients.
618    #[must_use]
619    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
620        self.collect_subscriptions(|client| &client.subscriptions_instrument)
621    }
622
623    /// Returns all instrument IDs for which book delta subscriptions exist.
624    #[must_use]
625    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
626        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
627    }
628
629    /// Returns all instrument IDs for which book depth10 subscriptions exist.
630    #[must_use]
631    pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
632        self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
633    }
634
635    /// Returns all instrument IDs for which book snapshot subscriptions exist.
636    #[must_use]
637    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
638        self.book_intervals
639            .values()
640            .flat_map(|set| set.iter().copied())
641            .collect()
642    }
643
644    /// Returns all instrument IDs for which quote subscriptions exist.
645    #[must_use]
646    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
647        self.collect_subscriptions(|client| &client.subscriptions_quotes)
648    }
649
650    /// Returns all instrument IDs for which trade subscriptions exist.
651    #[must_use]
652    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
653        self.collect_subscriptions(|client| &client.subscriptions_trades)
654    }
655
656    /// Returns all bar types currently subscribed across all clients.
657    #[must_use]
658    pub fn subscribed_bars(&self) -> Vec<BarType> {
659        self.collect_subscriptions(|client| &client.subscriptions_bars)
660    }
661
662    /// Returns all instrument IDs for which mark price subscriptions exist.
663    #[must_use]
664    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
665        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
666    }
667
668    /// Returns all instrument IDs for which index price subscriptions exist.
669    #[must_use]
670    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
671        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
672    }
673
674    /// Returns all instrument IDs for which funding rate subscriptions exist.
675    #[must_use]
676    pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
677        self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
678    }
679
680    /// Returns all instrument IDs for which status subscriptions exist.
681    #[must_use]
682    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
683        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
684    }
685
686    /// Returns all instrument IDs for which instrument close subscriptions exist.
687    #[must_use]
688    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
689        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
690    }
691
692    // -- COMMANDS --------------------------------------------------------------------------------
693
694    /// Executes a `DataCommand` by delegating to subscribe, unsubscribe, or request handlers.
695    ///
696    /// Errors during execution are logged.
697    pub fn execute(&mut self, cmd: DataCommand) {
698        if let Err(e) = match cmd {
699            DataCommand::Subscribe(c) => self.execute_subscribe(&c),
700            DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
701            DataCommand::Request(c) => self.execute_request(c),
702            #[cfg(feature = "defi")]
703            DataCommand::DefiRequest(c) => self.execute_defi_request(c),
704            #[cfg(feature = "defi")]
705            DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(&c),
706            #[cfg(feature = "defi")]
707            DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
708            _ => {
709                log::warn!("Unhandled DataCommand variant");
710                Ok(())
711            }
712        } {
713            log::error!("{e}");
714        }
715    }
716
717    /// Handles a subscribe command, updating internal state and forwarding to the client.
718    ///
719    /// # Errors
720    ///
721    /// Returns an error if the subscription is invalid (e.g., synthetic instrument for book data),
722    /// or if the underlying client operation fails.
723    pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
724        // Update internal engine state
725        match &cmd {
726            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
727            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
728            SubscribeCommand::BookSnapshots(cmd) => {
729                // Handles client forwarding internally (forwards as BookDeltas)
730                return self.subscribe_book_snapshots(cmd);
731            }
732            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
733            _ => {} // Do nothing else
734        }
735
736        if let Some(client_id) = cmd.client_id()
737            && self.external_clients.contains(client_id)
738        {
739            if self.config.debug {
740                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
741            }
742            return Ok(());
743        }
744
745        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
746            client.execute_subscribe(cmd);
747        } else {
748            log::error!(
749                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
750                cmd.client_id(),
751                cmd.venue(),
752            );
753        }
754
755        Ok(())
756    }
757
758    /// Handles an unsubscribe command, updating internal state and forwarding to the client.
759    ///
760    /// # Errors
761    ///
762    /// Returns an error if the underlying client operation fails.
763    pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
764        match &cmd {
765            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
766            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
767            UnsubscribeCommand::BookSnapshots(cmd) => {
768                // Handles client forwarding internally (forwards as BookDeltas)
769                return self.unsubscribe_book_snapshots(cmd);
770            }
771            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
772            _ => {} // Do nothing else
773        }
774
775        if let Some(client_id) = cmd.client_id()
776            && self.external_clients.contains(client_id)
777        {
778            if self.config.debug {
779                log::debug!(
780                    "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
781                );
782            }
783            return Ok(());
784        }
785
786        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
787            client.execute_unsubscribe(cmd);
788        } else {
789            log::error!(
790                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
791                cmd.client_id(),
792                cmd.venue(),
793            );
794        }
795
796        Ok(())
797    }
798
799    /// Sends a [`RequestCommand`] to a suitable data client implementation.
800    ///
801    /// # Errors
802    ///
803    /// Returns an error if no client is found for the given client ID or venue,
804    /// or if the client fails to process the request.
805    pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
806        // Skip requests for external clients
807        if let Some(cid) = req.client_id()
808            && self.external_clients.contains(cid)
809        {
810            if self.config.debug {
811                log::debug!("Skipping data request for external client {cid}: {req:?}");
812            }
813            return Ok(());
814        }
815        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
816            match req {
817                RequestCommand::Data(req) => client.request_data(req),
818                RequestCommand::Instrument(req) => client.request_instrument(req),
819                RequestCommand::Instruments(req) => client.request_instruments(req),
820                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
821                RequestCommand::BookDepth(req) => client.request_book_depth(req),
822                RequestCommand::Quotes(req) => client.request_quotes(req),
823                RequestCommand::Trades(req) => client.request_trades(req),
824                RequestCommand::FundingRates(req) => client.request_funding_rates(req),
825                RequestCommand::Bars(req) => client.request_bars(req),
826            }
827        } else {
828            anyhow::bail!(
829                "Cannot handle request: no client found for {:?} {:?}",
830                req.client_id(),
831                req.venue()
832            );
833        }
834    }
835
836    /// Processes a dynamically-typed data message.
837    ///
838    /// Currently supports `InstrumentAny` and `FundingRateUpdate`; unrecognized types are logged as errors.
839    pub fn process(&mut self, data: &dyn Any) {
840        // TODO: Eventually these can be added to the `Data` enum (C/Cython blocking), process here for now
841        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
842            self.handle_instrument(instrument.clone());
843        } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
844            self.handle_funding_rate(*funding_rate);
845        } else {
846            log::error!("Cannot process data {data:?}, type is unrecognized");
847        }
848
849        // TODO: Add custom data handling here
850    }
851
852    /// Processes a `Data` enum instance, dispatching to appropriate handlers.
853    pub fn process_data(&mut self, data: Data) {
854        match data {
855            Data::Delta(delta) => self.handle_delta(delta),
856            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
857            Data::Depth10(depth) => self.handle_depth10(*depth),
858            Data::Quote(quote) => self.handle_quote(quote),
859            Data::Trade(trade) => self.handle_trade(trade),
860            Data::Bar(bar) => self.handle_bar(bar),
861            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
862            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
863            Data::InstrumentClose(close) => self.handle_instrument_close(close),
864        }
865    }
866
867    /// Processes a `DataResponse`, handling and publishing the response message.
868    pub fn response(&self, resp: DataResponse) {
869        log::debug!("{RECV}{RES} {resp:?}");
870
871        let correlation_id = *resp.correlation_id();
872
873        match &resp {
874            DataResponse::Instrument(r) => {
875                self.handle_instrument_response(r.data.clone());
876            }
877            DataResponse::Instruments(r) => {
878                self.handle_instruments(&r.data);
879            }
880            DataResponse::Quotes(r) => {
881                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
882                    self.handle_quotes(&r.data);
883                }
884            }
885            DataResponse::Trades(r) => {
886                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
887                    self.handle_trades(&r.data);
888                }
889            }
890            DataResponse::FundingRates(r) => {
891                if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
892                    self.handle_funding_rates(&r.data);
893                }
894            }
895            DataResponse::Bars(r) => {
896                if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
897                    self.handle_bars(&r.data);
898                }
899            }
900            DataResponse::Book(r) => self.handle_book_response(&r.data),
901            _ => todo!("Handle other response types"),
902        }
903
904        msgbus::send_response(&correlation_id, resp);
905    }
906
907    // -- DATA HANDLERS ---------------------------------------------------------------------------
908
909    fn handle_instrument(&mut self, instrument: InstrumentAny) {
910        log::debug!("Handling instrument: {}", instrument.id());
911
912        if let Err(e) = self
913            .cache
914            .as_ref()
915            .borrow_mut()
916            .add_instrument(instrument.clone())
917        {
918            log_error_on_cache_insert(&e);
919        }
920
921        let topic = switchboard::get_instrument_topic(instrument.id());
922        log::debug!("Publishing instrument to topic: {topic}");
923        msgbus::publish_any(topic, &instrument);
924    }
925
926    fn handle_delta(&mut self, delta: OrderBookDelta) {
927        let deltas = if self.config.buffer_deltas {
928            if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
929                buffered_deltas.deltas.push(delta);
930                buffered_deltas.flags = delta.flags;
931                buffered_deltas.sequence = delta.sequence;
932                buffered_deltas.ts_event = delta.ts_event;
933                buffered_deltas.ts_init = delta.ts_init;
934            } else {
935                let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
936                self.buffered_deltas_map
937                    .insert(delta.instrument_id, buffered_deltas);
938            }
939
940            if !RecordFlag::F_LAST.matches(delta.flags) {
941                return; // Not the last delta for event
942            }
943
944            // SAFETY: We know the deltas exists already
945            self.buffered_deltas_map
946                .remove(&delta.instrument_id)
947                .unwrap()
948        } else {
949            OrderBookDeltas::new(delta.instrument_id, vec![delta])
950        };
951
952        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
953        msgbus::publish_deltas(topic, &deltas);
954    }
955
956    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
957        if self.config.buffer_deltas {
958            let instrument_id = deltas.instrument_id;
959
960            for delta in deltas.deltas {
961                if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
962                    buffered_deltas.deltas.push(delta);
963                    buffered_deltas.flags = delta.flags;
964                    buffered_deltas.sequence = delta.sequence;
965                    buffered_deltas.ts_event = delta.ts_event;
966                    buffered_deltas.ts_init = delta.ts_init;
967                } else {
968                    let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
969                    self.buffered_deltas_map
970                        .insert(instrument_id, buffered_deltas);
971                }
972
973                if RecordFlag::F_LAST.matches(delta.flags) {
974                    // SAFETY: We know the deltas exist already
975                    let deltas_to_publish =
976                        self.buffered_deltas_map.remove(&instrument_id).unwrap();
977                    let topic = switchboard::get_book_deltas_topic(instrument_id);
978                    msgbus::publish_deltas(topic, &deltas_to_publish);
979                }
980            }
981        } else {
982            let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
983            msgbus::publish_deltas(topic, &deltas);
984        };
985    }
986
987    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
988        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
989        msgbus::publish_depth10(topic, &depth);
990    }
991
992    fn handle_quote(&mut self, quote: QuoteTick) {
993        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
994            log_error_on_cache_insert(&e);
995        }
996
997        // TODO: Handle synthetics
998
999        let topic = switchboard::get_quotes_topic(quote.instrument_id);
1000        msgbus::publish_quote(topic, &quote);
1001    }
1002
1003    fn handle_trade(&mut self, trade: TradeTick) {
1004        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
1005            log_error_on_cache_insert(&e);
1006        }
1007
1008        // TODO: Handle synthetics
1009
1010        let topic = switchboard::get_trades_topic(trade.instrument_id);
1011        msgbus::publish_trade(topic, &trade);
1012    }
1013
1014    fn handle_bar(&mut self, bar: Bar) {
1015        // TODO: Handle additional bar logic
1016        if self.config.validate_data_sequence
1017            && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
1018        {
1019            if bar.ts_event < last_bar.ts_event {
1020                log::warn!(
1021                    "Bar {bar} was prior to last bar `ts_event` {}",
1022                    last_bar.ts_event
1023                );
1024                return; // Bar is out of sequence
1025            }
1026            if bar.ts_init < last_bar.ts_init {
1027                log::warn!(
1028                    "Bar {bar} was prior to last bar `ts_init` {}",
1029                    last_bar.ts_init
1030                );
1031                return; // Bar is out of sequence
1032            }
1033            // TODO: Implement `bar.is_revision` logic
1034        }
1035
1036        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
1037            log_error_on_cache_insert(&e);
1038        }
1039
1040        let topic = switchboard::get_bars_topic(bar.bar_type);
1041        msgbus::publish_bar(topic, &bar);
1042    }
1043
1044    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
1045        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
1046            log_error_on_cache_insert(&e);
1047        }
1048
1049        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
1050        msgbus::publish_mark_price(topic, &mark_price);
1051    }
1052
1053    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
1054        if let Err(e) = self
1055            .cache
1056            .as_ref()
1057            .borrow_mut()
1058            .add_index_price(index_price)
1059        {
1060            log_error_on_cache_insert(&e);
1061        }
1062
1063        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
1064        msgbus::publish_index_price(topic, &index_price);
1065    }
1066
1067    /// Handles a funding rate update by adding it to the cache and publishing to the message bus.
1068    pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
1069        if let Err(e) = self
1070            .cache
1071            .as_ref()
1072            .borrow_mut()
1073            .add_funding_rate(funding_rate)
1074        {
1075            log_error_on_cache_insert(&e);
1076        }
1077
1078        let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
1079        msgbus::publish_funding_rate(topic, &funding_rate);
1080    }
1081
1082    fn handle_instrument_close(&mut self, close: InstrumentClose) {
1083        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
1084        msgbus::publish_any(topic, &close);
1085    }
1086
1087    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
1088
1089    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
1090        if cmd.instrument_id.is_synthetic() {
1091            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1092        }
1093
1094        self.book_deltas_subs.insert(cmd.instrument_id);
1095        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
1096
1097        Ok(())
1098    }
1099
1100    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
1101        if cmd.instrument_id.is_synthetic() {
1102            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
1103        }
1104
1105        self.book_depth10_subs.insert(cmd.instrument_id);
1106        self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
1107
1108        Ok(())
1109    }
1110
1111    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
1112        if cmd.instrument_id.is_synthetic() {
1113            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1114        }
1115
1116        // Track snapshot intervals per instrument, and set up timer on first subscription
1117        let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
1118            Entry::Vacant(e) => {
1119                let mut set = AHashSet::new();
1120                set.insert(cmd.instrument_id);
1121                e.insert(set);
1122                true
1123            }
1124            Entry::Occupied(mut e) => {
1125                e.get_mut().insert(cmd.instrument_id);
1126                false
1127            }
1128        };
1129
1130        if first_for_interval {
1131            // Initialize snapshotter and schedule its timer
1132            let interval_ns = millis_to_nanos_unchecked(cmd.interval_ms.get() as f64);
1133            let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1134
1135            let snap_info = BookSnapshotInfo {
1136                instrument_id: cmd.instrument_id,
1137                venue: cmd.instrument_id.venue,
1138                is_composite: cmd.instrument_id.symbol.is_composite(),
1139                root: Ustr::from(cmd.instrument_id.symbol.root()),
1140                topic,
1141                interval_ms: cmd.interval_ms,
1142            };
1143
1144            // Schedule the first snapshot at the next interval boundary
1145            let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1146            let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1147
1148            let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
1149            self.book_snapshotters
1150                .insert(cmd.instrument_id, snapshotter.clone());
1151            let timer_name = snapshotter.timer_name;
1152
1153            let callback_fn: Rc<dyn Fn(TimeEvent)> =
1154                Rc::new(move |event| snapshotter.snapshot(event));
1155            let callback = TimeEventCallback::from(callback_fn);
1156
1157            self.clock
1158                .borrow_mut()
1159                .set_timer_ns(
1160                    &timer_name,
1161                    interval_ns,
1162                    Some(start_time_ns.into()),
1163                    None,
1164                    Some(callback),
1165                    None,
1166                    None,
1167                )
1168                .expect(FAILED);
1169        }
1170
1171        // Only set up book updater if not already subscribed to deltas
1172        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1173            self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1174        }
1175
1176        if let Some(client_id) = cmd.client_id.as_ref()
1177            && self.external_clients.contains(client_id)
1178        {
1179            if self.config.debug {
1180                log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
1181            }
1182            return Ok(());
1183        }
1184
1185        log::debug!(
1186            "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
1187            cmd.instrument_id,
1188            cmd.client_id,
1189            cmd.venue,
1190        );
1191
1192        if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1193            let deltas_cmd = SubscribeBookDeltas::new(
1194                cmd.instrument_id,
1195                cmd.book_type,
1196                cmd.client_id,
1197                cmd.venue,
1198                UUID4::new(),
1199                cmd.ts_init,
1200                cmd.depth,
1201                true, // managed
1202                Some(cmd.command_id),
1203                cmd.params.clone(),
1204            );
1205            log::debug!(
1206                "Calling client.execute_subscribe for BookDeltas: {}",
1207                cmd.instrument_id
1208            );
1209            client.execute_subscribe(&SubscribeCommand::BookDeltas(deltas_cmd));
1210        } else {
1211            log::error!(
1212                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1213                cmd.client_id,
1214                cmd.venue,
1215            );
1216        }
1217
1218        Ok(())
1219    }
1220
1221    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1222        match cmd.bar_type.aggregation_source() {
1223            AggregationSource::Internal => {
1224                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1225                    self.start_bar_aggregator(cmd.bar_type)?;
1226                }
1227            }
1228            AggregationSource::External => {
1229                if cmd.bar_type.instrument_id().is_synthetic() {
1230                    anyhow::bail!(
1231                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
1232                    );
1233                }
1234            }
1235        }
1236
1237        Ok(())
1238    }
1239
1240    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1241        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1242            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1243            return Ok(());
1244        }
1245
1246        self.book_deltas_subs.remove(&cmd.instrument_id);
1247
1248        let topics = vec![
1249            switchboard::get_book_deltas_topic(cmd.instrument_id),
1250            switchboard::get_book_depth10_topic(cmd.instrument_id),
1251            // TODO: Unsubscribe from snapshots?
1252        ];
1253
1254        self.maintain_book_updater(&cmd.instrument_id, &topics);
1255        self.maintain_book_snapshotter(&cmd.instrument_id);
1256
1257        Ok(())
1258    }
1259
1260    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1261        if !self.book_depth10_subs.contains(&cmd.instrument_id) {
1262            log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
1263            return Ok(());
1264        }
1265
1266        self.book_depth10_subs.remove(&cmd.instrument_id);
1267
1268        let topics = vec![
1269            switchboard::get_book_deltas_topic(cmd.instrument_id),
1270            switchboard::get_book_depth10_topic(cmd.instrument_id),
1271        ];
1272
1273        self.maintain_book_updater(&cmd.instrument_id, &topics);
1274        self.maintain_book_snapshotter(&cmd.instrument_id);
1275
1276        Ok(())
1277    }
1278
1279    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1280        let is_subscribed = self
1281            .book_intervals
1282            .values()
1283            .any(|set| set.contains(&cmd.instrument_id));
1284
1285        if !is_subscribed {
1286            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1287            return Ok(());
1288        }
1289
1290        // Remove instrument from interval tracking, and drop empty intervals
1291        let mut to_remove = Vec::new();
1292        for (interval, set) in &mut self.book_intervals {
1293            if set.remove(&cmd.instrument_id) && set.is_empty() {
1294                to_remove.push(*interval);
1295            }
1296        }
1297
1298        for interval in to_remove {
1299            self.book_intervals.remove(&interval);
1300        }
1301
1302        let topics = vec![
1303            switchboard::get_book_deltas_topic(cmd.instrument_id),
1304            switchboard::get_book_depth10_topic(cmd.instrument_id),
1305        ];
1306
1307        self.maintain_book_updater(&cmd.instrument_id, &topics);
1308        self.maintain_book_snapshotter(&cmd.instrument_id);
1309
1310        let still_in_intervals = self
1311            .book_intervals
1312            .values()
1313            .any(|set| set.contains(&cmd.instrument_id));
1314
1315        if !still_in_intervals && !self.book_deltas_subs.contains(&cmd.instrument_id) {
1316            if let Some(client_id) = cmd.client_id.as_ref()
1317                && self.external_clients.contains(client_id)
1318            {
1319                return Ok(());
1320            }
1321
1322            if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1323                let deltas_cmd = UnsubscribeBookDeltas::new(
1324                    cmd.instrument_id,
1325                    cmd.client_id,
1326                    cmd.venue,
1327                    UUID4::new(),
1328                    cmd.ts_init,
1329                    Some(cmd.command_id),
1330                    cmd.params.clone(),
1331                );
1332                client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
1333            }
1334        }
1335
1336        Ok(())
1337    }
1338
1339    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1340        let bar_type = cmd.bar_type;
1341
1342        // Don't remove aggregator if other exact-topic subscribers still exist
1343        let topic = switchboard::get_bars_topic(bar_type.standard());
1344        if msgbus::exact_subscriber_count_bars(topic) > 0 {
1345            return Ok(());
1346        }
1347
1348        if self.bar_aggregators.contains_key(&bar_type.standard())
1349            && let Err(e) = self.stop_bar_aggregator(bar_type)
1350        {
1351            log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1352        }
1353
1354        // After stopping a composite, check if the source aggregator is now orphaned
1355        if bar_type.is_composite() {
1356            let source_type = bar_type.composite();
1357            let source_topic = switchboard::get_bars_topic(source_type);
1358            if msgbus::exact_subscriber_count_bars(source_topic) == 0
1359                && self.bar_aggregators.contains_key(&source_type)
1360                && let Err(e) = self.stop_bar_aggregator(source_type)
1361            {
1362                log::error!("Error stopping source bar aggregator for {source_type}: {e}");
1363            }
1364        }
1365
1366        Ok(())
1367    }
1368
1369    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, _topics: &[MStr<Topic>]) {
1370        let Some(updater) = self.book_updaters.get(instrument_id) else {
1371            return;
1372        };
1373
1374        // Check which internal subscriptions still exist
1375        let has_deltas = self.book_deltas_subs.contains(instrument_id);
1376        let has_depth10 = self.book_depth10_subs.contains(instrument_id);
1377
1378        let deltas_topic = switchboard::get_book_deltas_topic(*instrument_id);
1379        let depth_topic = switchboard::get_book_depth10_topic(*instrument_id);
1380        let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
1381        let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater.clone());
1382
1383        // Unsubscribe from topics that no longer have subscriptions
1384        if !has_deltas {
1385            msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
1386        }
1387        if !has_depth10 {
1388            msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
1389        }
1390
1391        // Remove BookUpdater only when no subscriptions remain
1392        if !has_deltas && !has_depth10 {
1393            self.book_updaters.remove(instrument_id);
1394            log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1395        }
1396    }
1397
1398    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1399        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1400            let topic = switchboard::get_book_snapshots_topic(
1401                *instrument_id,
1402                snapshotter.snap_info.interval_ms,
1403            );
1404
1405            // Check remaining snapshot subscriptions, if none then remove snapshotter
1406            if msgbus::subscriber_count_book_snapshots(topic) == 0 {
1407                let timer_name = snapshotter.timer_name;
1408                self.book_snapshotters.remove(instrument_id);
1409                let mut clock = self.clock.borrow_mut();
1410                if clock.timer_exists(&timer_name) {
1411                    clock.cancel_timer(&timer_name);
1412                }
1413                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1414            }
1415        }
1416    }
1417
1418    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
1419
1420    fn handle_instrument_response(&self, instrument: InstrumentAny) {
1421        let mut cache = self.cache.as_ref().borrow_mut();
1422        if let Err(e) = cache.add_instrument(instrument) {
1423            log_error_on_cache_insert(&e);
1424        }
1425    }
1426
1427    fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1428        // TODO: Improve by adding bulk update methods to cache and database
1429        let mut cache = self.cache.as_ref().borrow_mut();
1430        for instrument in instruments {
1431            if let Err(e) = cache.add_instrument(instrument.clone()) {
1432                log_error_on_cache_insert(&e);
1433            }
1434        }
1435    }
1436
1437    fn handle_quotes(&self, quotes: &[QuoteTick]) {
1438        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1439            log_error_on_cache_insert(&e);
1440        }
1441    }
1442
1443    fn handle_trades(&self, trades: &[TradeTick]) {
1444        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1445            log_error_on_cache_insert(&e);
1446        }
1447    }
1448
1449    fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
1450        if let Err(e) = self
1451            .cache
1452            .as_ref()
1453            .borrow_mut()
1454            .add_funding_rates(funding_rates)
1455        {
1456            log_error_on_cache_insert(&e);
1457        }
1458    }
1459
1460    fn handle_bars(&self, bars: &[Bar]) {
1461        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1462            log_error_on_cache_insert(&e);
1463        }
1464    }
1465
1466    fn handle_book_response(&self, book: &OrderBook) {
1467        log::debug!("Adding order book {} to cache", book.instrument_id);
1468        if let Err(e) = self
1469            .cache
1470            .as_ref()
1471            .borrow_mut()
1472            .add_order_book(book.clone())
1473        {
1474            log_error_on_cache_insert(&e);
1475        }
1476    }
1477
1478    // -- INTERNAL --------------------------------------------------------------------------------
1479
1480    #[allow(clippy::too_many_arguments)]
1481    fn setup_book_updater(
1482        &mut self,
1483        instrument_id: &InstrumentId,
1484        book_type: BookType,
1485        only_deltas: bool,
1486        managed: bool,
1487    ) -> anyhow::Result<()> {
1488        let mut cache = self.cache.borrow_mut();
1489        if managed && !cache.has_order_book(instrument_id) {
1490            let book = OrderBook::new(*instrument_id, book_type);
1491            log::debug!("Created {book}");
1492            cache.add_order_book(book)?;
1493        }
1494
1495        // Reuse existing BookUpdater or create a new one
1496        let updater = self
1497            .book_updaters
1498            .entry(*instrument_id)
1499            .or_insert_with(|| Rc::new(BookUpdater::new(instrument_id, self.cache.clone())))
1500            .clone();
1501
1502        // Subscribe to deltas (typed router handles duplicates)
1503        let topic = switchboard::get_book_deltas_topic(*instrument_id);
1504        let deltas_handler = TypedHandler::new(updater.clone());
1505        msgbus::subscribe_book_deltas(topic.into(), deltas_handler, Some(self.msgbus_priority));
1506
1507        // Subscribe to depth10 if not only_deltas
1508        if !only_deltas {
1509            let topic = switchboard::get_book_depth10_topic(*instrument_id);
1510            let depth_handler = TypedHandler::new(updater);
1511            msgbus::subscribe_book_depth10(topic.into(), depth_handler, Some(self.msgbus_priority));
1512        }
1513
1514        Ok(())
1515    }
1516
1517    fn create_bar_aggregator(
1518        &mut self,
1519        instrument: &InstrumentAny,
1520        bar_type: BarType,
1521    ) -> Box<dyn BarAggregator> {
1522        let cache = self.cache.clone();
1523
1524        let handler = move |bar: Bar| {
1525            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1526                log_error_on_cache_insert(&e);
1527            }
1528
1529            let topic = switchboard::get_bars_topic(bar.bar_type);
1530            msgbus::publish_bar(topic, &bar);
1531        };
1532
1533        let clock = self.clock.clone();
1534        let config = self.config.clone();
1535
1536        let price_precision = instrument.price_precision();
1537        let size_precision = instrument.size_precision();
1538
1539        if bar_type.spec().is_time_aggregated() {
1540            // Get time_bars_origin_offset from config
1541            let time_bars_origin_offset = config
1542                .time_bars_origins
1543                .get(&bar_type.spec().aggregation)
1544                .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1545
1546            Box::new(TimeBarAggregator::new(
1547                bar_type,
1548                price_precision,
1549                size_precision,
1550                clock,
1551                handler,
1552                config.time_bars_build_with_no_updates,
1553                config.time_bars_timestamp_on_close,
1554                config.time_bars_interval_type,
1555                time_bars_origin_offset,
1556                config.time_bars_build_delay,
1557                config.time_bars_skip_first_non_full_bar,
1558            ))
1559        } else {
1560            match bar_type.spec().aggregation {
1561                BarAggregation::Tick => Box::new(TickBarAggregator::new(
1562                    bar_type,
1563                    price_precision,
1564                    size_precision,
1565                    handler,
1566                )) as Box<dyn BarAggregator>,
1567                BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
1568                    bar_type,
1569                    price_precision,
1570                    size_precision,
1571                    handler,
1572                )) as Box<dyn BarAggregator>,
1573                BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
1574                    bar_type,
1575                    price_precision,
1576                    size_precision,
1577                    handler,
1578                )) as Box<dyn BarAggregator>,
1579                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1580                    bar_type,
1581                    price_precision,
1582                    size_precision,
1583                    handler,
1584                )) as Box<dyn BarAggregator>,
1585                BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
1586                    bar_type,
1587                    price_precision,
1588                    size_precision,
1589                    handler,
1590                )) as Box<dyn BarAggregator>,
1591                BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
1592                    bar_type,
1593                    price_precision,
1594                    size_precision,
1595                    handler,
1596                )) as Box<dyn BarAggregator>,
1597                BarAggregation::Value => Box::new(ValueBarAggregator::new(
1598                    bar_type,
1599                    price_precision,
1600                    size_precision,
1601                    handler,
1602                )) as Box<dyn BarAggregator>,
1603                BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
1604                    bar_type,
1605                    price_precision,
1606                    size_precision,
1607                    handler,
1608                )) as Box<dyn BarAggregator>,
1609                BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
1610                    bar_type,
1611                    price_precision,
1612                    size_precision,
1613                    handler,
1614                )) as Box<dyn BarAggregator>,
1615                BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1616                    bar_type,
1617                    price_precision,
1618                    size_precision,
1619                    instrument.price_increment(),
1620                    handler,
1621                )) as Box<dyn BarAggregator>,
1622                _ => panic!(
1623                    "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
1624                    bar_type.spec().aggregation
1625                ),
1626            }
1627        }
1628    }
1629
1630    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1631        // Get the instrument for this bar type
1632        let instrument = {
1633            let cache = self.cache.borrow();
1634            cache
1635                .instrument(&bar_type.instrument_id())
1636                .ok_or_else(|| {
1637                    anyhow::anyhow!(
1638                        "Cannot start bar aggregation: no instrument found for {}",
1639                        bar_type.instrument_id(),
1640                    )
1641                })?
1642                .clone()
1643        };
1644
1645        // Use standard form of bar type as key
1646        let bar_key = bar_type.standard();
1647
1648        // Create or retrieve aggregator in Rc<RefCell>
1649        let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1650            rc.clone()
1651        } else {
1652            let agg = self.create_bar_aggregator(&instrument, bar_type);
1653            let rc = Rc::new(RefCell::new(agg));
1654            self.bar_aggregators.insert(bar_key, rc.clone());
1655            rc
1656        };
1657
1658        // Subscribe to underlying data topics
1659        let mut subscriptions = Vec::new();
1660
1661        if bar_type.is_composite() {
1662            let topic = switchboard::get_bars_topic(bar_type.composite());
1663            let handler = TypedHandler::new(BarBarHandler::new(aggregator.clone(), bar_key));
1664            msgbus::subscribe_bars(topic.into(), handler.clone(), Some(self.msgbus_priority));
1665            subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
1666        } else if bar_type.spec().price_type == PriceType::Last {
1667            let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1668            let handler = TypedHandler::new(BarTradeHandler::new(aggregator.clone(), bar_key));
1669            msgbus::subscribe_trades(topic.into(), handler.clone(), Some(self.msgbus_priority));
1670            subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
1671        } else {
1672            // Warn if imbalance/runs aggregation is wired to quotes (needs aggressor_side from trades)
1673            if matches!(
1674                bar_type.spec().aggregation,
1675                BarAggregation::TickImbalance
1676                    | BarAggregation::VolumeImbalance
1677                    | BarAggregation::ValueImbalance
1678                    | BarAggregation::TickRuns
1679                    | BarAggregation::VolumeRuns
1680                    | BarAggregation::ValueRuns
1681            ) {
1682                log::warn!(
1683                    "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
1684                     data with `aggressor_side`, but `price_type` is not LAST so it will receive \
1685                     quote data: bars will not emit correctly",
1686                );
1687            }
1688
1689            let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1690            let handler = TypedHandler::new(BarQuoteHandler::new(aggregator.clone(), bar_key));
1691            msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(self.msgbus_priority));
1692            subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
1693        }
1694
1695        self.bar_aggregator_handlers.insert(bar_key, subscriptions);
1696
1697        // Setup time bar aggregator if needed (matches Cython _setup_bar_aggregator)
1698        self.setup_bar_aggregator(bar_type, false)?;
1699
1700        aggregator.borrow_mut().set_is_running(true);
1701
1702        Ok(())
1703    }
1704
1705    /// Sets up a bar aggregator, matching Cython _setup_bar_aggregator logic.
1706    ///
1707    /// This method handles historical mode, message bus subscriptions, and time bar aggregator setup.
1708    fn setup_bar_aggregator(&mut self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
1709        let bar_key = bar_type.standard();
1710        let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
1711            anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
1712        })?;
1713
1714        // Set historical mode and handler
1715        let handler: Box<dyn FnMut(Bar)> = if historical {
1716            // Historical handler - process_historical equivalent
1717            let cache = self.cache.clone();
1718            Box::new(move |bar: Bar| {
1719                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1720                    log_error_on_cache_insert(&e);
1721                }
1722                // In historical mode, bars are processed but not published to message bus
1723            })
1724        } else {
1725            // Regular handler - process equivalent
1726            let cache = self.cache.clone();
1727            Box::new(move |bar: Bar| {
1728                if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1729                    log_error_on_cache_insert(&e);
1730                }
1731                let topic = switchboard::get_bars_topic(bar.bar_type);
1732                msgbus::publish_bar(topic, &bar);
1733            })
1734        };
1735
1736        aggregator
1737            .borrow_mut()
1738            .set_historical_mode(historical, handler);
1739
1740        // For TimeBarAggregator, set clock and start timer
1741        if bar_type.spec().is_time_aggregated() {
1742            use nautilus_common::clock::TestClock;
1743
1744            if historical {
1745                // Each aggregator gets its own independent clock
1746                let test_clock = Rc::new(RefCell::new(TestClock::new()));
1747                aggregator.borrow_mut().set_clock(test_clock);
1748                // Set weak reference for historical mode (start_timer called later from preprocess_historical_events)
1749                // Store weak reference so start_timer can use it when called later
1750                let aggregator_weak = Rc::downgrade(aggregator);
1751                aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
1752            } else {
1753                aggregator.borrow_mut().set_clock(self.clock.clone());
1754                aggregator
1755                    .borrow_mut()
1756                    .start_timer(Some(aggregator.clone()));
1757            }
1758        }
1759
1760        Ok(())
1761    }
1762
1763    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1764        let aggregator = self
1765            .bar_aggregators
1766            .remove(&bar_type.standard())
1767            .ok_or_else(|| {
1768                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1769            })?;
1770
1771        aggregator.borrow_mut().stop();
1772
1773        // Unsubscribe any registered message handlers
1774        let bar_key = bar_type.standard();
1775        if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1776            for sub in subs {
1777                match sub {
1778                    BarAggregatorSubscription::Bar { topic, handler } => {
1779                        msgbus::unsubscribe_bars(topic.into(), &handler);
1780                    }
1781                    BarAggregatorSubscription::Trade { topic, handler } => {
1782                        msgbus::unsubscribe_trades(topic.into(), &handler);
1783                    }
1784                    BarAggregatorSubscription::Quote { topic, handler } => {
1785                        msgbus::unsubscribe_quotes(topic.into(), &handler);
1786                    }
1787                }
1788            }
1789        }
1790
1791        Ok(())
1792    }
1793}
1794
1795#[inline(always)]
1796fn log_error_on_cache_insert<T: Display>(e: &T) {
1797    log::error!("Error on cache insert: {e}");
1798}
1799
1800#[inline(always)]
1801fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
1802    if data.is_empty() {
1803        let name = type_name::<T>();
1804        let short_name = name.rsplit("::").next().unwrap_or(name);
1805        log::warn!("Received empty {short_name} response for {id} {correlation_id}");
1806        return true;
1807    }
1808    false
1809}