nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
18//! The `DataEngine` is the central component of the entire data stack.
19//! The data engines primary responsibility is to orchestrate interactions between
20//! the `DataClient` instances, and the rest of the platform. This includes sending
21//! requests to, and receiving responses from, data endpoints via its registered
22//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31// Under development
32#![allow(dead_code)]
33#![allow(unused_variables)]
34#![allow(unused_assignments)]
35
36pub mod book;
37pub mod config;
38
39#[cfg(test)]
40mod tests;
41
42use std::{
43    any::Any,
44    cell::{Ref, RefCell},
45    collections::{HashMap, HashSet, VecDeque},
46    num::NonZeroU64,
47    rc::Rc,
48    sync::Arc,
49};
50
51use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
52use config::DataEngineConfig;
53use indexmap::IndexMap;
54use nautilus_common::{
55    cache::Cache,
56    clock::Clock,
57    logging::{RECV, RES},
58    messages::data::{Action, DataRequest, DataResponse, SubscriptionCommand},
59    msgbus::{
60        MessageBus,
61        handler::{MessageHandler, ShareableMessageHandler},
62    },
63    timer::TimeEventCallback,
64};
65use nautilus_core::{
66    correctness::{FAILED, check_key_in_index_map, check_key_not_in_index_map},
67    datetime::{NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND, millis_to_nanos},
68};
69use nautilus_model::{
70    data::{
71        Bar, BarType, Data, DataType, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick,
72        TradeTick,
73    },
74    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
75    identifiers::{ClientId, InstrumentId, Venue},
76    instruments::{InstrumentAny, SyntheticInstrument},
77    orderbook::OrderBook,
78};
79use ustr::Ustr;
80
81use crate::{
82    aggregation::{
83        BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
84        VolumeBarAggregator,
85    },
86    client::DataClientAdapter,
87};
88
89/// Provides a high-performance `DataEngine` for all environments.
90pub struct DataEngine {
91    clock: Rc<RefCell<dyn Clock>>,
92    cache: Rc<RefCell<Cache>>,
93    msgbus: Rc<RefCell<MessageBus>>,
94    clients: IndexMap<ClientId, DataClientAdapter>,
95    default_client: Option<DataClientAdapter>,
96    external_clients: HashSet<ClientId>,
97    routing_map: IndexMap<Venue, ClientId>,
98    book_intervals: HashMap<NonZeroU64, HashSet<InstrumentId>>,
99    book_updaters: HashMap<InstrumentId, Rc<BookUpdater>>,
100    book_snapshotters: HashMap<InstrumentId, Rc<BookSnapshotter>>,
101    bar_aggregators: HashMap<BarType, Box<dyn BarAggregator>>,
102    synthetic_quote_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
103    synthetic_trade_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
104    buffered_deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>>, // TODO: Use OrderBookDeltas?
105    msgbus_priority: u8,
106    command_queue: VecDeque<SubscriptionCommand>,
107    config: DataEngineConfig,
108}
109
110impl DataEngine {
111    /// Creates a new [`DataEngine`] instance.
112    #[must_use]
113    pub fn new(
114        clock: Rc<RefCell<dyn Clock>>,
115        cache: Rc<RefCell<Cache>>,
116        msgbus: Rc<RefCell<MessageBus>>,
117        config: Option<DataEngineConfig>,
118    ) -> Self {
119        Self {
120            clock,
121            cache,
122            msgbus,
123            clients: IndexMap::new(),
124            default_client: None,
125            external_clients: HashSet::new(),
126            routing_map: IndexMap::new(),
127            book_intervals: HashMap::new(),
128            book_updaters: HashMap::new(),
129            book_snapshotters: HashMap::new(),
130            bar_aggregators: HashMap::new(),
131            synthetic_quote_feeds: HashMap::new(),
132            synthetic_trade_feeds: HashMap::new(),
133            buffered_deltas_map: HashMap::new(),
134            msgbus_priority: 10, // High-priority for built-in component
135            command_queue: VecDeque::new(),
136            config: config.unwrap_or_default(),
137        }
138    }
139
140    /// Provides read-only access to the cache.
141    #[must_use]
142    pub fn get_cache(&self) -> Ref<'_, Cache> {
143        self.cache.borrow()
144    }
145
146    // pub fn register_catalog(&mut self, catalog: ParquetDataCatalog) {}  TODO: Implement catalog
147
148    /// Registers the given data `client` with the engine as the default routing client.
149    ///
150    /// When a specific venue routing cannot be found, this client will receive messages.
151    ///
152    /// # Warnings
153    ///
154    /// Any existing default routing client will be overwritten.
155    /// TODO: change this to suit message bus behaviour
156    pub fn register_default_client(&mut self, client: DataClientAdapter) {
157        log::info!("Registered default client {}", client.client_id());
158        self.default_client = Some(client);
159    }
160
161    pub fn start(self) {
162        self.clients.values().for_each(|client| client.start());
163    }
164
165    pub fn stop(self) {
166        self.clients.values().for_each(|client| client.stop());
167    }
168
169    pub fn reset(self) {
170        self.clients.values().for_each(|client| client.reset());
171    }
172
173    pub fn dispose(self) {
174        self.clients.values().for_each(|client| client.dispose());
175        self.clock.borrow_mut().cancel_timers();
176    }
177
178    pub fn connect(&self) {
179        todo!() //  Implement actual client connections for a live/sandbox context
180    }
181
182    pub fn disconnect(&self) {
183        todo!() // Implement actual client connections for a live/sandbox context
184    }
185
186    #[must_use]
187    pub fn check_connected(&self) -> bool {
188        self.clients.values().all(|client| client.is_connected())
189    }
190
191    #[must_use]
192    pub fn check_disconnected(&self) -> bool {
193        self.clients.values().all(|client| !client.is_connected())
194    }
195
196    #[must_use]
197    pub fn registed_clients(&self) -> Vec<ClientId> {
198        self.clients.keys().copied().collect()
199    }
200
201    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
202
203    fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
204    where
205        F: Fn(&DataClientAdapter) -> &HashSet<T>,
206        T: Clone,
207    {
208        let mut subs = Vec::new();
209        for client in self.clients.values() {
210            subs.extend(get_subs(client).iter().cloned());
211        }
212        subs
213    }
214
215    fn get_client(&self, client_id: &ClientId, venue: &Venue) -> Option<&DataClientAdapter> {
216        match self.clients.get(client_id) {
217            Some(client) => Some(client),
218            None => self
219                .routing_map
220                .get(venue)
221                .and_then(|client_id: &ClientId| self.clients.get(client_id)),
222        }
223    }
224
225    fn get_client_mut(
226        &mut self,
227        client_id: &ClientId,
228        venue: &Venue,
229    ) -> Option<&mut DataClientAdapter> {
230        // Try to get client directly from clients map
231        if self.clients.contains_key(client_id) {
232            return self.clients.get_mut(client_id);
233        }
234
235        // If not found, try to get client_id from routing map
236        if let Some(mapped_client_id) = self.routing_map.get(venue) {
237            return self.clients.get_mut(mapped_client_id);
238        }
239
240        None
241    }
242
243    #[must_use]
244    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
245        self.collect_subscriptions(|client| &client.subscriptions_generic)
246    }
247
248    #[must_use]
249    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
250        self.collect_subscriptions(|client| &client.subscriptions_instrument)
251    }
252
253    #[must_use]
254    pub fn subscribed_order_book_deltas(&self) -> Vec<InstrumentId> {
255        self.collect_subscriptions(|client| &client.subscriptions_order_book_delta)
256    }
257
258    #[must_use]
259    pub fn subscribed_order_book_snapshots(&self) -> Vec<InstrumentId> {
260        self.collect_subscriptions(|client| &client.subscriptions_order_book_snapshot)
261    }
262
263    #[must_use]
264    pub fn subscribed_quote_ticks(&self) -> Vec<InstrumentId> {
265        self.collect_subscriptions(|client| &client.subscriptions_quote_tick)
266    }
267
268    #[must_use]
269    pub fn subscribed_trade_ticks(&self) -> Vec<InstrumentId> {
270        self.collect_subscriptions(|client| &client.subscriptions_trade_tick)
271    }
272
273    #[must_use]
274    pub fn subscribed_bars(&self) -> Vec<BarType> {
275        self.collect_subscriptions(|client| &client.subscriptions_bar)
276    }
277
278    #[must_use]
279    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
280        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
281    }
282
283    #[must_use]
284    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
285        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
286    }
287
288    pub fn on_start(self) {
289        todo!()
290    }
291
292    pub fn on_stop(self) {
293        todo!()
294    }
295
296    /// Registers a new [`DataClientAdapter`]
297    ///
298    /// # Panics
299    ///
300    /// This function panics:
301    /// - If a client with the same client ID has already been registered.
302    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
303        check_key_not_in_index_map(&client.client_id, &self.clients, "client_id", "clients")
304            .expect(FAILED);
305
306        if let Some(routing) = routing {
307            self.routing_map.insert(routing, client.client_id());
308            log::info!("Set client {} routing for {routing}", client.client_id());
309        }
310
311        log::info!("Registered client {}", client.client_id());
312        self.clients.insert(client.client_id, client);
313    }
314
315    /// Deregisters a [`DataClientAdapter`]
316    ///
317    /// # Panics
318    ///
319    /// This function panics:
320    /// - If a client with the same client ID has not been registered.
321    pub fn deregister_client(&mut self, client_id: &ClientId) {
322        check_key_in_index_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
323
324        self.clients.shift_remove(client_id);
325        log::info!("Deregistered client {client_id}");
326    }
327
328    pub fn run(&mut self) {
329        let commands: Vec<_> = self.command_queue.drain(..).collect();
330        for cmd in commands {
331            self.execute(cmd);
332        }
333    }
334
335    pub fn enqueue(&mut self, cmd: &dyn Any) {
336        if let Some(cmd) = cmd.downcast_ref::<SubscriptionCommand>() {
337            self.command_queue.push_back(cmd.clone());
338        } else {
339            log::error!("Invalid message type received: {cmd:?}");
340        }
341    }
342
343    pub fn execute(&mut self, cmd: SubscriptionCommand) {
344        let result = match cmd.action {
345            Action::Subscribe => match cmd.data_type.type_name() {
346                stringify!(OrderBookDelta) => self.handle_subscribe_book_deltas(&cmd),
347                stringify!(OrderBook) => self.handle_subscribe_book_snapshots(&cmd),
348                stringify!(Bar) => self.handle_subscribe_bars(&cmd),
349                _ => Ok(()), // No other actions for engine
350            },
351            Action::Unsubscribe => match cmd.data_type.type_name() {
352                stringify!(OrderBookDelta) => self.handle_unsubscribe_book_deltas(&cmd),
353                stringify!(OrderBook) => self.handle_unsubscribe_book_snapshots(&cmd),
354                stringify!(Bar) => self.handle_unsubscribe_bars(&cmd),
355                _ => Ok(()), // No other actions for engine
356            },
357        };
358
359        if let Err(e) = result {
360            log::error!("{e}");
361            return;
362        }
363
364        if let Some(client) = self.get_client_mut(&cmd.client_id, &cmd.venue) {
365            client.execute(cmd);
366        } else {
367            log::error!(
368                "Cannot handle command: no client found for {}",
369                cmd.client_id
370            );
371        }
372    }
373
374    /// Sends a [`DataRequest`] to an endpoint that must be a data client implementation.
375    pub fn request(&self, req: DataRequest) {
376        if let Some(client) = self.get_client(&req.client_id, &req.venue) {
377            client.through_request(req);
378        } else {
379            log::error!(
380                "Cannot handle request: no client found for {}",
381                req.client_id
382            );
383        }
384    }
385
386    pub fn process(&mut self, data: &dyn Any) {
387        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
388            self.handle_instrument(instrument.clone());
389        } else {
390            log::error!("Cannot process data {data:?}, type is unrecognized");
391        }
392    }
393
394    pub fn process_data(&mut self, data: Data) {
395        match data {
396            Data::Delta(delta) => self.handle_delta(delta),
397            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
398            Data::Depth10(depth) => self.handle_depth10(*depth),
399            Data::Quote(quote) => self.handle_quote(quote),
400            Data::Trade(trade) => self.handle_trade(trade),
401            Data::Bar(bar) => self.handle_bar(bar),
402        }
403    }
404
405    pub fn response(&self, resp: DataResponse) {
406        log::debug!("{}", format!("{RECV}{RES} {resp:?}"));
407
408        match resp.data_type.type_name() {
409            stringify!(InstrumentAny) => {
410                let instruments = Arc::downcast::<Vec<InstrumentAny>>(resp.data.clone())
411                    .expect("Invalid response data");
412                self.handle_instruments(instruments);
413            }
414            stringify!(QuoteTick) => {
415                let quotes = Arc::downcast::<Vec<QuoteTick>>(resp.data.clone())
416                    .expect("Invalid response data");
417                self.handle_quotes(quotes);
418            }
419            stringify!(TradeTick) => {
420                let trades = Arc::downcast::<Vec<TradeTick>>(resp.data.clone())
421                    .expect("Invalid response data");
422                self.handle_trades(trades);
423            }
424            stringify!(Bar) => {
425                let bars =
426                    Arc::downcast::<Vec<Bar>>(resp.data.clone()).expect("Invalid response data");
427                self.handle_bars(bars);
428            }
429            type_name => log::error!("Cannot handle request, type {type_name} is unrecognized"),
430        }
431
432        self.msgbus.as_ref().borrow().send_response(resp);
433    }
434
435    // -- DATA HANDLERS ---------------------------------------------------------------------------
436
437    fn handle_instrument(&mut self, instrument: InstrumentAny) {
438        if let Err(e) = self
439            .cache
440            .as_ref()
441            .borrow_mut()
442            .add_instrument(instrument.clone())
443        {
444            log::error!("Error on cache insert: {e}");
445        }
446
447        let mut msgbus = self.msgbus.borrow_mut();
448        let topic = msgbus.switchboard.get_instrument_topic(instrument.id());
449        msgbus.publish(&topic, &instrument as &dyn Any); // TODO: Optimize
450    }
451
452    fn handle_delta(&mut self, delta: OrderBookDelta) {
453        let deltas = if self.config.buffer_deltas {
454            let buffer_deltas = self
455                .buffered_deltas_map
456                .entry(delta.instrument_id)
457                .or_default();
458            buffer_deltas.push(delta);
459
460            if !RecordFlag::F_LAST.matches(delta.flags) {
461                return; // Not the last delta for event
462            }
463
464            // SAFETY: We know the deltas exists already
465            let deltas = self
466                .buffered_deltas_map
467                .remove(&delta.instrument_id)
468                .unwrap();
469            OrderBookDeltas::new(delta.instrument_id, deltas)
470        } else {
471            OrderBookDeltas::new(delta.instrument_id, vec![delta])
472        };
473
474        let mut msgbus = self.msgbus.borrow_mut();
475        let topic = msgbus.switchboard.get_deltas_topic(deltas.instrument_id);
476        msgbus.publish(&topic, &deltas as &dyn Any);
477    }
478
479    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
480        let deltas = if self.config.buffer_deltas {
481            let buffer_deltas = self
482                .buffered_deltas_map
483                .entry(deltas.instrument_id)
484                .or_default();
485            buffer_deltas.extend(deltas.deltas);
486
487            let mut is_last_delta = false;
488            for delta in buffer_deltas.iter_mut() {
489                if RecordFlag::F_LAST.matches(delta.flags) {
490                    is_last_delta = true;
491                }
492            }
493
494            if !is_last_delta {
495                return;
496            }
497
498            // SAFETY: We know the deltas exists already
499            let buffer_deltas = self
500                .buffered_deltas_map
501                .remove(&deltas.instrument_id)
502                .unwrap();
503            OrderBookDeltas::new(deltas.instrument_id, buffer_deltas)
504        } else {
505            deltas
506        };
507
508        let mut msgbus = self.msgbus.borrow_mut();
509        let topic = msgbus.switchboard.get_deltas_topic(deltas.instrument_id);
510        msgbus.publish(&topic, &deltas as &dyn Any); // TODO: Optimize
511    }
512
513    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
514        let mut msgbus = self.msgbus.borrow_mut();
515        let topic = msgbus.switchboard.get_depth_topic(depth.instrument_id);
516        msgbus.publish(&topic, &depth as &dyn Any); // TODO: Optimize
517    }
518
519    fn handle_quote(&mut self, quote: QuoteTick) {
520        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
521            log::error!("Error on cache insert: {e}");
522        }
523
524        // TODO: Handle synthetics
525
526        let mut msgbus = self.msgbus.borrow_mut();
527        let topic = msgbus.switchboard.get_quotes_topic(quote.instrument_id);
528        msgbus.publish(&topic, &quote as &dyn Any); // TODO: Optimize
529    }
530
531    fn handle_trade(&mut self, trade: TradeTick) {
532        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
533            log::error!("Error on cache insert: {e}");
534        }
535
536        // TODO: Handle synthetics
537
538        let mut msgbus = self.msgbus.borrow_mut();
539        let topic = msgbus.switchboard.get_trades_topic(trade.instrument_id);
540        msgbus.publish(&topic, &trade as &dyn Any); // TODO: Optimize
541    }
542
543    fn handle_bar(&mut self, bar: Bar) {
544        // TODO: Handle additional bar logic
545        if self.config.validate_data_sequence {
546            if let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type) {
547                if bar.ts_event < last_bar.ts_event {
548                    log::warn!(
549                        "Bar {bar} was prior to last bar `ts_event` {}",
550                        last_bar.ts_event
551                    );
552                    return; // Bar is out of sequence
553                }
554                if bar.ts_init < last_bar.ts_init {
555                    log::warn!(
556                        "Bar {bar} was prior to last bar `ts_init` {}",
557                        last_bar.ts_init
558                    );
559                    return; // Bar is out of sequence
560                }
561                // TODO: Implement `bar.is_revision` logic
562            }
563        }
564
565        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
566            log::error!("Error on cache insert: {e}");
567        }
568
569        let mut msgbus = self.msgbus.borrow_mut();
570        let topic = msgbus.switchboard.get_bars_topic(bar.bar_type);
571        msgbus.publish(&topic, &bar as &dyn Any); // TODO: Optimize
572    }
573
574    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
575
576    fn handle_subscribe_book_deltas(
577        &mut self,
578        command: &SubscriptionCommand,
579    ) -> anyhow::Result<()> {
580        let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
581            anyhow::anyhow!(
582                "Invalid order book deltas subscription: did not contain an 'instrument_id', {}",
583                command.data_type
584            )
585        })?;
586
587        if instrument_id.is_synthetic() {
588            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
589        }
590
591        if !self.subscribed_order_book_deltas().contains(&instrument_id) {
592            return Ok(());
593        }
594
595        let data_type = command.data_type.clone();
596        let book_type = data_type.book_type();
597        let depth = data_type.depth();
598        let managed = data_type.managed();
599
600        self.setup_order_book(&instrument_id, book_type, depth, true, managed)?;
601
602        Ok(())
603    }
604
605    fn handle_subscribe_book_snapshots(
606        &mut self,
607        command: &SubscriptionCommand,
608    ) -> anyhow::Result<()> {
609        let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
610            anyhow::anyhow!(
611                "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
612                command.data_type
613            )
614        })?;
615
616        if self.subscribed_order_book_deltas().contains(&instrument_id) {
617            return Ok(());
618        }
619
620        if instrument_id.is_synthetic() {
621            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
622        }
623
624        let data_type = command.data_type.clone();
625        let book_type = data_type.book_type();
626        let depth = data_type.depth();
627        let interval_ms = data_type.interval_ms();
628        let managed = data_type.managed();
629
630        {
631            if !self.book_intervals.contains_key(&interval_ms) {
632                let interval_ns = millis_to_nanos(interval_ms.get() as f64);
633                let mut msgbus = self.msgbus.borrow_mut();
634                let topic = msgbus.switchboard.get_book_snapshots_topic(instrument_id);
635
636                let snap_info = BookSnapshotInfo {
637                    instrument_id,
638                    venue: instrument_id.venue,
639                    is_composite: instrument_id.symbol.is_composite(),
640                    root: Ustr::from(instrument_id.symbol.root()),
641                    topic,
642                    interval_ms,
643                };
644
645                let now_ns = self.clock.borrow().timestamp_ns().as_u64();
646                let mut start_time_ns = now_ns - (now_ns % interval_ns);
647
648                if start_time_ns - NANOSECONDS_IN_MILLISECOND <= now_ns {
649                    start_time_ns += NANOSECONDS_IN_SECOND; // Add one second
650                }
651
652                let snapshotter = Rc::new(BookSnapshotter::new(
653                    snap_info,
654                    self.cache.clone(),
655                    self.msgbus.clone(),
656                ));
657                self.book_snapshotters
658                    .insert(instrument_id, snapshotter.clone());
659                let timer_name = snapshotter.timer_name;
660
661                let callback =
662                    TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
663
664                self.clock
665                    .borrow_mut()
666                    .set_timer_ns(
667                        &timer_name,
668                        interval_ns,
669                        start_time_ns.into(),
670                        None,
671                        Some(callback),
672                    )
673                    .expect(FAILED);
674            }
675        }
676
677        self.setup_order_book(&instrument_id, book_type, depth, false, managed)?;
678
679        Ok(())
680    }
681
682    fn handle_subscribe_bars(&mut self, command: &SubscriptionCommand) -> anyhow::Result<()> {
683        let bar_type = command.data_type.bar_type();
684
685        match bar_type.aggregation_source() {
686            AggregationSource::Internal => {
687                if !self.bar_aggregators.contains_key(&bar_type.standard()) {
688                    self.start_bar_aggregator(bar_type)?;
689                }
690            }
691            AggregationSource::External => {
692                if bar_type.instrument_id().is_synthetic() {
693                    anyhow::bail!(
694                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
695                    );
696                }
697            }
698        }
699
700        Ok(())
701    }
702
703    fn handle_unsubscribe_book_deltas(
704        &mut self,
705        command: &SubscriptionCommand,
706    ) -> anyhow::Result<()> {
707        let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
708            anyhow::anyhow!(
709                "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
710                command.data_type
711            )
712        })?;
713
714        if !self.subscribed_order_book_deltas().contains(&instrument_id) {
715            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
716            return Ok(());
717        }
718
719        let topics = {
720            let mut msgbus = self.msgbus.borrow_mut();
721            vec![
722                msgbus.switchboard.get_deltas_topic(instrument_id),
723                msgbus.switchboard.get_depth_topic(instrument_id),
724                msgbus.switchboard.get_book_snapshots_topic(instrument_id),
725            ]
726        };
727
728        self.maintain_book_updater(&instrument_id, &topics);
729        self.maintain_book_snapshotter(&instrument_id);
730
731        Ok(())
732    }
733
734    fn handle_unsubscribe_book_snapshots(
735        &mut self,
736        command: &SubscriptionCommand,
737    ) -> anyhow::Result<()> {
738        let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
739            anyhow::anyhow!(
740                "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
741                command.data_type
742            )
743        })?;
744
745        if !self.subscribed_order_book_deltas().contains(&instrument_id) {
746            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
747            return Ok(());
748        }
749
750        let topics = {
751            let mut msgbus = self.msgbus.borrow_mut();
752            vec![
753                msgbus.switchboard.get_deltas_topic(instrument_id),
754                msgbus.switchboard.get_depth_topic(instrument_id),
755                msgbus.switchboard.get_book_snapshots_topic(instrument_id),
756            ]
757        };
758
759        self.maintain_book_updater(&instrument_id, &topics);
760        self.maintain_book_snapshotter(&instrument_id);
761
762        Ok(())
763    }
764
765    const fn handle_unsubscribe_bars(
766        &mut self,
767        command: &SubscriptionCommand,
768    ) -> anyhow::Result<()> {
769        // TODO: Handle aggregators
770        Ok(())
771    }
772
773    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[Ustr]) {
774        if let Some(updater) = self.book_updaters.get(instrument_id) {
775            let handler = ShareableMessageHandler(updater.clone());
776            let mut msgbus = self.msgbus.borrow_mut();
777
778            // Unsubscribe handler if it is the last subscriber
779            for topic in topics {
780                if msgbus.subscriptions_count(*topic) == 1
781                    && msgbus.is_subscribed(*topic, handler.clone())
782                {
783                    log::debug!("Unsubscribing BookUpdater from {topic}");
784                    msgbus.unsubscribe(*topic, handler.clone());
785                }
786            }
787
788            // Check remaining subscriptions, if none then remove updater
789            let still_subscribed = topics
790                .iter()
791                .any(|topic| msgbus.is_subscribed(*topic, handler.clone()));
792            if !still_subscribed {
793                self.book_updaters.remove(instrument_id);
794                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
795            }
796        }
797    }
798
799    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
800        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
801            let mut msgbus = self.msgbus.borrow_mut();
802
803            let topic = msgbus.switchboard.get_book_snapshots_topic(*instrument_id);
804
805            // Check remaining snapshot subscriptions, if none then remove snapshotter
806            if msgbus.subscriptions_count(topic) == 0 {
807                let timer_name = snapshotter.timer_name;
808                self.book_snapshotters.remove(instrument_id);
809                let mut clock = self.clock.borrow_mut();
810                if clock.timer_names().contains(&timer_name.as_str()) {
811                    clock.cancel_timer(&timer_name);
812                }
813                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
814            }
815        }
816    }
817
818    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
819
820    fn handle_instruments(&self, instruments: Arc<Vec<InstrumentAny>>) {
821        // TODO: Improve by adding bulk update methods to cache and database
822        let mut cache = self.cache.as_ref().borrow_mut();
823        for instrument in instruments.iter() {
824            if let Err(e) = cache.add_instrument(instrument.clone()) {
825                log::error!("Error on cache insert: {e}");
826            }
827        }
828    }
829
830    fn handle_quotes(&self, quotes: Arc<Vec<QuoteTick>>) {
831        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(&quotes) {
832            log::error!("Error on cache insert: {e}");
833        }
834    }
835
836    fn handle_trades(&self, trades: Arc<Vec<TradeTick>>) {
837        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(&trades) {
838            log::error!("Error on cache insert: {e}");
839        }
840    }
841
842    fn handle_bars(&self, bars: Arc<Vec<Bar>>) {
843        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(&bars) {
844            log::error!("Error on cache insert: {e}");
845        }
846    }
847
848    // -- INTERNAL --------------------------------------------------------------------------------
849
850    #[allow(clippy::too_many_arguments)]
851    fn setup_order_book(
852        &mut self,
853        instrument_id: &InstrumentId,
854        book_type: BookType,
855        depth: Option<usize>,
856        only_deltas: bool,
857        managed: bool,
858    ) -> anyhow::Result<()> {
859        let mut cache = self.cache.borrow_mut();
860        if managed && !cache.has_order_book(instrument_id) {
861            let book = OrderBook::new(*instrument_id, book_type);
862            log::debug!("Created {book}");
863            cache.add_order_book(book)?;
864        }
865
866        let mut msgbus = self.msgbus.borrow_mut();
867
868        // Set up subscriptions
869        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
870        self.book_updaters.insert(*instrument_id, updater.clone());
871
872        let handler = ShareableMessageHandler(updater);
873
874        let topic = msgbus.switchboard.get_deltas_topic(*instrument_id);
875        if !msgbus.is_subscribed(topic, handler.clone()) {
876            msgbus.subscribe(topic, handler.clone(), Some(self.msgbus_priority));
877        }
878
879        let topic = msgbus.switchboard.get_depth_topic(*instrument_id);
880        if !only_deltas && !msgbus.is_subscribed(topic, handler.clone()) {
881            msgbus.subscribe(topic, handler, Some(self.msgbus_priority));
882        }
883
884        Ok(())
885    }
886
887    fn create_bar_aggregator(
888        &mut self,
889        instrument: &InstrumentAny,
890        bar_type: BarType,
891    ) -> Box<dyn BarAggregator> {
892        let cache = self.cache.clone();
893        let msgbus = self.msgbus.clone();
894
895        let handler = move |bar: Bar| {
896            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
897                log::error!("Error on cache insert: {e}");
898            }
899
900            let mut msgbus = msgbus.borrow_mut();
901            let topic = msgbus.switchboard.get_bars_topic(bar.bar_type);
902            msgbus.publish(&topic, &bar as &dyn Any);
903        };
904
905        let clock = self.clock.clone();
906        let config = self.config.clone();
907
908        let price_precision = instrument.price_precision();
909        let size_precision = instrument.size_precision();
910
911        if bar_type.spec().is_time_aggregated() {
912            Box::new(TimeBarAggregator::new(
913                bar_type,
914                price_precision,
915                size_precision,
916                clock,
917                handler,
918                false, // await_partial
919                config.time_bars_build_with_no_updates,
920                config.time_bars_timestamp_on_close,
921                config.time_bars_interval_type,
922                None,  // TODO: Implement
923                20,    // TODO: TBD, composite bar build delay
924                false, // TODO: skip_first_non_full_bar, make it config dependent
925            ))
926        } else {
927            match bar_type.spec().aggregation {
928                BarAggregation::Tick => Box::new(TickBarAggregator::new(
929                    bar_type,
930                    price_precision,
931                    size_precision,
932                    handler,
933                    false,
934                )) as Box<dyn BarAggregator>,
935                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
936                    bar_type,
937                    price_precision,
938                    size_precision,
939                    handler,
940                    false,
941                )) as Box<dyn BarAggregator>,
942                BarAggregation::Value => Box::new(ValueBarAggregator::new(
943                    bar_type,
944                    price_precision,
945                    size_precision,
946                    handler,
947                    false,
948                )) as Box<dyn BarAggregator>,
949                _ => panic!(
950                    "Cannot create aggregator: {} aggregation not currently supported",
951                    bar_type.spec().aggregation
952                ),
953            }
954        }
955    }
956
957    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
958        let instrument = {
959            let cache = self.cache.borrow();
960            cache
961                .instrument(&bar_type.instrument_id())
962                .ok_or_else(|| {
963                    anyhow::anyhow!(
964                        "Cannot start bar aggregation: no instrument found for {}",
965                        bar_type.instrument_id(),
966                    )
967                })?
968                .clone()
969        };
970
971        let aggregator = if let Some(aggregator) = self.bar_aggregators.get_mut(&bar_type) {
972            aggregator
973        } else {
974            let aggregator = self.create_bar_aggregator(&instrument, bar_type);
975            self.bar_aggregators.insert(bar_type, aggregator);
976            self.bar_aggregators.get_mut(&bar_type).unwrap()
977        };
978
979        // TODO: Subscribe to data
980
981        aggregator.set_is_running(true);
982
983        Ok(())
984    }
985
986    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
987        let aggregator = self
988            .bar_aggregators
989            .remove(&bar_type.standard())
990            .ok_or_else(|| {
991                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
992            })?;
993
994        // TODO: If its a `TimeBarAggregator` then call `.stop()`
995        // if let Some(aggregator) = (aggregator as &dyn BarAggregator)
996        //     .as_any()
997        //     .downcast_ref::<TimeBarAggregator<_, _>>()
998        // {
999        //     aggregator.stop();
1000        // };
1001
1002        if bar_type.is_composite() {
1003            let composite_bar_type = bar_type.composite();
1004            // TODO: Unsubscribe the `aggregator.handle_bar`
1005        } else if bar_type.spec().price_type == PriceType::Last {
1006            // TODO: Unsubscribe `aggregator.handle_trade_tick`
1007            todo!()
1008        } else {
1009            // TODO: Unsubscribe `aggregator.handle_quote_tick`
1010            todo!()
1011        }
1012
1013        Ok(())
1014    }
1015}
1016
1017pub struct SubscriptionCommandHandler {
1018    pub id: Ustr,
1019    pub engine_ref: Rc<RefCell<DataEngine>>,
1020}
1021
1022impl MessageHandler for SubscriptionCommandHandler {
1023    fn id(&self) -> Ustr {
1024        self.id
1025    }
1026
1027    fn handle(&self, msg: &dyn Any) {
1028        self.engine_ref.borrow_mut().enqueue(msg);
1029    }
1030    fn handle_response(&self, _resp: DataResponse) {}
1031    fn handle_data(&self, _data: Data) {}
1032    fn as_any(&self) -> &dyn Any {
1033        self
1034    }
1035}