nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engine's primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31// Under development
32#![allow(dead_code)]
33#![allow(unused_variables)]
34#![allow(unused_assignments)]
35
36pub mod book;
37pub mod config;
38pub mod runner;
39
40#[cfg(test)]
41mod tests;
42
43use std::{
44    any::Any,
45    cell::{Ref, RefCell},
46    collections::{HashMap, HashSet, VecDeque},
47    num::NonZeroU64,
48    rc::Rc,
49    sync::Arc,
50};
51
52use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
53use config::DataEngineConfig;
54use indexmap::IndexMap;
55use nautilus_common::{
56    cache::Cache,
57    clock::Clock,
58    logging::{RECV, RES},
59    messages::data::{Action, DataRequest, DataResponse, SubscriptionCommand},
60    msgbus::{
61        handler::{MessageHandler, ShareableMessageHandler},
62        MessageBus,
63    },
64    timer::TimeEventCallback,
65};
66use nautilus_core::{
67    correctness::{check_key_in_index_map, check_key_not_in_index_map, FAILED},
68    datetime::{millis_to_nanos, NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND},
69};
70use nautilus_model::{
71    data::{
72        Bar, BarType, Data, DataType, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick,
73        TradeTick,
74    },
75    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
76    identifiers::{ClientId, InstrumentId, Venue},
77    instruments::{InstrumentAny, SyntheticInstrument},
78    orderbook::OrderBook,
79};
80use ustr::Ustr;
81
82use crate::{
83    aggregation::{
84        BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
85        VolumeBarAggregator,
86    },
87    client::DataClientAdapter,
88};
89
/// Provides a high-performance `DataEngine` for all environments.
///
/// Holds shared handles to the clock, cache and message bus, the registered
/// data clients, and the per-instrument state the engine maintains.
pub struct DataEngine {
    /// Shared clock, used for timestamps and for setting/cancelling timers.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache that received instruments and market data are written into.
    cache: Rc<RefCell<Cache>>,
    /// Shared message bus that data and responses are published on.
    msgbus: Rc<RefCell<MessageBus>>,
    /// Registered data clients keyed by client ID.
    clients: IndexMap<ClientId, DataClientAdapter>,
    /// Client set through `register_default_client`, if any.
    default_client: Option<DataClientAdapter>,
    // NOTE(review): not used in the visible part of this file - presumably IDs of
    // clients managed outside the engine; confirm.
    external_clients: HashSet<ClientId>,
    /// Venue to client ID routing, populated by `register_client`.
    routing_map: IndexMap<Venue, ClientId>,
    /// Book snapshot intervals (milliseconds) mapped to sets of instrument IDs.
    book_intervals: HashMap<NonZeroU64, HashSet<InstrumentId>>,
    /// Order book updaters keyed by instrument ID.
    book_updaters: HashMap<InstrumentId, Rc<BookUpdater>>,
    /// Order book snapshotters keyed by instrument ID.
    book_snapshotters: HashMap<InstrumentId, Rc<BookSnapshotter>>,
    /// Bar aggregators keyed by bar type.
    bar_aggregators: HashMap<BarType, Box<dyn BarAggregator>>,
    // NOTE(review): the synthetic feed maps are not used in the visible part of
    // this file (synthetics handling is still marked TODO in the handlers).
    synthetic_quote_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
    synthetic_trade_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Deltas buffered per instrument until one flagged `F_LAST` arrives
    /// (only when `config.buffer_deltas` is set).
    buffered_deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>>, // TODO: Use OrderBookDeltas?
    /// Priority passed to the message bus when the engine subscribes its own handlers.
    msgbus_priority: u8,
    /// Subscription commands queued by `enqueue` and executed by `run`.
    command_queue: VecDeque<SubscriptionCommand>,
    /// Engine configuration.
    config: DataEngineConfig,
}
110
111impl DataEngine {
112    /// Creates a new [`DataEngine`] instance.
113    #[must_use]
114    pub fn new(
115        clock: Rc<RefCell<dyn Clock>>,
116        cache: Rc<RefCell<Cache>>,
117        msgbus: Rc<RefCell<MessageBus>>,
118        config: Option<DataEngineConfig>,
119    ) -> Self {
120        Self {
121            clock,
122            cache,
123            msgbus,
124            clients: IndexMap::new(),
125            default_client: None,
126            external_clients: HashSet::new(),
127            routing_map: IndexMap::new(),
128            book_intervals: HashMap::new(),
129            book_updaters: HashMap::new(),
130            book_snapshotters: HashMap::new(),
131            bar_aggregators: HashMap::new(),
132            synthetic_quote_feeds: HashMap::new(),
133            synthetic_trade_feeds: HashMap::new(),
134            buffered_deltas_map: HashMap::new(),
135            msgbus_priority: 10, // High-priority for built-in component
136            command_queue: VecDeque::new(),
137            config: config.unwrap_or_default(),
138        }
139    }
140
141    /// Provides read-only access to the cache.
142    #[must_use]
143    pub fn get_cache(&self) -> Ref<'_, Cache> {
144        self.cache.borrow()
145    }
146
147    // pub fn register_catalog(&mut self, catalog: ParquetDataCatalog) {}  TODO: Implement catalog
148
149    /// Registers the given data `client` with the engine as the default routing client.
150    ///
151    /// When a specific venue routing cannot be found, this client will receive messages.
152    ///
153    /// # Warnings
154    ///
155    /// Any existing default routing client will be overwritten.
156    /// TODO: change this to suit message bus behaviour
157    pub fn register_default_client(&mut self, client: DataClientAdapter) {
158        log::info!("Registered default client {}", client.client_id());
159        self.default_client = Some(client);
160    }
161
162    pub fn start(self) {
163        self.clients.values().for_each(|client| client.start());
164    }
165
166    pub fn stop(self) {
167        self.clients.values().for_each(|client| client.stop());
168    }
169
170    pub fn reset(self) {
171        self.clients.values().for_each(|client| client.reset());
172    }
173
174    pub fn dispose(self) {
175        self.clients.values().for_each(|client| client.dispose());
176        self.clock.borrow_mut().cancel_timers();
177    }
178
179    pub fn connect(&self) {
180        todo!() //  Implement actual client connections for a live/sandbox context
181    }
182
183    pub fn disconnect(&self) {
184        todo!() // Implement actual client connections for a live/sandbox context
185    }
186
187    #[must_use]
188    pub fn check_connected(&self) -> bool {
189        self.clients.values().all(|client| client.is_connected())
190    }
191
192    #[must_use]
193    pub fn check_disconnected(&self) -> bool {
194        self.clients.values().all(|client| !client.is_connected())
195    }
196
197    #[must_use]
198    pub fn registed_clients(&self) -> Vec<ClientId> {
199        self.clients.keys().copied().collect()
200    }
201
202    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
203
204    fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
205    where
206        F: Fn(&DataClientAdapter) -> &HashSet<T>,
207        T: Clone,
208    {
209        let mut subs = Vec::new();
210        for client in self.clients.values() {
211            subs.extend(get_subs(client).iter().cloned());
212        }
213        subs
214    }
215
216    fn get_client(&self, client_id: &ClientId, venue: &Venue) -> Option<&DataClientAdapter> {
217        match self.clients.get(client_id) {
218            Some(client) => Some(client),
219            None => self
220                .routing_map
221                .get(venue)
222                .and_then(|client_id: &ClientId| self.clients.get(client_id)),
223        }
224    }
225
226    fn get_client_mut(
227        &mut self,
228        client_id: &ClientId,
229        venue: &Venue,
230    ) -> Option<&mut DataClientAdapter> {
231        // Try to get client directly from clients map
232        if self.clients.contains_key(client_id) {
233            return self.clients.get_mut(client_id);
234        }
235
236        // If not found, try to get client_id from routing map
237        if let Some(mapped_client_id) = self.routing_map.get(venue) {
238            return self.clients.get_mut(mapped_client_id);
239        }
240
241        None
242    }
243
244    #[must_use]
245    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
246        self.collect_subscriptions(|client| &client.subscriptions_generic)
247    }
248
249    #[must_use]
250    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
251        self.collect_subscriptions(|client| &client.subscriptions_instrument)
252    }
253
254    #[must_use]
255    pub fn subscribed_order_book_deltas(&self) -> Vec<InstrumentId> {
256        self.collect_subscriptions(|client| &client.subscriptions_order_book_delta)
257    }
258
259    #[must_use]
260    pub fn subscribed_order_book_snapshots(&self) -> Vec<InstrumentId> {
261        self.collect_subscriptions(|client| &client.subscriptions_order_book_snapshot)
262    }
263
264    #[must_use]
265    pub fn subscribed_quote_ticks(&self) -> Vec<InstrumentId> {
266        self.collect_subscriptions(|client| &client.subscriptions_quote_tick)
267    }
268
269    #[must_use]
270    pub fn subscribed_trade_ticks(&self) -> Vec<InstrumentId> {
271        self.collect_subscriptions(|client| &client.subscriptions_trade_tick)
272    }
273
274    #[must_use]
275    pub fn subscribed_bars(&self) -> Vec<BarType> {
276        self.collect_subscriptions(|client| &client.subscriptions_bar)
277    }
278
279    #[must_use]
280    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
281        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
282    }
283
284    #[must_use]
285    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
286        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
287    }
288
289    pub fn on_start(self) {
290        todo!()
291    }
292
293    pub fn on_stop(self) {
294        todo!()
295    }
296
297    /// Registers a new [`DataClientAdapter`]
298    ///
299    /// # Panics
300    ///
301    /// This function panics:
302    /// - If a client with the same client ID has already been registered.
303    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
304        check_key_not_in_index_map(&client.client_id, &self.clients, "client_id", "clients")
305            .expect(FAILED);
306
307        if let Some(routing) = routing {
308            self.routing_map.insert(routing, client.client_id());
309            log::info!("Set client {} routing for {routing}", client.client_id());
310        }
311
312        log::info!("Registered client {}", client.client_id());
313        self.clients.insert(client.client_id, client);
314    }
315
316    /// Deregisters a [`DataClientAdapter`]
317    ///
318    /// # Panics
319    ///
320    /// This function panics:
321    /// - If a client with the same client ID has not been registered.
322    pub fn deregister_client(&mut self, client_id: &ClientId) {
323        check_key_in_index_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
324
325        self.clients.shift_remove(client_id);
326        log::info!("Deregistered client {client_id}");
327    }
328
329    pub fn run(&mut self) {
330        let commands: Vec<_> = self.command_queue.drain(..).collect();
331        for cmd in commands {
332            self.execute(cmd);
333        }
334    }
335
336    pub fn enqueue(&mut self, cmd: &dyn Any) {
337        if let Some(cmd) = cmd.downcast_ref::<SubscriptionCommand>() {
338            self.command_queue.push_back(cmd.clone());
339        } else {
340            log::error!("Invalid message type received: {cmd:?}");
341        }
342    }
343
344    pub fn execute(&mut self, cmd: SubscriptionCommand) {
345        let result = match cmd.action {
346            Action::Subscribe => match cmd.data_type.type_name() {
347                stringify!(OrderBookDelta) => self.handle_subscribe_book_deltas(&cmd),
348                stringify!(OrderBook) => self.handle_subscribe_book_snapshots(&cmd),
349                stringify!(Bar) => self.handle_subscribe_bars(&cmd),
350                _ => Ok(()), // No other actions for engine
351            },
352            Action::Unsubscribe => match cmd.data_type.type_name() {
353                stringify!(OrderBookDelta) => self.handle_unsubscribe_book_deltas(&cmd),
354                stringify!(OrderBook) => self.handle_unsubscribe_book_snapshots(&cmd),
355                stringify!(Bar) => self.handle_unsubscribe_bars(&cmd),
356                _ => Ok(()), // No other actions for engine
357            },
358        };
359
360        if let Err(e) = result {
361            log::error!("{e}");
362            return;
363        }
364
365        if let Some(client) = self.get_client_mut(&cmd.client_id, &cmd.venue) {
366            client.execute(cmd);
367        } else {
368            log::error!(
369                "Cannot handle command: no client found for {}",
370                cmd.client_id
371            );
372        }
373    }
374
375    /// Sends a [`DataRequest`] to an endpoint that must be a data client implementation.
376    pub fn request(&self, req: DataRequest) {
377        if let Some(client) = self.get_client(&req.client_id, &req.venue) {
378            client.through_request(req);
379        } else {
380            log::error!(
381                "Cannot handle request: no client found for {}",
382                req.client_id
383            );
384        }
385    }
386
387    pub fn process(&mut self, data: &dyn Any) {
388        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
389            self.handle_instrument(instrument.clone());
390        } else {
391            log::error!("Cannot process data {data:?}, type is unrecognized");
392        }
393    }
394
395    pub fn process_data(&mut self, data: Data) {
396        match data {
397            Data::Delta(delta) => self.handle_delta(delta),
398            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
399            Data::Depth10(depth) => self.handle_depth10(depth),
400            Data::Quote(quote) => self.handle_quote(quote),
401            Data::Trade(trade) => self.handle_trade(trade),
402            Data::Bar(bar) => self.handle_bar(bar),
403        }
404    }
405
406    pub fn response(&self, resp: DataResponse) {
407        log::debug!("{}", format!("{RECV}{RES} {resp:?}"));
408
409        match resp.data_type.type_name() {
410            stringify!(InstrumentAny) => {
411                let instruments = Arc::downcast::<Vec<InstrumentAny>>(resp.data.clone())
412                    .expect("Invalid response data");
413                self.handle_instruments(instruments);
414            }
415            stringify!(QuoteTick) => {
416                let quotes = Arc::downcast::<Vec<QuoteTick>>(resp.data.clone())
417                    .expect("Invalid response data");
418                self.handle_quotes(quotes);
419            }
420            stringify!(TradeTick) => {
421                let trades = Arc::downcast::<Vec<TradeTick>>(resp.data.clone())
422                    .expect("Invalid response data");
423                self.handle_trades(trades);
424            }
425            stringify!(Bar) => {
426                let bars =
427                    Arc::downcast::<Vec<Bar>>(resp.data.clone()).expect("Invalid response data");
428                self.handle_bars(bars);
429            }
430            type_name => log::error!("Cannot handle request, type {type_name} is unrecognized"),
431        }
432
433        self.msgbus.as_ref().borrow().send_response(resp);
434    }
435
436    // -- DATA HANDLERS ---------------------------------------------------------------------------
437
438    fn handle_instrument(&mut self, instrument: InstrumentAny) {
439        if let Err(e) = self
440            .cache
441            .as_ref()
442            .borrow_mut()
443            .add_instrument(instrument.clone())
444        {
445            log::error!("Error on cache insert: {e}");
446        }
447
448        let mut msgbus = self.msgbus.borrow_mut();
449        let topic = msgbus.switchboard.get_instrument_topic(instrument.id());
450        msgbus.publish(&topic, &instrument as &dyn Any); // TODO: Optimize
451    }
452
453    fn handle_delta(&mut self, delta: OrderBookDelta) {
454        let deltas = if self.config.buffer_deltas {
455            let buffer_deltas = self
456                .buffered_deltas_map
457                .entry(delta.instrument_id)
458                .or_default();
459            buffer_deltas.push(delta);
460
461            if !RecordFlag::F_LAST.matches(delta.flags) {
462                return; // Not the last delta for event
463            }
464
465            // SAFETY: We know the deltas exists already
466            let deltas = self
467                .buffered_deltas_map
468                .remove(&delta.instrument_id)
469                .unwrap();
470            OrderBookDeltas::new(delta.instrument_id, deltas)
471        } else {
472            OrderBookDeltas::new(delta.instrument_id, vec![delta])
473        };
474
475        let mut msgbus = self.msgbus.borrow_mut();
476        let topic = msgbus.switchboard.get_deltas_topic(deltas.instrument_id);
477        msgbus.publish(&topic, &deltas as &dyn Any);
478    }
479
480    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
481        let deltas = if self.config.buffer_deltas {
482            let buffer_deltas = self
483                .buffered_deltas_map
484                .entry(deltas.instrument_id)
485                .or_default();
486            buffer_deltas.extend(deltas.deltas);
487
488            let mut is_last_delta = false;
489            for delta in buffer_deltas.iter_mut() {
490                if RecordFlag::F_LAST.matches(delta.flags) {
491                    is_last_delta = true;
492                }
493            }
494
495            if !is_last_delta {
496                return;
497            }
498
499            // SAFETY: We know the deltas exists already
500            let buffer_deltas = self
501                .buffered_deltas_map
502                .remove(&deltas.instrument_id)
503                .unwrap();
504            OrderBookDeltas::new(deltas.instrument_id, buffer_deltas)
505        } else {
506            deltas
507        };
508
509        let mut msgbus = self.msgbus.borrow_mut();
510        let topic = msgbus.switchboard.get_deltas_topic(deltas.instrument_id);
511        msgbus.publish(&topic, &deltas as &dyn Any); // TODO: Optimize
512    }
513
514    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
515        let mut msgbus = self.msgbus.borrow_mut();
516        let topic = msgbus.switchboard.get_depth_topic(depth.instrument_id);
517        msgbus.publish(&topic, &depth as &dyn Any); // TODO: Optimize
518    }
519
520    fn handle_quote(&mut self, quote: QuoteTick) {
521        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
522            log::error!("Error on cache insert: {e}");
523        }
524
525        // TODO: Handle synthetics
526
527        let mut msgbus = self.msgbus.borrow_mut();
528        let topic = msgbus.switchboard.get_quotes_topic(quote.instrument_id);
529        msgbus.publish(&topic, &quote as &dyn Any); // TODO: Optimize
530    }
531
532    fn handle_trade(&mut self, trade: TradeTick) {
533        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
534            log::error!("Error on cache insert: {e}");
535        }
536
537        // TODO: Handle synthetics
538
539        let mut msgbus = self.msgbus.borrow_mut();
540        let topic = msgbus.switchboard.get_trades_topic(trade.instrument_id);
541        msgbus.publish(&topic, &trade as &dyn Any); // TODO: Optimize
542    }
543
544    fn handle_bar(&mut self, bar: Bar) {
545        // TODO: Handle additional bar logic
546        if self.config.validate_data_sequence {
547            if let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type) {
548                if bar.ts_event < last_bar.ts_event {
549                    log::warn!(
550                        "Bar {bar} was prior to last bar `ts_event` {}",
551                        last_bar.ts_event
552                    );
553                    return; // Bar is out of sequence
554                }
555                if bar.ts_init < last_bar.ts_init {
556                    log::warn!(
557                        "Bar {bar} was prior to last bar `ts_init` {}",
558                        last_bar.ts_init
559                    );
560                    return; // Bar is out of sequence
561                }
562                // TODO: Implement `bar.is_revision` logic
563            }
564        }
565
566        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
567            log::error!("Error on cache insert: {e}");
568        }
569
570        let mut msgbus = self.msgbus.borrow_mut();
571        let topic = msgbus.switchboard.get_bars_topic(bar.bar_type);
572        msgbus.publish(&topic, &bar as &dyn Any); // TODO: Optimize
573    }
574
575    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
576
577    fn handle_subscribe_book_deltas(
578        &mut self,
579        command: &SubscriptionCommand,
580    ) -> anyhow::Result<()> {
581        let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
582            anyhow::anyhow!(
583                "Invalid order book deltas subscription: did not contain an 'instrument_id', {}",
584                command.data_type
585            )
586        })?;
587
588        if instrument_id.is_synthetic() {
589            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
590        }
591
592        if !self.subscribed_order_book_deltas().contains(&instrument_id) {
593            return Ok(());
594        }
595
596        let data_type = command.data_type.clone();
597        let book_type = data_type.book_type();
598        let depth = data_type.depth();
599        let managed = data_type.managed();
600
601        self.setup_order_book(&instrument_id, book_type, depth, true, managed)?;
602
603        Ok(())
604    }
605
606    fn handle_subscribe_book_snapshots(
607        &mut self,
608        command: &SubscriptionCommand,
609    ) -> anyhow::Result<()> {
610        let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
611            anyhow::anyhow!(
612                "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
613                command.data_type
614            )
615        })?;
616
617        if self.subscribed_order_book_deltas().contains(&instrument_id) {
618            return Ok(());
619        }
620
621        if instrument_id.is_synthetic() {
622            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
623        }
624
625        let data_type = command.data_type.clone();
626        let book_type = data_type.book_type();
627        let depth = data_type.depth();
628        let interval_ms = data_type.interval_ms();
629        let managed = data_type.managed();
630
631        {
632            if !self.book_intervals.contains_key(&interval_ms) {
633                let interval_ns = millis_to_nanos(interval_ms.get() as f64);
634                let mut msgbus = self.msgbus.borrow_mut();
635                let topic = msgbus.switchboard.get_book_snapshots_topic(instrument_id);
636
637                let snap_info = BookSnapshotInfo {
638                    instrument_id,
639                    venue: instrument_id.venue,
640                    is_composite: instrument_id.symbol.is_composite(),
641                    root: Ustr::from(instrument_id.symbol.root()),
642                    topic,
643                    interval_ms,
644                };
645
646                let now_ns = self.clock.borrow().timestamp_ns().as_u64();
647                let mut start_time_ns = now_ns - (now_ns % interval_ns);
648
649                if start_time_ns - NANOSECONDS_IN_MILLISECOND <= now_ns {
650                    start_time_ns += NANOSECONDS_IN_SECOND; // Add one second
651                }
652
653                let snapshotter = Rc::new(BookSnapshotter::new(
654                    snap_info,
655                    self.cache.clone(),
656                    self.msgbus.clone(),
657                ));
658                self.book_snapshotters
659                    .insert(instrument_id, snapshotter.clone());
660                let timer_name = snapshotter.timer_name;
661
662                let callback =
663                    TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
664
665                self.clock
666                    .borrow_mut()
667                    .set_timer_ns(
668                        &timer_name,
669                        interval_ns,
670                        start_time_ns.into(),
671                        None,
672                        Some(callback),
673                    )
674                    .expect(FAILED);
675            }
676        }
677
678        self.setup_order_book(&instrument_id, book_type, depth, false, managed)?;
679
680        Ok(())
681    }
682
683    fn handle_subscribe_bars(&mut self, command: &SubscriptionCommand) -> anyhow::Result<()> {
684        let bar_type = command.data_type.bar_type();
685
686        match bar_type.aggregation_source() {
687            AggregationSource::Internal => {
688                if !self.bar_aggregators.contains_key(&bar_type.standard()) {
689                    self.start_bar_aggregator(bar_type)?;
690                }
691            }
692            AggregationSource::External => {
693                if bar_type.instrument_id().is_synthetic() {
694                    anyhow::bail!(
695                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
696                    );
697                }
698            }
699        }
700
701        Ok(())
702    }
703
704    fn handle_unsubscribe_book_deltas(
705        &mut self,
706        command: &SubscriptionCommand,
707    ) -> anyhow::Result<()> {
708        let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
709            anyhow::anyhow!(
710                "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
711                command.data_type
712            )
713        })?;
714
715        if !self.subscribed_order_book_deltas().contains(&instrument_id) {
716            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
717            return Ok(());
718        }
719
720        let topics = {
721            let mut msgbus = self.msgbus.borrow_mut();
722            vec![
723                msgbus.switchboard.get_deltas_topic(instrument_id),
724                msgbus.switchboard.get_depth_topic(instrument_id),
725                msgbus.switchboard.get_book_snapshots_topic(instrument_id),
726            ]
727        };
728
729        self.maintain_book_updater(&instrument_id, &topics);
730        self.maintain_book_snapshotter(&instrument_id);
731
732        Ok(())
733    }
734
735    fn handle_unsubscribe_book_snapshots(
736        &mut self,
737        command: &SubscriptionCommand,
738    ) -> anyhow::Result<()> {
739        let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
740            anyhow::anyhow!(
741                "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
742                command.data_type
743            )
744        })?;
745
746        if !self.subscribed_order_book_deltas().contains(&instrument_id) {
747            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
748            return Ok(());
749        }
750
751        let topics = {
752            let mut msgbus = self.msgbus.borrow_mut();
753            vec![
754                msgbus.switchboard.get_deltas_topic(instrument_id),
755                msgbus.switchboard.get_depth_topic(instrument_id),
756                msgbus.switchboard.get_book_snapshots_topic(instrument_id),
757            ]
758        };
759
760        self.maintain_book_updater(&instrument_id, &topics);
761        self.maintain_book_snapshotter(&instrument_id);
762
763        Ok(())
764    }
765
766    const fn handle_unsubscribe_bars(
767        &mut self,
768        command: &SubscriptionCommand,
769    ) -> anyhow::Result<()> {
770        // TODO: Handle aggregators
771        Ok(())
772    }
773
774    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[Ustr]) {
775        if let Some(updater) = self.book_updaters.get(instrument_id) {
776            let handler = ShareableMessageHandler(updater.clone());
777            let mut msgbus = self.msgbus.borrow_mut();
778
779            // Unsubscribe handler if it is the last subscriber
780            for topic in topics {
781                if msgbus.subscriptions_count(*topic) == 1
782                    && msgbus.is_subscribed(*topic, handler.clone())
783                {
784                    log::debug!("Unsubscribing BookUpdater from {topic}");
785                    msgbus.unsubscribe(*topic, handler.clone());
786                }
787            }
788
789            // Check remaining subscriptions, if none then remove updater
790            let still_subscribed = topics
791                .iter()
792                .any(|topic| msgbus.is_subscribed(*topic, handler.clone()));
793            if !still_subscribed {
794                self.book_updaters.remove(instrument_id);
795                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
796            }
797        }
798    }
799
800    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
801        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
802            let mut msgbus = self.msgbus.borrow_mut();
803
804            let topic = msgbus.switchboard.get_book_snapshots_topic(*instrument_id);
805
806            // Check remaining snapshot subscriptions, if none then remove snapshotter
807            if msgbus.subscriptions_count(topic) == 0 {
808                let timer_name = snapshotter.timer_name;
809                self.book_snapshotters.remove(instrument_id);
810                let mut clock = self.clock.borrow_mut();
811                if clock.timer_names().contains(&timer_name.as_str()) {
812                    clock.cancel_timer(&timer_name);
813                }
814                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
815            }
816        }
817    }
818
819    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
820
821    fn handle_instruments(&self, instruments: Arc<Vec<InstrumentAny>>) {
822        // TODO: Improve by adding bulk update methods to cache and database
823        let mut cache = self.cache.as_ref().borrow_mut();
824        for instrument in instruments.iter() {
825            if let Err(e) = cache.add_instrument(instrument.clone()) {
826                log::error!("Error on cache insert: {e}");
827            }
828        }
829    }
830
831    fn handle_quotes(&self, quotes: Arc<Vec<QuoteTick>>) {
832        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(&quotes) {
833            log::error!("Error on cache insert: {e}");
834        }
835    }
836
837    fn handle_trades(&self, trades: Arc<Vec<TradeTick>>) {
838        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(&trades) {
839            log::error!("Error on cache insert: {e}");
840        }
841    }
842
843    fn handle_bars(&self, bars: Arc<Vec<Bar>>) {
844        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(&bars) {
845            log::error!("Error on cache insert: {e}");
846        }
847    }
848
849    // -- INTERNAL --------------------------------------------------------------------------------
850
851    #[allow(clippy::too_many_arguments)]
852    fn setup_order_book(
853        &mut self,
854        instrument_id: &InstrumentId,
855        book_type: BookType,
856        depth: Option<usize>,
857        only_deltas: bool,
858        managed: bool,
859    ) -> anyhow::Result<()> {
860        let mut cache = self.cache.borrow_mut();
861        if managed && !cache.has_order_book(instrument_id) {
862            let book = OrderBook::new(*instrument_id, book_type);
863            log::debug!("Created {book}");
864            cache.add_order_book(book)?;
865        }
866
867        let mut msgbus = self.msgbus.borrow_mut();
868
869        // Set up subscriptions
870        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
871        self.book_updaters.insert(*instrument_id, updater.clone());
872
873        let handler = ShareableMessageHandler(updater);
874
875        let topic = msgbus.switchboard.get_deltas_topic(*instrument_id);
876        if !msgbus.is_subscribed(topic, handler.clone()) {
877            msgbus.subscribe(topic, handler.clone(), Some(self.msgbus_priority));
878        }
879
880        let topic = msgbus.switchboard.get_depth_topic(*instrument_id);
881        if !only_deltas && !msgbus.is_subscribed(topic, handler.clone()) {
882            msgbus.subscribe(topic, handler, Some(self.msgbus_priority));
883        }
884
885        Ok(())
886    }
887
888    fn create_bar_aggregator(
889        &mut self,
890        instrument: &InstrumentAny,
891        bar_type: BarType,
892    ) -> Box<dyn BarAggregator> {
893        let cache = self.cache.clone();
894        let msgbus = self.msgbus.clone();
895
896        let handler = move |bar: Bar| {
897            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
898                log::error!("Error on cache insert: {e}");
899            }
900
901            let mut msgbus = msgbus.borrow_mut();
902            let topic = msgbus.switchboard.get_bars_topic(bar.bar_type);
903            msgbus.publish(&topic, &bar as &dyn Any);
904        };
905
906        let clock = self.clock.clone();
907        let config = self.config.clone();
908
909        let price_precision = instrument.price_precision();
910        let size_precision = instrument.size_precision();
911
912        if bar_type.spec().is_time_aggregated() {
913            Box::new(TimeBarAggregator::new(
914                bar_type,
915                price_precision,
916                size_precision,
917                clock,
918                handler,
919                false,
920                config.time_bars_build_with_no_updates,
921                config.time_bars_timestamp_on_close,
922                config.time_bars_interval_type,
923                None, // TODO: Implement
924                20,   // TODO: TBD, composite bar build delay
925            ))
926        } else {
927            match bar_type.spec().aggregation {
928                BarAggregation::Tick => Box::new(TickBarAggregator::new(
929                    bar_type,
930                    price_precision,
931                    size_precision,
932                    handler,
933                    false,
934                )) as Box<dyn BarAggregator>,
935                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
936                    bar_type,
937                    price_precision,
938                    size_precision,
939                    handler,
940                    false,
941                )) as Box<dyn BarAggregator>,
942                BarAggregation::Value => Box::new(ValueBarAggregator::new(
943                    bar_type,
944                    price_precision,
945                    size_precision,
946                    handler,
947                    false,
948                )) as Box<dyn BarAggregator>,
949                _ => panic!(
950                    "Cannot create aggregator: {} aggregation not currently supported",
951                    bar_type.spec().aggregation
952                ),
953            }
954        }
955    }
956
957    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
958        let instrument = {
959            let cache = self.cache.borrow();
960            cache
961                .instrument(&bar_type.instrument_id())
962                .ok_or_else(|| {
963                    anyhow::anyhow!(
964                        "Cannot start bar aggregation: no instrument found for {}",
965                        bar_type.instrument_id(),
966                    )
967                })?
968                .clone()
969        };
970
971        let aggregator = if let Some(aggregator) = self.bar_aggregators.get_mut(&bar_type) {
972            aggregator
973        } else {
974            let aggregator = self.create_bar_aggregator(&instrument, bar_type);
975            self.bar_aggregators.insert(bar_type, aggregator);
976            self.bar_aggregators.get_mut(&bar_type).unwrap()
977        };
978
979        // TODO: Subscribe to data
980
981        aggregator.set_is_running(true);
982
983        Ok(())
984    }
985
986    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
987        let aggregator = self
988            .bar_aggregators
989            .remove(&bar_type.standard())
990            .ok_or_else(|| {
991                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
992            })?;
993
994        // TODO: If its a `TimeBarAggregator` then call `.stop()`
995        // if let Some(aggregator) = (aggregator as &dyn BarAggregator)
996        //     .as_any()
997        //     .downcast_ref::<TimeBarAggregator<_, _>>()
998        // {
999        //     aggregator.stop();
1000        // };
1001
1002        if bar_type.is_composite() {
1003            let composite_bar_type = bar_type.composite();
1004            // TODO: Unsubscribe the `aggregator.handle_bar`
1005        } else if bar_type.spec().price_type == PriceType::Last {
1006            // TODO: Unsubscribe `aggregator.handle_trade_tick`
1007            todo!()
1008        } else {
1009            // TODO: Unsubscribe `aggregator.handle_quote_tick`
1010            todo!()
1011        }
1012
1013        Ok(())
1014    }
1015}
1016
1017pub struct SubscriptionCommandHandler {
1018    pub id: Ustr,
1019    pub engine_ref: Rc<RefCell<DataEngine>>,
1020}
1021
1022impl MessageHandler for SubscriptionCommandHandler {
1023    fn id(&self) -> Ustr {
1024        self.id
1025    }
1026
1027    fn handle(&self, msg: &dyn Any) {
1028        self.engine_ref.borrow_mut().enqueue(msg);
1029    }
1030    fn handle_response(&self, _resp: DataResponse) {}
1031    fn handle_data(&self, _data: Data) {}
1032    fn as_any(&self) -> &dyn Any {
1033        self
1034    }
1035}