nautilus_data/engine/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a high-performance `DataEngine` for all environments.
17//!
//! The `DataEngine` is the central component of the entire data stack.
//! The data engine's primary responsibility is to orchestrate interactions between
//! the `DataClient` instances and the rest of the platform. This includes sending
//! requests to, and receiving responses from, data endpoints via its registered
//! data clients.
23//!
24//! The engine employs a simple fan-in fan-out messaging pattern to execute
25//! `DataCommand` type messages, and process `DataResponse` messages or market data
26//! objects.
27//!
28//! Alternative implementations can be written on top of the generic engine - which
29//! just need to override the `execute`, `process`, `send` and `receive` methods.
30
31// Under development
32#![allow(dead_code)]
33#![allow(unused_variables)]
34#![allow(unused_assignments)]
35
36pub mod book;
37pub mod config;
38
39#[cfg(test)]
40mod tests;
41
42use std::{
43    any::Any,
44    cell::{Ref, RefCell},
45    collections::{HashMap, HashSet, VecDeque},
46    num::NonZeroUsize,
47    rc::Rc,
48    sync::Arc,
49};
50
51use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
52use config::DataEngineConfig;
53use indexmap::IndexMap;
54use nautilus_common::{
55    cache::Cache,
56    clock::Clock,
57    logging::{RECV, RES},
58    messages::data::{
59        DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
60        SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
61        UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
62        UnsubscribeCommand,
63    },
64    msgbus::{
65        self, get_message_bus,
66        handler::{MessageHandler, ShareableMessageHandler},
67        switchboard::{self},
68    },
69    timer::TimeEventCallback,
70};
71use nautilus_core::{
72    correctness::{FAILED, check_key_in_index_map, check_key_not_in_index_map},
73    datetime::{NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND, millis_to_nanos},
74};
75use nautilus_model::{
76    data::{
77        Bar, BarType, Data, DataType, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick,
78        TradeTick,
79        close::InstrumentClose,
80        prices::{IndexPriceUpdate, MarkPriceUpdate},
81    },
82    enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
83    identifiers::{ClientId, InstrumentId, Venue},
84    instruments::{Instrument, InstrumentAny, SyntheticInstrument},
85    orderbook::OrderBook,
86};
87use ustr::Ustr;
88
89use crate::{
90    aggregation::{
91        BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
92        VolumeBarAggregator,
93    },
94    client::DataClientAdapter,
95};
96
/// Provides a high-performance `DataEngine` for all environments.
///
/// The engine owns the registered data clients, routes commands to them, and
/// publishes incoming market data on the message bus after updating the cache.
pub struct DataEngine {
    /// Shared clock; read for timestamps and used to set and cancel timers.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache that the data handlers insert instruments and market data into.
    cache: Rc<RefCell<Cache>>,
    /// Registered data clients keyed by client ID (insertion ordered).
    clients: IndexMap<ClientId, DataClientAdapter>,
    /// Client set through `register_default_client`, if any.
    default_client: Option<DataClientAdapter>,
    /// NOTE(review): not read or written in the visible part of this file —
    /// presumably IDs of clients managed outside the engine; confirm.
    external_clients: HashSet<ClientId>,
    /// Venue to client ID routes, populated by `register_client`.
    routing_map: IndexMap<Venue, ClientId>,
    /// Consulted by `subscribe_book_snapshots`, keyed by the snapshot interval in milliseconds.
    book_intervals: HashMap<NonZeroUsize, HashSet<InstrumentId>>,
    /// Book updaters keyed by instrument ID.
    book_updaters: HashMap<InstrumentId, Rc<BookUpdater>>,
    /// Book snapshotters keyed by instrument ID.
    book_snapshotters: HashMap<InstrumentId, Rc<BookSnapshotter>>,
    /// Bar aggregators keyed by bar type.
    bar_aggregators: HashMap<BarType, Box<dyn BarAggregator>>,
    /// NOTE(review): unused in the visible part of this file — presumably synthetic
    /// instruments fed by quotes of the keyed instrument; confirm.
    synthetic_quote_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// NOTE(review): unused in the visible part of this file — presumably synthetic
    /// instruments fed by trades of the keyed instrument; confirm.
    synthetic_trade_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Deltas held back per instrument until one flagged `F_LAST` arrives
    /// (only used when `config.buffer_deltas` is set).
    buffered_deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>>, // TODO: Use OrderBookDeltas?
    /// Message bus priority for the engine; initialized to 10 in `new`.
    msgbus_priority: u8,
    /// Commands queued by `enqueue` and executed by `run`.
    command_queue: VecDeque<DataCommand>,
    /// Engine configuration.
    config: DataEngineConfig,
}
116
117impl DataEngine {
118    /// Creates a new [`DataEngine`] instance.
119    #[must_use]
120    pub fn new(
121        clock: Rc<RefCell<dyn Clock>>,
122        cache: Rc<RefCell<Cache>>,
123        config: Option<DataEngineConfig>,
124    ) -> Self {
125        Self {
126            clock,
127            cache,
128            clients: IndexMap::new(),
129            default_client: None,
130            external_clients: HashSet::new(),
131            routing_map: IndexMap::new(),
132            book_intervals: HashMap::new(),
133            book_updaters: HashMap::new(),
134            book_snapshotters: HashMap::new(),
135            bar_aggregators: HashMap::new(),
136            synthetic_quote_feeds: HashMap::new(),
137            synthetic_trade_feeds: HashMap::new(),
138            buffered_deltas_map: HashMap::new(),
139            msgbus_priority: 10, // High-priority for built-in component
140            command_queue: VecDeque::new(),
141            config: config.unwrap_or_default(),
142        }
143    }
144
145    /// Provides read-only access to the cache.
146    #[must_use]
147    pub fn get_cache(&self) -> Ref<'_, Cache> {
148        self.cache.borrow()
149    }
150
151    // pub fn register_catalog(&mut self, catalog: ParquetDataCatalog) {}  TODO: Implement catalog
152
153    /// Registers the given data `client` with the engine as the default routing client.
154    ///
155    /// When a specific venue routing cannot be found, this client will receive messages.
156    ///
157    /// # Warnings
158    ///
159    /// Any existing default routing client will be overwritten.
160    /// TODO: change this to suit message bus behaviour
161    pub fn register_default_client(&mut self, client: DataClientAdapter) {
162        log::info!("Registered default client {}", client.client_id());
163        self.default_client = Some(client);
164    }
165
166    pub fn start(self) {
167        self.clients.values().for_each(|client| client.start());
168    }
169
170    pub fn stop(self) {
171        self.clients.values().for_each(|client| client.stop());
172    }
173
174    pub fn reset(self) {
175        self.clients.values().for_each(|client| client.reset());
176    }
177
178    pub fn dispose(self) {
179        self.clients.values().for_each(|client| client.dispose());
180        self.clock.borrow_mut().cancel_timers();
181    }
182
    /// Connects the engine's clients (not yet implemented).
    ///
    /// # Panics
    ///
    /// Always panics: the body is a `todo!()` placeholder.
    pub fn connect(&self) {
        todo!() //  Implement actual client connections for a live/sandbox context
    }
186
    /// Disconnects the engine's clients (not yet implemented).
    ///
    /// # Panics
    ///
    /// Always panics: the body is a `todo!()` placeholder.
    pub fn disconnect(&self) {
        todo!() // Implement actual client connections for a live/sandbox context
    }
190
191    #[must_use]
192    pub fn check_connected(&self) -> bool {
193        self.clients.values().all(|client| client.is_connected())
194    }
195
196    #[must_use]
197    pub fn check_disconnected(&self) -> bool {
198        self.clients.values().all(|client| !client.is_connected())
199    }
200
201    #[must_use]
202    pub fn registered_clients(&self) -> Vec<ClientId> {
203        self.clients.keys().copied().collect()
204    }
205
206    // -- SUBSCRIPTIONS ---------------------------------------------------------------------------
207
208    fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
209    where
210        F: Fn(&DataClientAdapter) -> &HashSet<T>,
211        T: Clone,
212    {
213        let mut subs = Vec::new();
214        for client in self.clients.values() {
215            subs.extend(get_subs(client).iter().cloned());
216        }
217        subs
218    }
219
220    fn get_client(
221        &mut self,
222        client_id: Option<&ClientId>,
223        venue: Option<&Venue>,
224    ) -> Option<&mut DataClientAdapter> {
225        if let Some(client_id) = client_id {
226            // Try to get client directly from clients map
227            if self.clients.contains_key(client_id) {
228                return self.clients.get_mut(client_id);
229            }
230        }
231
232        if let Some(venue) = venue {
233            // If not found, try to get client_id from routing map
234            if let Some(mapped_client_id) = self.routing_map.get(venue) {
235                return self.clients.get_mut(mapped_client_id);
236            }
237        }
238
239        None
240    }
241
242    #[must_use]
243    pub fn subscribed_custom_data(&self) -> Vec<DataType> {
244        self.collect_subscriptions(|client| &client.subscriptions_generic)
245    }
246
247    #[must_use]
248    pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
249        self.collect_subscriptions(|client| &client.subscriptions_instrument)
250    }
251
252    #[must_use]
253    pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
254        self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
255    }
256
257    #[must_use]
258    pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
259        self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
260    }
261
262    #[must_use]
263    pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
264        self.collect_subscriptions(|client| &client.subscriptions_quotes)
265    }
266
267    #[must_use]
268    pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
269        self.collect_subscriptions(|client| &client.subscriptions_trades)
270    }
271
272    #[must_use]
273    pub fn subscribed_bars(&self) -> Vec<BarType> {
274        self.collect_subscriptions(|client| &client.subscriptions_bars)
275    }
276
277    #[must_use]
278    pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
279        self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
280    }
281
282    #[must_use]
283    pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
284        self.collect_subscriptions(|client| &client.subscriptions_index_prices)
285    }
286
287    #[must_use]
288    pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
289        self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
290    }
291
292    #[must_use]
293    pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
294        self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
295    }
296
    /// Start hook (not yet implemented).
    ///
    /// # Panics
    ///
    /// Always panics: the body is a `todo!()` placeholder.
    pub fn on_start(self) {
        todo!()
    }
300
    /// Stop hook (not yet implemented).
    ///
    /// # Panics
    ///
    /// Always panics: the body is a `todo!()` placeholder.
    pub fn on_stop(self) {
        todo!()
    }
304
305    /// Registers a new [`DataClientAdapter`]
306    ///
307    /// # Panics
308    ///
309    /// This function panics:
310    /// - If a client with the same client ID has already been registered.
311    pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
312        check_key_not_in_index_map(&client.client_id, &self.clients, "client_id", "clients")
313            .expect(FAILED);
314
315        if let Some(routing) = routing {
316            self.routing_map.insert(routing, client.client_id());
317            log::info!("Set client {} routing for {routing}", client.client_id());
318        }
319
320        log::info!("Registered client {}", client.client_id());
321        self.clients.insert(client.client_id, client);
322    }
323
324    /// Deregisters a [`DataClientAdapter`]
325    ///
326    /// # Panics
327    ///
328    /// This function panics:
329    /// - If a client with the same client ID has not been registered.
330    pub fn deregister_client(&mut self, client_id: &ClientId) {
331        check_key_in_index_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
332
333        self.clients.shift_remove(client_id);
334        log::info!("Deregistered client {client_id}");
335    }
336
337    pub fn run(&mut self) {
338        let commands: Vec<_> = self.command_queue.drain(..).collect();
339        for cmd in commands {
340            self.execute(cmd);
341        }
342    }
343
344    pub fn enqueue(&mut self, cmd: &dyn Any) {
345        if let Some(cmd) = cmd.downcast_ref::<DataCommand>() {
346            self.command_queue.push_back(cmd.clone());
347        } else {
348            log::error!("Invalid message type received: {cmd:?}");
349        }
350    }
351
352    pub fn execute(&mut self, cmd: DataCommand) {
353        let result = match cmd {
354            DataCommand::Subscribe(cmd) => self.execute_subscribe(cmd),
355            DataCommand::Unsubscribe(cmd) => self.execute_unsubscribe(cmd),
356            DataCommand::Request(cmd) => self.execute_request(cmd),
357        };
358
359        if let Err(e) = result {
360            log::error!("{e}");
361        }
362    }
363
364    pub fn execute_subscribe(&mut self, cmd: SubscribeCommand) -> anyhow::Result<()> {
365        match &cmd {
366            SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
367            SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
368            SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
369            SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
370            _ => {} // Do nothing else
371        }
372
373        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
374            client.execute_subscribe_command(cmd.clone());
375        } else {
376            log::error!(
377                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
378                cmd.client_id(),
379                cmd.venue(),
380            );
381        }
382
383        Ok(())
384    }
385
386    pub fn execute_unsubscribe(&mut self, cmd: UnsubscribeCommand) -> anyhow::Result<()> {
387        match &cmd {
388            UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
389            UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
390            UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
391            UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
392            _ => {} // Do nothing else
393        }
394
395        if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
396            client.execute_unsubscribe_command(cmd.clone());
397        } else {
398            log::error!(
399                "Cannot handle command: no client found for client_id={:?}, venue={:?}",
400                cmd.client_id(),
401                cmd.venue(),
402            );
403        }
404
405        Ok(())
406    }
407
408    /// Sends a [`RequestCommand`] to an endpoint that must be a data client implementation.
409    pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
410        if let Some(client) = self.get_client(req.client_id(), req.venue()) {
411            match req {
412                RequestCommand::Data(req) => client.request_data(req),
413                RequestCommand::Instrument(req) => client.request_instrument(req),
414                RequestCommand::Instruments(req) => client.request_instruments(req),
415                RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
416                RequestCommand::Quotes(req) => client.request_quotes(req),
417                RequestCommand::Trades(req) => client.request_trades(req),
418                RequestCommand::Bars(req) => client.request_bars(req),
419            }
420        } else {
421            anyhow::bail!(
422                "Cannot handle request: no client found for {:?} {:?}",
423                req.client_id(),
424                req.venue()
425            );
426        }
427    }
428
429    pub fn process(&mut self, data: &dyn Any) {
430        // TODO: Eventually these could be added to the `Data` enum? process here for now
431        if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
432            self.handle_instrument(instrument.clone());
433        } else {
434            log::error!("Cannot process data {data:?}, type is unrecognized");
435        }
436    }
437
438    pub fn process_data(&mut self, data: Data) {
439        match data {
440            Data::Delta(delta) => self.handle_delta(delta),
441            Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
442            Data::Depth10(depth) => self.handle_depth10(*depth),
443            Data::Quote(quote) => self.handle_quote(quote),
444            Data::Trade(trade) => self.handle_trade(trade),
445            Data::Bar(bar) => self.handle_bar(bar),
446            Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
447            Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
448            Data::InstrumentClose(close) => self.handle_instrument_close(close),
449        }
450    }
451
452    // TODO: Upgrade to response message handling
453    pub fn response(&self, resp: DataResponse) {
454        log::debug!("{RECV}{RES} {resp:?}");
455
456        match resp.data_type.type_name() {
457            stringify!(InstrumentAny) => {
458                let instruments = Arc::downcast::<Vec<InstrumentAny>>(resp.data.clone())
459                    .expect("Invalid response data");
460                self.handle_instruments(instruments);
461            }
462            stringify!(QuoteTick) => {
463                let quotes = Arc::downcast::<Vec<QuoteTick>>(resp.data.clone())
464                    .expect("Invalid response data");
465                self.handle_quotes(quotes);
466            }
467            stringify!(TradeTick) => {
468                let trades = Arc::downcast::<Vec<TradeTick>>(resp.data.clone())
469                    .expect("Invalid response data");
470                self.handle_trades(trades);
471            }
472            stringify!(Bar) => {
473                let bars =
474                    Arc::downcast::<Vec<Bar>>(resp.data.clone()).expect("Invalid response data");
475                self.handle_bars(bars);
476            }
477            type_name => log::error!("Cannot handle request, type {type_name} is unrecognized"),
478        }
479
480        get_message_bus().borrow().send_response(resp);
481    }
482
483    // -- DATA HANDLERS ---------------------------------------------------------------------------
484
485    fn handle_instrument(&mut self, instrument: InstrumentAny) {
486        if let Err(e) = self
487            .cache
488            .as_ref()
489            .borrow_mut()
490            .add_instrument(instrument.clone())
491        {
492            log::error!("Error on cache insert: {e}");
493        }
494
495        let topic = switchboard::get_instrument_topic(instrument.id());
496        msgbus::publish(&topic, &instrument as &dyn Any); // TODO: Optimize
497    }
498
499    fn handle_delta(&mut self, delta: OrderBookDelta) {
500        let deltas = if self.config.buffer_deltas {
501            let buffer_deltas = self
502                .buffered_deltas_map
503                .entry(delta.instrument_id)
504                .or_default();
505            buffer_deltas.push(delta);
506
507            if !RecordFlag::F_LAST.matches(delta.flags) {
508                return; // Not the last delta for event
509            }
510
511            // SAFETY: We know the deltas exists already
512            let deltas = self
513                .buffered_deltas_map
514                .remove(&delta.instrument_id)
515                .unwrap();
516            OrderBookDeltas::new(delta.instrument_id, deltas)
517        } else {
518            OrderBookDeltas::new(delta.instrument_id, vec![delta])
519        };
520
521        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
522        msgbus::publish(&topic, &deltas as &dyn Any);
523    }
524
525    fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
526        let deltas = if self.config.buffer_deltas {
527            let buffer_deltas = self
528                .buffered_deltas_map
529                .entry(deltas.instrument_id)
530                .or_default();
531            buffer_deltas.extend(deltas.deltas);
532
533            let mut is_last_delta = false;
534            for delta in buffer_deltas.iter_mut() {
535                if RecordFlag::F_LAST.matches(delta.flags) {
536                    is_last_delta = true;
537                }
538            }
539
540            if !is_last_delta {
541                return;
542            }
543
544            // SAFETY: We know the deltas exists already
545            let buffer_deltas = self
546                .buffered_deltas_map
547                .remove(&deltas.instrument_id)
548                .unwrap();
549            OrderBookDeltas::new(deltas.instrument_id, buffer_deltas)
550        } else {
551            deltas
552        };
553
554        let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
555        msgbus::publish(&topic, &deltas as &dyn Any);
556    }
557
558    fn handle_depth10(&mut self, depth: OrderBookDepth10) {
559        let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
560        msgbus::publish(&topic, &depth as &dyn Any);
561    }
562
563    fn handle_quote(&mut self, quote: QuoteTick) {
564        if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
565            log::error!("Error on cache insert: {e}");
566        }
567
568        // TODO: Handle synthetics
569
570        let topic = switchboard::get_quotes_topic(quote.instrument_id);
571        msgbus::publish(&topic, &quote as &dyn Any);
572    }
573
574    fn handle_trade(&mut self, trade: TradeTick) {
575        if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
576            log::error!("Error on cache insert: {e}");
577        }
578
579        // TODO: Handle synthetics
580
581        let topic = switchboard::get_trades_topic(trade.instrument_id);
582        msgbus::publish(&topic, &trade as &dyn Any);
583    }
584
585    fn handle_bar(&mut self, bar: Bar) {
586        // TODO: Handle additional bar logic
587        if self.config.validate_data_sequence {
588            if let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type) {
589                if bar.ts_event < last_bar.ts_event {
590                    log::warn!(
591                        "Bar {bar} was prior to last bar `ts_event` {}",
592                        last_bar.ts_event
593                    );
594                    return; // Bar is out of sequence
595                }
596                if bar.ts_init < last_bar.ts_init {
597                    log::warn!(
598                        "Bar {bar} was prior to last bar `ts_init` {}",
599                        last_bar.ts_init
600                    );
601                    return; // Bar is out of sequence
602                }
603                // TODO: Implement `bar.is_revision` logic
604            }
605        }
606
607        if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
608            log::error!("Error on cache insert: {e}");
609        }
610
611        let topic = switchboard::get_bars_topic(bar.bar_type);
612        msgbus::publish(&topic, &bar as &dyn Any);
613    }
614
615    fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
616        if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
617            log::error!("Error on cache insert: {e}");
618        }
619
620        let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
621        msgbus::publish(&topic, &mark_price as &dyn Any);
622    }
623
624    fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
625        if let Err(e) = self
626            .cache
627            .as_ref()
628            .borrow_mut()
629            .add_index_price(index_price)
630        {
631            log::error!("Error on cache insert: {e}");
632        }
633
634        let topic = switchboard::get_index_price_topic(index_price.instrument_id);
635        msgbus::publish(&topic, &index_price as &dyn Any);
636    }
637
638    fn handle_instrument_close(&mut self, close: InstrumentClose) {
639        let topic = switchboard::get_instrument_close_topic(close.instrument_id);
640        msgbus::publish(&topic, &close as &dyn Any);
641    }
642
643    // -- SUBSCRIPTION HANDLERS -------------------------------------------------------------------
644
645    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
646        if cmd.instrument_id.is_synthetic() {
647            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
648        }
649
650        self.setup_order_book(
651            &cmd.instrument_id,
652            cmd.book_type,
653            cmd.depth,
654            true,
655            cmd.managed,
656        )?;
657
658        Ok(())
659    }
660
661    fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
662        if cmd.instrument_id.is_synthetic() {
663            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
664        }
665
666        self.setup_order_book(
667            &cmd.instrument_id,
668            cmd.book_type,
669            cmd.depth, // TODO
670            false,
671            cmd.managed,
672        )?;
673
674        Ok(())
675    }
676
677    fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
678        if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
679            return Ok(());
680        }
681
682        if cmd.instrument_id.is_synthetic() {
683            anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
684        }
685
686        {
687            if !self.book_intervals.contains_key(&cmd.interval_ms) {
688                let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
689                let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id);
690
691                let snap_info = BookSnapshotInfo {
692                    instrument_id: cmd.instrument_id,
693                    venue: cmd.instrument_id.venue,
694                    is_composite: cmd.instrument_id.symbol.is_composite(),
695                    root: Ustr::from(cmd.instrument_id.symbol.root()),
696                    topic,
697                    interval_ms: cmd.interval_ms,
698                };
699
700                let now_ns = self.clock.borrow().timestamp_ns().as_u64();
701                let mut start_time_ns = now_ns - (now_ns % interval_ns);
702
703                if start_time_ns - NANOSECONDS_IN_MILLISECOND <= now_ns {
704                    start_time_ns += NANOSECONDS_IN_SECOND; // Add one second
705                }
706
707                let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
708                self.book_snapshotters
709                    .insert(cmd.instrument_id, snapshotter.clone());
710                let timer_name = snapshotter.timer_name;
711
712                let callback =
713                    TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
714
715                self.clock
716                    .borrow_mut()
717                    .set_timer_ns(
718                        &timer_name,
719                        interval_ns,
720                        start_time_ns.into(),
721                        None,
722                        Some(callback),
723                        None,
724                    )
725                    .expect(FAILED);
726            }
727        }
728
729        self.setup_order_book(&cmd.instrument_id, cmd.book_type, cmd.depth, false, true)?;
730
731        Ok(())
732    }
733
734    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
735        match cmd.bar_type.aggregation_source() {
736            AggregationSource::Internal => {
737                if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
738                    self.start_bar_aggregator(cmd.bar_type)?;
739                }
740            }
741            AggregationSource::External => {
742                if cmd.bar_type.instrument_id().is_synthetic() {
743                    anyhow::bail!(
744                        "Cannot subscribe for externally aggregated synthetic instrument bar data"
745                    );
746                }
747            }
748        }
749
750        Ok(())
751    }
752
753    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
754        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
755            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
756            return Ok(());
757        }
758
759        let topics = vec![
760            switchboard::get_book_deltas_topic(cmd.instrument_id),
761            switchboard::get_book_depth10_topic(cmd.instrument_id),
762            switchboard::get_book_snapshots_topic(cmd.instrument_id),
763        ];
764
765        self.maintain_book_updater(&cmd.instrument_id, &topics);
766        self.maintain_book_snapshotter(&cmd.instrument_id);
767
768        Ok(())
769    }
770
771    fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
772        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
773            log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
774            return Ok(());
775        }
776
777        let topics = vec![
778            switchboard::get_book_deltas_topic(cmd.instrument_id),
779            switchboard::get_book_depth10_topic(cmd.instrument_id),
780            switchboard::get_book_snapshots_topic(cmd.instrument_id),
781        ];
782
783        self.maintain_book_updater(&cmd.instrument_id, &topics);
784        self.maintain_book_snapshotter(&cmd.instrument_id);
785
786        Ok(())
787    }
788
789    fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
790        if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
791            log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
792            return Ok(());
793        }
794
795        let topics = vec![
796            switchboard::get_book_deltas_topic(cmd.instrument_id),
797            switchboard::get_book_depth10_topic(cmd.instrument_id),
798            switchboard::get_book_snapshots_topic(cmd.instrument_id),
799        ];
800
801        self.maintain_book_updater(&cmd.instrument_id, &topics);
802        self.maintain_book_snapshotter(&cmd.instrument_id);
803
804        Ok(())
805    }
806
    /// Engine-side handling of a bar unsubscription.
    ///
    /// Currently a no-op that always returns `Ok(())`; `command` is not used yet.
    const fn unsubscribe_bars(&mut self, command: &UnsubscribeBars) -> anyhow::Result<()> {
        // TODO: Handle aggregators
        Ok(())
    }
811
812    fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[Ustr]) {
813        if let Some(updater) = self.book_updaters.get(instrument_id) {
814            let handler = ShareableMessageHandler(updater.clone());
815
816            // Unsubscribe handler if it is the last subscriber
817            for topic in topics {
818                if msgbus::subscriptions_count(*topic) == 1
819                    && msgbus::is_subscribed(*topic, handler.clone())
820                {
821                    log::debug!("Unsubscribing BookUpdater from {topic}");
822                    msgbus::unsubscribe(*topic, handler.clone());
823                }
824            }
825
826            // Check remaining subscriptions, if none then remove updater
827            let still_subscribed = topics
828                .iter()
829                .any(|topic| msgbus::is_subscribed(*topic, handler.clone()));
830            if !still_subscribed {
831                self.book_updaters.remove(instrument_id);
832                log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
833            }
834        }
835    }
836
837    fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
838        if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
839            let topic = switchboard::get_book_snapshots_topic(*instrument_id);
840
841            // Check remaining snapshot subscriptions, if none then remove snapshotter
842            if msgbus::subscriptions_count(topic) == 0 {
843                let timer_name = snapshotter.timer_name;
844                self.book_snapshotters.remove(instrument_id);
845                let mut clock = self.clock.borrow_mut();
846                if clock.timer_names().contains(&timer_name.as_str()) {
847                    clock.cancel_timer(&timer_name);
848                }
849                log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
850            }
851        }
852    }
853
854    // -- RESPONSE HANDLERS -----------------------------------------------------------------------
855
856    fn handle_instruments(&self, instruments: Arc<Vec<InstrumentAny>>) {
857        // TODO: Improve by adding bulk update methods to cache and database
858        let mut cache = self.cache.as_ref().borrow_mut();
859        for instrument in instruments.iter() {
860            if let Err(e) = cache.add_instrument(instrument.clone()) {
861                log::error!("Error on cache insert: {e}");
862            }
863        }
864    }
865
866    fn handle_quotes(&self, quotes: Arc<Vec<QuoteTick>>) {
867        if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(&quotes) {
868            log::error!("Error on cache insert: {e}");
869        }
870    }
871
872    fn handle_trades(&self, trades: Arc<Vec<TradeTick>>) {
873        if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(&trades) {
874            log::error!("Error on cache insert: {e}");
875        }
876    }
877
878    fn handle_bars(&self, bars: Arc<Vec<Bar>>) {
879        if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(&bars) {
880            log::error!("Error on cache insert: {e}");
881        }
882    }
883
884    // -- INTERNAL --------------------------------------------------------------------------------
885
886    #[allow(clippy::too_many_arguments)]
887    fn setup_order_book(
888        &mut self,
889        instrument_id: &InstrumentId,
890        book_type: BookType,
891        depth: Option<NonZeroUsize>,
892        only_deltas: bool,
893        managed: bool,
894    ) -> anyhow::Result<()> {
895        let mut cache = self.cache.borrow_mut();
896        if managed && !cache.has_order_book(instrument_id) {
897            let book = OrderBook::new(*instrument_id, book_type);
898            log::debug!("Created {book}");
899            cache.add_order_book(book)?;
900        }
901
902        // Set up subscriptions
903        let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
904        self.book_updaters.insert(*instrument_id, updater.clone());
905
906        let handler = ShareableMessageHandler(updater);
907
908        let topic = switchboard::get_book_deltas_topic(*instrument_id);
909        if !msgbus::is_subscribed(topic, handler.clone()) {
910            msgbus::subscribe(topic, handler.clone(), Some(self.msgbus_priority));
911        }
912
913        let topic = switchboard::get_book_depth10_topic(*instrument_id);
914        if !only_deltas && !msgbus::is_subscribed(topic, handler.clone()) {
915            msgbus::subscribe(topic, handler, Some(self.msgbus_priority));
916        }
917
918        Ok(())
919    }
920
921    fn create_bar_aggregator(
922        &mut self,
923        instrument: &InstrumentAny,
924        bar_type: BarType,
925    ) -> Box<dyn BarAggregator> {
926        let cache = self.cache.clone();
927
928        let handler = move |bar: Bar| {
929            if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
930                log::error!("Error on cache insert: {e}");
931            }
932
933            let topic = switchboard::get_bars_topic(bar.bar_type);
934            msgbus::publish(&topic, &bar as &dyn Any);
935        };
936
937        let clock = self.clock.clone();
938        let config = self.config.clone();
939
940        let price_precision = instrument.price_precision();
941        let size_precision = instrument.size_precision();
942
943        if bar_type.spec().is_time_aggregated() {
944            Box::new(TimeBarAggregator::new(
945                bar_type,
946                price_precision,
947                size_precision,
948                clock,
949                handler,
950                false, // await_partial
951                config.time_bars_build_with_no_updates,
952                config.time_bars_timestamp_on_close,
953                config.time_bars_interval_type,
954                None,  // TODO: Implement
955                20,    // TODO: TBD, composite bar build delay
956                false, // TODO: skip_first_non_full_bar, make it config dependent
957            ))
958        } else {
959            match bar_type.spec().aggregation {
960                BarAggregation::Tick => Box::new(TickBarAggregator::new(
961                    bar_type,
962                    price_precision,
963                    size_precision,
964                    handler,
965                    false,
966                )) as Box<dyn BarAggregator>,
967                BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
968                    bar_type,
969                    price_precision,
970                    size_precision,
971                    handler,
972                    false,
973                )) as Box<dyn BarAggregator>,
974                BarAggregation::Value => Box::new(ValueBarAggregator::new(
975                    bar_type,
976                    price_precision,
977                    size_precision,
978                    handler,
979                    false,
980                )) as Box<dyn BarAggregator>,
981                _ => panic!(
982                    "Cannot create aggregator: {} aggregation not currently supported",
983                    bar_type.spec().aggregation
984                ),
985            }
986        }
987    }
988
989    fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
990        let instrument = {
991            let cache = self.cache.borrow();
992            cache
993                .instrument(&bar_type.instrument_id())
994                .ok_or_else(|| {
995                    anyhow::anyhow!(
996                        "Cannot start bar aggregation: no instrument found for {}",
997                        bar_type.instrument_id(),
998                    )
999                })?
1000                .clone()
1001        };
1002
1003        let aggregator = if let Some(aggregator) = self.bar_aggregators.get_mut(&bar_type) {
1004            aggregator
1005        } else {
1006            let aggregator = self.create_bar_aggregator(&instrument, bar_type);
1007            self.bar_aggregators.insert(bar_type, aggregator);
1008            self.bar_aggregators.get_mut(&bar_type).unwrap()
1009        };
1010
1011        // TODO: Subscribe to data
1012
1013        aggregator.set_is_running(true);
1014
1015        Ok(())
1016    }
1017
1018    fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1019        let aggregator = self
1020            .bar_aggregators
1021            .remove(&bar_type.standard())
1022            .ok_or_else(|| {
1023                anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1024            })?;
1025
1026        // TODO: If its a `TimeBarAggregator` then call `.stop()`
1027        // if let Some(aggregator) = (aggregator as &dyn BarAggregator)
1028        //     .as_any()
1029        //     .downcast_ref::<TimeBarAggregator<_, _>>()
1030        // {
1031        //     aggregator.stop();
1032        // };
1033
1034        if bar_type.is_composite() {
1035            let composite_bar_type = bar_type.composite();
1036            // TODO: Unsubscribe the `aggregator.handle_bar`
1037        } else if bar_type.spec().price_type == PriceType::Last {
1038            // TODO: Unsubscribe `aggregator.handle_trade_tick`
1039            todo!()
1040        } else {
1041            // TODO: Unsubscribe `aggregator.handle_quote_tick`
1042            todo!()
1043        }
1044
1045        Ok(())
1046    }
1047}
1048
1049pub struct SubscriptionCommandHandler {
1050    pub id: Ustr,
1051    pub engine_ref: Rc<RefCell<DataEngine>>,
1052}
1053
1054impl MessageHandler for SubscriptionCommandHandler {
1055    fn id(&self) -> Ustr {
1056        self.id
1057    }
1058
1059    fn handle(&self, msg: &dyn Any) {
1060        self.engine_ref.borrow_mut().enqueue(msg);
1061    }
1062
1063    fn as_any(&self) -> &dyn Any {
1064        self
1065    }
1066}