1#![allow(dead_code)]
33#![allow(unused_variables)]
34#![allow(unused_assignments)]
35
36pub mod book;
37pub mod config;
38
39#[cfg(test)]
40mod tests;
41
42use std::{
43 any::Any,
44 cell::{Ref, RefCell},
45 collections::{HashMap, HashSet, VecDeque},
46 num::NonZeroU64,
47 rc::Rc,
48 sync::Arc,
49};
50
51use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
52use config::DataEngineConfig;
53use indexmap::IndexMap;
54use nautilus_common::{
55 cache::Cache,
56 clock::Clock,
57 logging::{RECV, RES},
58 messages::data::{Action, DataRequest, DataResponse, SubscriptionCommand},
59 msgbus::{
60 MessageBus,
61 handler::{MessageHandler, ShareableMessageHandler},
62 },
63 timer::TimeEventCallback,
64};
65use nautilus_core::{
66 correctness::{FAILED, check_key_in_index_map, check_key_not_in_index_map},
67 datetime::{NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND, millis_to_nanos},
68};
69use nautilus_model::{
70 data::{
71 Bar, BarType, Data, DataType, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick,
72 TradeTick,
73 },
74 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
75 identifiers::{ClientId, InstrumentId, Venue},
76 instruments::{InstrumentAny, SyntheticInstrument},
77 orderbook::OrderBook,
78};
79use ustr::Ustr;
80
81use crate::{
82 aggregation::{
83 BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
84 VolumeBarAggregator,
85 },
86 client::DataClientAdapter,
87};
88
89pub struct DataEngine {
91 clock: Rc<RefCell<dyn Clock>>,
92 cache: Rc<RefCell<Cache>>,
93 msgbus: Rc<RefCell<MessageBus>>,
94 clients: IndexMap<ClientId, DataClientAdapter>,
95 default_client: Option<DataClientAdapter>,
96 external_clients: HashSet<ClientId>,
97 routing_map: IndexMap<Venue, ClientId>,
98 book_intervals: HashMap<NonZeroU64, HashSet<InstrumentId>>,
99 book_updaters: HashMap<InstrumentId, Rc<BookUpdater>>,
100 book_snapshotters: HashMap<InstrumentId, Rc<BookSnapshotter>>,
101 bar_aggregators: HashMap<BarType, Box<dyn BarAggregator>>,
102 synthetic_quote_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
103 synthetic_trade_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
104 buffered_deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>>, msgbus_priority: u8,
106 command_queue: VecDeque<SubscriptionCommand>,
107 config: DataEngineConfig,
108}
109
110impl DataEngine {
111 #[must_use]
113 pub fn new(
114 clock: Rc<RefCell<dyn Clock>>,
115 cache: Rc<RefCell<Cache>>,
116 msgbus: Rc<RefCell<MessageBus>>,
117 config: Option<DataEngineConfig>,
118 ) -> Self {
119 Self {
120 clock,
121 cache,
122 msgbus,
123 clients: IndexMap::new(),
124 default_client: None,
125 external_clients: HashSet::new(),
126 routing_map: IndexMap::new(),
127 book_intervals: HashMap::new(),
128 book_updaters: HashMap::new(),
129 book_snapshotters: HashMap::new(),
130 bar_aggregators: HashMap::new(),
131 synthetic_quote_feeds: HashMap::new(),
132 synthetic_trade_feeds: HashMap::new(),
133 buffered_deltas_map: HashMap::new(),
134 msgbus_priority: 10, command_queue: VecDeque::new(),
136 config: config.unwrap_or_default(),
137 }
138 }
139
140 #[must_use]
142 pub fn get_cache(&self) -> Ref<'_, Cache> {
143 self.cache.borrow()
144 }
145
146 pub fn register_default_client(&mut self, client: DataClientAdapter) {
157 log::info!("Registered default client {}", client.client_id());
158 self.default_client = Some(client);
159 }
160
161 pub fn start(self) {
162 self.clients.values().for_each(|client| client.start());
163 }
164
165 pub fn stop(self) {
166 self.clients.values().for_each(|client| client.stop());
167 }
168
169 pub fn reset(self) {
170 self.clients.values().for_each(|client| client.reset());
171 }
172
173 pub fn dispose(self) {
174 self.clients.values().for_each(|client| client.dispose());
175 self.clock.borrow_mut().cancel_timers();
176 }
177
178 pub fn connect(&self) {
179 todo!() }
181
182 pub fn disconnect(&self) {
183 todo!() }
185
186 #[must_use]
187 pub fn check_connected(&self) -> bool {
188 self.clients.values().all(|client| client.is_connected())
189 }
190
191 #[must_use]
192 pub fn check_disconnected(&self) -> bool {
193 self.clients.values().all(|client| !client.is_connected())
194 }
195
196 #[must_use]
197 pub fn registed_clients(&self) -> Vec<ClientId> {
198 self.clients.keys().copied().collect()
199 }
200
201 fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
204 where
205 F: Fn(&DataClientAdapter) -> &HashSet<T>,
206 T: Clone,
207 {
208 let mut subs = Vec::new();
209 for client in self.clients.values() {
210 subs.extend(get_subs(client).iter().cloned());
211 }
212 subs
213 }
214
215 fn get_client(&self, client_id: &ClientId, venue: &Venue) -> Option<&DataClientAdapter> {
216 match self.clients.get(client_id) {
217 Some(client) => Some(client),
218 None => self
219 .routing_map
220 .get(venue)
221 .and_then(|client_id: &ClientId| self.clients.get(client_id)),
222 }
223 }
224
225 fn get_client_mut(
226 &mut self,
227 client_id: &ClientId,
228 venue: &Venue,
229 ) -> Option<&mut DataClientAdapter> {
230 if self.clients.contains_key(client_id) {
232 return self.clients.get_mut(client_id);
233 }
234
235 if let Some(mapped_client_id) = self.routing_map.get(venue) {
237 return self.clients.get_mut(mapped_client_id);
238 }
239
240 None
241 }
242
243 #[must_use]
244 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
245 self.collect_subscriptions(|client| &client.subscriptions_generic)
246 }
247
248 #[must_use]
249 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
250 self.collect_subscriptions(|client| &client.subscriptions_instrument)
251 }
252
253 #[must_use]
254 pub fn subscribed_order_book_deltas(&self) -> Vec<InstrumentId> {
255 self.collect_subscriptions(|client| &client.subscriptions_order_book_delta)
256 }
257
258 #[must_use]
259 pub fn subscribed_order_book_snapshots(&self) -> Vec<InstrumentId> {
260 self.collect_subscriptions(|client| &client.subscriptions_order_book_snapshot)
261 }
262
263 #[must_use]
264 pub fn subscribed_quote_ticks(&self) -> Vec<InstrumentId> {
265 self.collect_subscriptions(|client| &client.subscriptions_quote_tick)
266 }
267
268 #[must_use]
269 pub fn subscribed_trade_ticks(&self) -> Vec<InstrumentId> {
270 self.collect_subscriptions(|client| &client.subscriptions_trade_tick)
271 }
272
273 #[must_use]
274 pub fn subscribed_bars(&self) -> Vec<BarType> {
275 self.collect_subscriptions(|client| &client.subscriptions_bar)
276 }
277
278 #[must_use]
279 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
280 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
281 }
282
283 #[must_use]
284 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
285 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
286 }
287
288 pub fn on_start(self) {
289 todo!()
290 }
291
292 pub fn on_stop(self) {
293 todo!()
294 }
295
296 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
303 check_key_not_in_index_map(&client.client_id, &self.clients, "client_id", "clients")
304 .expect(FAILED);
305
306 if let Some(routing) = routing {
307 self.routing_map.insert(routing, client.client_id());
308 log::info!("Set client {} routing for {routing}", client.client_id());
309 }
310
311 log::info!("Registered client {}", client.client_id());
312 self.clients.insert(client.client_id, client);
313 }
314
315 pub fn deregister_client(&mut self, client_id: &ClientId) {
322 check_key_in_index_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
323
324 self.clients.shift_remove(client_id);
325 log::info!("Deregistered client {client_id}");
326 }
327
328 pub fn run(&mut self) {
329 let commands: Vec<_> = self.command_queue.drain(..).collect();
330 for cmd in commands {
331 self.execute(cmd);
332 }
333 }
334
335 pub fn enqueue(&mut self, cmd: &dyn Any) {
336 if let Some(cmd) = cmd.downcast_ref::<SubscriptionCommand>() {
337 self.command_queue.push_back(cmd.clone());
338 } else {
339 log::error!("Invalid message type received: {cmd:?}");
340 }
341 }
342
343 pub fn execute(&mut self, cmd: SubscriptionCommand) {
344 let result = match cmd.action {
345 Action::Subscribe => match cmd.data_type.type_name() {
346 stringify!(OrderBookDelta) => self.handle_subscribe_book_deltas(&cmd),
347 stringify!(OrderBook) => self.handle_subscribe_book_snapshots(&cmd),
348 stringify!(Bar) => self.handle_subscribe_bars(&cmd),
349 _ => Ok(()), },
351 Action::Unsubscribe => match cmd.data_type.type_name() {
352 stringify!(OrderBookDelta) => self.handle_unsubscribe_book_deltas(&cmd),
353 stringify!(OrderBook) => self.handle_unsubscribe_book_snapshots(&cmd),
354 stringify!(Bar) => self.handle_unsubscribe_bars(&cmd),
355 _ => Ok(()), },
357 };
358
359 if let Err(e) = result {
360 log::error!("{e}");
361 return;
362 }
363
364 if let Some(client) = self.get_client_mut(&cmd.client_id, &cmd.venue) {
365 client.execute(cmd);
366 } else {
367 log::error!(
368 "Cannot handle command: no client found for {}",
369 cmd.client_id
370 );
371 }
372 }
373
374 pub fn request(&self, req: DataRequest) {
376 if let Some(client) = self.get_client(&req.client_id, &req.venue) {
377 client.through_request(req);
378 } else {
379 log::error!(
380 "Cannot handle request: no client found for {}",
381 req.client_id
382 );
383 }
384 }
385
386 pub fn process(&mut self, data: &dyn Any) {
387 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
388 self.handle_instrument(instrument.clone());
389 } else {
390 log::error!("Cannot process data {data:?}, type is unrecognized");
391 }
392 }
393
394 pub fn process_data(&mut self, data: Data) {
395 match data {
396 Data::Delta(delta) => self.handle_delta(delta),
397 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
398 Data::Depth10(depth) => self.handle_depth10(*depth),
399 Data::Quote(quote) => self.handle_quote(quote),
400 Data::Trade(trade) => self.handle_trade(trade),
401 Data::Bar(bar) => self.handle_bar(bar),
402 }
403 }
404
405 pub fn response(&self, resp: DataResponse) {
406 log::debug!("{}", format!("{RECV}{RES} {resp:?}"));
407
408 match resp.data_type.type_name() {
409 stringify!(InstrumentAny) => {
410 let instruments = Arc::downcast::<Vec<InstrumentAny>>(resp.data.clone())
411 .expect("Invalid response data");
412 self.handle_instruments(instruments);
413 }
414 stringify!(QuoteTick) => {
415 let quotes = Arc::downcast::<Vec<QuoteTick>>(resp.data.clone())
416 .expect("Invalid response data");
417 self.handle_quotes(quotes);
418 }
419 stringify!(TradeTick) => {
420 let trades = Arc::downcast::<Vec<TradeTick>>(resp.data.clone())
421 .expect("Invalid response data");
422 self.handle_trades(trades);
423 }
424 stringify!(Bar) => {
425 let bars =
426 Arc::downcast::<Vec<Bar>>(resp.data.clone()).expect("Invalid response data");
427 self.handle_bars(bars);
428 }
429 type_name => log::error!("Cannot handle request, type {type_name} is unrecognized"),
430 }
431
432 self.msgbus.as_ref().borrow().send_response(resp);
433 }
434
435 fn handle_instrument(&mut self, instrument: InstrumentAny) {
438 if let Err(e) = self
439 .cache
440 .as_ref()
441 .borrow_mut()
442 .add_instrument(instrument.clone())
443 {
444 log::error!("Error on cache insert: {e}");
445 }
446
447 let mut msgbus = self.msgbus.borrow_mut();
448 let topic = msgbus.switchboard.get_instrument_topic(instrument.id());
449 msgbus.publish(&topic, &instrument as &dyn Any); }
451
452 fn handle_delta(&mut self, delta: OrderBookDelta) {
453 let deltas = if self.config.buffer_deltas {
454 let buffer_deltas = self
455 .buffered_deltas_map
456 .entry(delta.instrument_id)
457 .or_default();
458 buffer_deltas.push(delta);
459
460 if !RecordFlag::F_LAST.matches(delta.flags) {
461 return; }
463
464 let deltas = self
466 .buffered_deltas_map
467 .remove(&delta.instrument_id)
468 .unwrap();
469 OrderBookDeltas::new(delta.instrument_id, deltas)
470 } else {
471 OrderBookDeltas::new(delta.instrument_id, vec![delta])
472 };
473
474 let mut msgbus = self.msgbus.borrow_mut();
475 let topic = msgbus.switchboard.get_deltas_topic(deltas.instrument_id);
476 msgbus.publish(&topic, &deltas as &dyn Any);
477 }
478
479 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
480 let deltas = if self.config.buffer_deltas {
481 let buffer_deltas = self
482 .buffered_deltas_map
483 .entry(deltas.instrument_id)
484 .or_default();
485 buffer_deltas.extend(deltas.deltas);
486
487 let mut is_last_delta = false;
488 for delta in buffer_deltas.iter_mut() {
489 if RecordFlag::F_LAST.matches(delta.flags) {
490 is_last_delta = true;
491 }
492 }
493
494 if !is_last_delta {
495 return;
496 }
497
498 let buffer_deltas = self
500 .buffered_deltas_map
501 .remove(&deltas.instrument_id)
502 .unwrap();
503 OrderBookDeltas::new(deltas.instrument_id, buffer_deltas)
504 } else {
505 deltas
506 };
507
508 let mut msgbus = self.msgbus.borrow_mut();
509 let topic = msgbus.switchboard.get_deltas_topic(deltas.instrument_id);
510 msgbus.publish(&topic, &deltas as &dyn Any); }
512
513 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
514 let mut msgbus = self.msgbus.borrow_mut();
515 let topic = msgbus.switchboard.get_depth_topic(depth.instrument_id);
516 msgbus.publish(&topic, &depth as &dyn Any); }
518
519 fn handle_quote(&mut self, quote: QuoteTick) {
520 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
521 log::error!("Error on cache insert: {e}");
522 }
523
524 let mut msgbus = self.msgbus.borrow_mut();
527 let topic = msgbus.switchboard.get_quotes_topic(quote.instrument_id);
528 msgbus.publish(&topic, "e as &dyn Any); }
530
531 fn handle_trade(&mut self, trade: TradeTick) {
532 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
533 log::error!("Error on cache insert: {e}");
534 }
535
536 let mut msgbus = self.msgbus.borrow_mut();
539 let topic = msgbus.switchboard.get_trades_topic(trade.instrument_id);
540 msgbus.publish(&topic, &trade as &dyn Any); }
542
543 fn handle_bar(&mut self, bar: Bar) {
544 if self.config.validate_data_sequence {
546 if let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type) {
547 if bar.ts_event < last_bar.ts_event {
548 log::warn!(
549 "Bar {bar} was prior to last bar `ts_event` {}",
550 last_bar.ts_event
551 );
552 return; }
554 if bar.ts_init < last_bar.ts_init {
555 log::warn!(
556 "Bar {bar} was prior to last bar `ts_init` {}",
557 last_bar.ts_init
558 );
559 return; }
561 }
563 }
564
565 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
566 log::error!("Error on cache insert: {e}");
567 }
568
569 let mut msgbus = self.msgbus.borrow_mut();
570 let topic = msgbus.switchboard.get_bars_topic(bar.bar_type);
571 msgbus.publish(&topic, &bar as &dyn Any); }
573
574 fn handle_subscribe_book_deltas(
577 &mut self,
578 command: &SubscriptionCommand,
579 ) -> anyhow::Result<()> {
580 let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
581 anyhow::anyhow!(
582 "Invalid order book deltas subscription: did not contain an 'instrument_id', {}",
583 command.data_type
584 )
585 })?;
586
587 if instrument_id.is_synthetic() {
588 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
589 }
590
591 if !self.subscribed_order_book_deltas().contains(&instrument_id) {
592 return Ok(());
593 }
594
595 let data_type = command.data_type.clone();
596 let book_type = data_type.book_type();
597 let depth = data_type.depth();
598 let managed = data_type.managed();
599
600 self.setup_order_book(&instrument_id, book_type, depth, true, managed)?;
601
602 Ok(())
603 }
604
605 fn handle_subscribe_book_snapshots(
606 &mut self,
607 command: &SubscriptionCommand,
608 ) -> anyhow::Result<()> {
609 let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
610 anyhow::anyhow!(
611 "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
612 command.data_type
613 )
614 })?;
615
616 if self.subscribed_order_book_deltas().contains(&instrument_id) {
617 return Ok(());
618 }
619
620 if instrument_id.is_synthetic() {
621 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
622 }
623
624 let data_type = command.data_type.clone();
625 let book_type = data_type.book_type();
626 let depth = data_type.depth();
627 let interval_ms = data_type.interval_ms();
628 let managed = data_type.managed();
629
630 {
631 if !self.book_intervals.contains_key(&interval_ms) {
632 let interval_ns = millis_to_nanos(interval_ms.get() as f64);
633 let mut msgbus = self.msgbus.borrow_mut();
634 let topic = msgbus.switchboard.get_book_snapshots_topic(instrument_id);
635
636 let snap_info = BookSnapshotInfo {
637 instrument_id,
638 venue: instrument_id.venue,
639 is_composite: instrument_id.symbol.is_composite(),
640 root: Ustr::from(instrument_id.symbol.root()),
641 topic,
642 interval_ms,
643 };
644
645 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
646 let mut start_time_ns = now_ns - (now_ns % interval_ns);
647
648 if start_time_ns - NANOSECONDS_IN_MILLISECOND <= now_ns {
649 start_time_ns += NANOSECONDS_IN_SECOND; }
651
652 let snapshotter = Rc::new(BookSnapshotter::new(
653 snap_info,
654 self.cache.clone(),
655 self.msgbus.clone(),
656 ));
657 self.book_snapshotters
658 .insert(instrument_id, snapshotter.clone());
659 let timer_name = snapshotter.timer_name;
660
661 let callback =
662 TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
663
664 self.clock
665 .borrow_mut()
666 .set_timer_ns(
667 &timer_name,
668 interval_ns,
669 start_time_ns.into(),
670 None,
671 Some(callback),
672 )
673 .expect(FAILED);
674 }
675 }
676
677 self.setup_order_book(&instrument_id, book_type, depth, false, managed)?;
678
679 Ok(())
680 }
681
682 fn handle_subscribe_bars(&mut self, command: &SubscriptionCommand) -> anyhow::Result<()> {
683 let bar_type = command.data_type.bar_type();
684
685 match bar_type.aggregation_source() {
686 AggregationSource::Internal => {
687 if !self.bar_aggregators.contains_key(&bar_type.standard()) {
688 self.start_bar_aggregator(bar_type)?;
689 }
690 }
691 AggregationSource::External => {
692 if bar_type.instrument_id().is_synthetic() {
693 anyhow::bail!(
694 "Cannot subscribe for externally aggregated synthetic instrument bar data"
695 );
696 }
697 }
698 }
699
700 Ok(())
701 }
702
703 fn handle_unsubscribe_book_deltas(
704 &mut self,
705 command: &SubscriptionCommand,
706 ) -> anyhow::Result<()> {
707 let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
708 anyhow::anyhow!(
709 "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
710 command.data_type
711 )
712 })?;
713
714 if !self.subscribed_order_book_deltas().contains(&instrument_id) {
715 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
716 return Ok(());
717 }
718
719 let topics = {
720 let mut msgbus = self.msgbus.borrow_mut();
721 vec![
722 msgbus.switchboard.get_deltas_topic(instrument_id),
723 msgbus.switchboard.get_depth_topic(instrument_id),
724 msgbus.switchboard.get_book_snapshots_topic(instrument_id),
725 ]
726 };
727
728 self.maintain_book_updater(&instrument_id, &topics);
729 self.maintain_book_snapshotter(&instrument_id);
730
731 Ok(())
732 }
733
734 fn handle_unsubscribe_book_snapshots(
735 &mut self,
736 command: &SubscriptionCommand,
737 ) -> anyhow::Result<()> {
738 let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
739 anyhow::anyhow!(
740 "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
741 command.data_type
742 )
743 })?;
744
745 if !self.subscribed_order_book_deltas().contains(&instrument_id) {
746 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
747 return Ok(());
748 }
749
750 let topics = {
751 let mut msgbus = self.msgbus.borrow_mut();
752 vec![
753 msgbus.switchboard.get_deltas_topic(instrument_id),
754 msgbus.switchboard.get_depth_topic(instrument_id),
755 msgbus.switchboard.get_book_snapshots_topic(instrument_id),
756 ]
757 };
758
759 self.maintain_book_updater(&instrument_id, &topics);
760 self.maintain_book_snapshotter(&instrument_id);
761
762 Ok(())
763 }
764
765 const fn handle_unsubscribe_bars(
766 &mut self,
767 command: &SubscriptionCommand,
768 ) -> anyhow::Result<()> {
769 Ok(())
771 }
772
773 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[Ustr]) {
774 if let Some(updater) = self.book_updaters.get(instrument_id) {
775 let handler = ShareableMessageHandler(updater.clone());
776 let mut msgbus = self.msgbus.borrow_mut();
777
778 for topic in topics {
780 if msgbus.subscriptions_count(*topic) == 1
781 && msgbus.is_subscribed(*topic, handler.clone())
782 {
783 log::debug!("Unsubscribing BookUpdater from {topic}");
784 msgbus.unsubscribe(*topic, handler.clone());
785 }
786 }
787
788 let still_subscribed = topics
790 .iter()
791 .any(|topic| msgbus.is_subscribed(*topic, handler.clone()));
792 if !still_subscribed {
793 self.book_updaters.remove(instrument_id);
794 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
795 }
796 }
797 }
798
799 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
800 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
801 let mut msgbus = self.msgbus.borrow_mut();
802
803 let topic = msgbus.switchboard.get_book_snapshots_topic(*instrument_id);
804
805 if msgbus.subscriptions_count(topic) == 0 {
807 let timer_name = snapshotter.timer_name;
808 self.book_snapshotters.remove(instrument_id);
809 let mut clock = self.clock.borrow_mut();
810 if clock.timer_names().contains(&timer_name.as_str()) {
811 clock.cancel_timer(&timer_name);
812 }
813 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
814 }
815 }
816 }
817
818 fn handle_instruments(&self, instruments: Arc<Vec<InstrumentAny>>) {
821 let mut cache = self.cache.as_ref().borrow_mut();
823 for instrument in instruments.iter() {
824 if let Err(e) = cache.add_instrument(instrument.clone()) {
825 log::error!("Error on cache insert: {e}");
826 }
827 }
828 }
829
830 fn handle_quotes(&self, quotes: Arc<Vec<QuoteTick>>) {
831 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes("es) {
832 log::error!("Error on cache insert: {e}");
833 }
834 }
835
836 fn handle_trades(&self, trades: Arc<Vec<TradeTick>>) {
837 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(&trades) {
838 log::error!("Error on cache insert: {e}");
839 }
840 }
841
842 fn handle_bars(&self, bars: Arc<Vec<Bar>>) {
843 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(&bars) {
844 log::error!("Error on cache insert: {e}");
845 }
846 }
847
848 #[allow(clippy::too_many_arguments)]
851 fn setup_order_book(
852 &mut self,
853 instrument_id: &InstrumentId,
854 book_type: BookType,
855 depth: Option<usize>,
856 only_deltas: bool,
857 managed: bool,
858 ) -> anyhow::Result<()> {
859 let mut cache = self.cache.borrow_mut();
860 if managed && !cache.has_order_book(instrument_id) {
861 let book = OrderBook::new(*instrument_id, book_type);
862 log::debug!("Created {book}");
863 cache.add_order_book(book)?;
864 }
865
866 let mut msgbus = self.msgbus.borrow_mut();
867
868 let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
870 self.book_updaters.insert(*instrument_id, updater.clone());
871
872 let handler = ShareableMessageHandler(updater);
873
874 let topic = msgbus.switchboard.get_deltas_topic(*instrument_id);
875 if !msgbus.is_subscribed(topic, handler.clone()) {
876 msgbus.subscribe(topic, handler.clone(), Some(self.msgbus_priority));
877 }
878
879 let topic = msgbus.switchboard.get_depth_topic(*instrument_id);
880 if !only_deltas && !msgbus.is_subscribed(topic, handler.clone()) {
881 msgbus.subscribe(topic, handler, Some(self.msgbus_priority));
882 }
883
884 Ok(())
885 }
886
887 fn create_bar_aggregator(
888 &mut self,
889 instrument: &InstrumentAny,
890 bar_type: BarType,
891 ) -> Box<dyn BarAggregator> {
892 let cache = self.cache.clone();
893 let msgbus = self.msgbus.clone();
894
895 let handler = move |bar: Bar| {
896 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
897 log::error!("Error on cache insert: {e}");
898 }
899
900 let mut msgbus = msgbus.borrow_mut();
901 let topic = msgbus.switchboard.get_bars_topic(bar.bar_type);
902 msgbus.publish(&topic, &bar as &dyn Any);
903 };
904
905 let clock = self.clock.clone();
906 let config = self.config.clone();
907
908 let price_precision = instrument.price_precision();
909 let size_precision = instrument.size_precision();
910
911 if bar_type.spec().is_time_aggregated() {
912 Box::new(TimeBarAggregator::new(
913 bar_type,
914 price_precision,
915 size_precision,
916 clock,
917 handler,
918 false, config.time_bars_build_with_no_updates,
920 config.time_bars_timestamp_on_close,
921 config.time_bars_interval_type,
922 None, 20, false, ))
926 } else {
927 match bar_type.spec().aggregation {
928 BarAggregation::Tick => Box::new(TickBarAggregator::new(
929 bar_type,
930 price_precision,
931 size_precision,
932 handler,
933 false,
934 )) as Box<dyn BarAggregator>,
935 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
936 bar_type,
937 price_precision,
938 size_precision,
939 handler,
940 false,
941 )) as Box<dyn BarAggregator>,
942 BarAggregation::Value => Box::new(ValueBarAggregator::new(
943 bar_type,
944 price_precision,
945 size_precision,
946 handler,
947 false,
948 )) as Box<dyn BarAggregator>,
949 _ => panic!(
950 "Cannot create aggregator: {} aggregation not currently supported",
951 bar_type.spec().aggregation
952 ),
953 }
954 }
955 }
956
957 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
958 let instrument = {
959 let cache = self.cache.borrow();
960 cache
961 .instrument(&bar_type.instrument_id())
962 .ok_or_else(|| {
963 anyhow::anyhow!(
964 "Cannot start bar aggregation: no instrument found for {}",
965 bar_type.instrument_id(),
966 )
967 })?
968 .clone()
969 };
970
971 let aggregator = if let Some(aggregator) = self.bar_aggregators.get_mut(&bar_type) {
972 aggregator
973 } else {
974 let aggregator = self.create_bar_aggregator(&instrument, bar_type);
975 self.bar_aggregators.insert(bar_type, aggregator);
976 self.bar_aggregators.get_mut(&bar_type).unwrap()
977 };
978
979 aggregator.set_is_running(true);
982
983 Ok(())
984 }
985
986 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
987 let aggregator = self
988 .bar_aggregators
989 .remove(&bar_type.standard())
990 .ok_or_else(|| {
991 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
992 })?;
993
994 if bar_type.is_composite() {
1003 let composite_bar_type = bar_type.composite();
1004 } else if bar_type.spec().price_type == PriceType::Last {
1006 todo!()
1008 } else {
1009 todo!()
1011 }
1012
1013 Ok(())
1014 }
1015}
1016
1017pub struct SubscriptionCommandHandler {
1018 pub id: Ustr,
1019 pub engine_ref: Rc<RefCell<DataEngine>>,
1020}
1021
1022impl MessageHandler for SubscriptionCommandHandler {
1023 fn id(&self) -> Ustr {
1024 self.id
1025 }
1026
1027 fn handle(&self, msg: &dyn Any) {
1028 self.engine_ref.borrow_mut().enqueue(msg);
1029 }
1030 fn handle_response(&self, _resp: DataResponse) {}
1031 fn handle_data(&self, _data: Data) {}
1032 fn as_any(&self) -> &dyn Any {
1033 self
1034 }
1035}