1#![allow(dead_code)]
33#![allow(unused_variables)]
34#![allow(unused_assignments)]
35
36pub mod book;
37pub mod config;
38pub mod runner;
39
40#[cfg(test)]
41mod tests;
42
43use std::{
44 any::Any,
45 cell::{Ref, RefCell},
46 collections::{HashMap, HashSet, VecDeque},
47 num::NonZeroU64,
48 rc::Rc,
49 sync::Arc,
50};
51
52use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
53use config::DataEngineConfig;
54use indexmap::IndexMap;
55use nautilus_common::{
56 cache::Cache,
57 clock::Clock,
58 logging::{RECV, RES},
59 messages::data::{Action, DataRequest, DataResponse, SubscriptionCommand},
60 msgbus::{
61 handler::{MessageHandler, ShareableMessageHandler},
62 MessageBus,
63 },
64 timer::TimeEventCallback,
65};
66use nautilus_core::{
67 correctness::{check_key_in_index_map, check_key_not_in_index_map, FAILED},
68 datetime::{millis_to_nanos, NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND},
69};
70use nautilus_model::{
71 data::{
72 Bar, BarType, Data, DataType, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick,
73 TradeTick,
74 },
75 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
76 identifiers::{ClientId, InstrumentId, Venue},
77 instruments::{InstrumentAny, SyntheticInstrument},
78 orderbook::OrderBook,
79};
80use ustr::Ustr;
81
82use crate::{
83 aggregation::{
84 BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
85 VolumeBarAggregator,
86 },
87 client::DataClientAdapter,
88};
89
/// Central hub that sits between data clients, the shared cache and the message bus.
///
/// Data handed to the engine is published on the message bus (and, for most data
/// types, written to the cache first); subscription commands are queued and later
/// dispatched to the owning client.
pub struct DataEngine {
    /// Shared clock; read for timestamps and used to set/cancel book snapshot timers.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache that receives instruments, quotes, trades, bars and order books.
    cache: Rc<RefCell<Cache>>,
    /// Shared message bus used for publishing data and sending responses.
    msgbus: Rc<RefCell<MessageBus>>,
    /// Registered clients keyed by client ID.
    clients: IndexMap<ClientId, DataClientAdapter>,
    /// Client stored by `register_default_client`; not read anywhere in this file section.
    default_client: Option<DataClientAdapter>,
    /// Not read or written by any method visible here beyond initialisation.
    external_clients: HashSet<ClientId>,
    /// Venue to client ID mapping, consulted when a client ID is not registered directly.
    routing_map: IndexMap<Venue, ClientId>,
    /// Keyed by snapshot interval. NOTE(review): only queried with `contains_key` in the
    /// visible code and never inserted into — confirm whether population is missing.
    book_intervals: HashMap<NonZeroU64, HashSet<InstrumentId>>,
    /// Book updater handlers, one per instrument.
    book_updaters: HashMap<InstrumentId, Rc<BookUpdater>>,
    /// Book snapshotters (timer callbacks), one per instrument.
    book_snapshotters: HashMap<InstrumentId, Rc<BookSnapshotter>>,
    /// Bar aggregators keyed by bar type.
    bar_aggregators: HashMap<BarType, Box<dyn BarAggregator>>,
    /// Not used by any method visible here beyond initialisation.
    synthetic_quote_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Not used by any method visible here beyond initialisation.
    synthetic_trade_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Deltas held back per instrument until one flagged `F_LAST` arrives
    /// (only when `config.buffer_deltas` is set).
    buffered_deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>>,
    /// Priority passed to the message bus when subscribing book updaters (initialised to 10).
    msgbus_priority: u8,
    /// Subscription commands waiting to be executed by `run`.
    command_queue: VecDeque<SubscriptionCommand>,
    /// Engine configuration.
    config: DataEngineConfig,
}
110
111impl DataEngine {
112 #[must_use]
114 pub fn new(
115 clock: Rc<RefCell<dyn Clock>>,
116 cache: Rc<RefCell<Cache>>,
117 msgbus: Rc<RefCell<MessageBus>>,
118 config: Option<DataEngineConfig>,
119 ) -> Self {
120 Self {
121 clock,
122 cache,
123 msgbus,
124 clients: IndexMap::new(),
125 default_client: None,
126 external_clients: HashSet::new(),
127 routing_map: IndexMap::new(),
128 book_intervals: HashMap::new(),
129 book_updaters: HashMap::new(),
130 book_snapshotters: HashMap::new(),
131 bar_aggregators: HashMap::new(),
132 synthetic_quote_feeds: HashMap::new(),
133 synthetic_trade_feeds: HashMap::new(),
134 buffered_deltas_map: HashMap::new(),
135 msgbus_priority: 10, command_queue: VecDeque::new(),
137 config: config.unwrap_or_default(),
138 }
139 }
140
141 #[must_use]
143 pub fn get_cache(&self) -> Ref<'_, Cache> {
144 self.cache.borrow()
145 }
146
147 pub fn register_default_client(&mut self, client: DataClientAdapter) {
158 log::info!("Registered default client {}", client.client_id());
159 self.default_client = Some(client);
160 }
161
162 pub fn start(self) {
163 self.clients.values().for_each(|client| client.start());
164 }
165
166 pub fn stop(self) {
167 self.clients.values().for_each(|client| client.stop());
168 }
169
170 pub fn reset(self) {
171 self.clients.values().for_each(|client| client.reset());
172 }
173
174 pub fn dispose(self) {
175 self.clients.values().for_each(|client| client.dispose());
176 self.clock.borrow_mut().cancel_timers();
177 }
178
/// Not implemented yet.
///
/// # Panics
///
/// Always panics via `todo!()`.
pub fn connect(&self) {
    todo!()
}
182
/// Not implemented yet.
///
/// # Panics
///
/// Always panics via `todo!()`.
pub fn disconnect(&self) {
    todo!()
}
186
187 #[must_use]
188 pub fn check_connected(&self) -> bool {
189 self.clients.values().all(|client| client.is_connected())
190 }
191
192 #[must_use]
193 pub fn check_disconnected(&self) -> bool {
194 self.clients.values().all(|client| !client.is_connected())
195 }
196
197 #[must_use]
198 pub fn registed_clients(&self) -> Vec<ClientId> {
199 self.clients.keys().copied().collect()
200 }
201
202 fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
205 where
206 F: Fn(&DataClientAdapter) -> &HashSet<T>,
207 T: Clone,
208 {
209 let mut subs = Vec::new();
210 for client in self.clients.values() {
211 subs.extend(get_subs(client).iter().cloned());
212 }
213 subs
214 }
215
216 fn get_client(&self, client_id: &ClientId, venue: &Venue) -> Option<&DataClientAdapter> {
217 match self.clients.get(client_id) {
218 Some(client) => Some(client),
219 None => self
220 .routing_map
221 .get(venue)
222 .and_then(|client_id: &ClientId| self.clients.get(client_id)),
223 }
224 }
225
226 fn get_client_mut(
227 &mut self,
228 client_id: &ClientId,
229 venue: &Venue,
230 ) -> Option<&mut DataClientAdapter> {
231 if self.clients.contains_key(client_id) {
233 return self.clients.get_mut(client_id);
234 }
235
236 if let Some(mapped_client_id) = self.routing_map.get(venue) {
238 return self.clients.get_mut(mapped_client_id);
239 }
240
241 None
242 }
243
244 #[must_use]
245 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
246 self.collect_subscriptions(|client| &client.subscriptions_generic)
247 }
248
249 #[must_use]
250 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
251 self.collect_subscriptions(|client| &client.subscriptions_instrument)
252 }
253
254 #[must_use]
255 pub fn subscribed_order_book_deltas(&self) -> Vec<InstrumentId> {
256 self.collect_subscriptions(|client| &client.subscriptions_order_book_delta)
257 }
258
259 #[must_use]
260 pub fn subscribed_order_book_snapshots(&self) -> Vec<InstrumentId> {
261 self.collect_subscriptions(|client| &client.subscriptions_order_book_snapshot)
262 }
263
264 #[must_use]
265 pub fn subscribed_quote_ticks(&self) -> Vec<InstrumentId> {
266 self.collect_subscriptions(|client| &client.subscriptions_quote_tick)
267 }
268
269 #[must_use]
270 pub fn subscribed_trade_ticks(&self) -> Vec<InstrumentId> {
271 self.collect_subscriptions(|client| &client.subscriptions_trade_tick)
272 }
273
274 #[must_use]
275 pub fn subscribed_bars(&self) -> Vec<BarType> {
276 self.collect_subscriptions(|client| &client.subscriptions_bar)
277 }
278
279 #[must_use]
280 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
281 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
282 }
283
284 #[must_use]
285 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
286 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
287 }
288
/// Not implemented yet.
///
/// # Panics
///
/// Always panics via `todo!()`.
pub fn on_start(self) {
    todo!()
}
292
/// Not implemented yet.
///
/// # Panics
///
/// Always panics via `todo!()`.
pub fn on_stop(self) {
    todo!()
}
296
297 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
304 check_key_not_in_index_map(&client.client_id, &self.clients, "client_id", "clients")
305 .expect(FAILED);
306
307 if let Some(routing) = routing {
308 self.routing_map.insert(routing, client.client_id());
309 log::info!("Set client {} routing for {routing}", client.client_id());
310 }
311
312 log::info!("Registered client {}", client.client_id());
313 self.clients.insert(client.client_id, client);
314 }
315
316 pub fn deregister_client(&mut self, client_id: &ClientId) {
323 check_key_in_index_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
324
325 self.clients.shift_remove(client_id);
326 log::info!("Deregistered client {client_id}");
327 }
328
329 pub fn run(&mut self) {
330 let commands: Vec<_> = self.command_queue.drain(..).collect();
331 for cmd in commands {
332 self.execute(cmd);
333 }
334 }
335
336 pub fn enqueue(&mut self, cmd: &dyn Any) {
337 if let Some(cmd) = cmd.downcast_ref::<SubscriptionCommand>() {
338 self.command_queue.push_back(cmd.clone());
339 } else {
340 log::error!("Invalid message type received: {cmd:?}");
341 }
342 }
343
344 pub fn execute(&mut self, cmd: SubscriptionCommand) {
345 let result = match cmd.action {
346 Action::Subscribe => match cmd.data_type.type_name() {
347 stringify!(OrderBookDelta) => self.handle_subscribe_book_deltas(&cmd),
348 stringify!(OrderBook) => self.handle_subscribe_book_snapshots(&cmd),
349 stringify!(Bar) => self.handle_subscribe_bars(&cmd),
350 _ => Ok(()), },
352 Action::Unsubscribe => match cmd.data_type.type_name() {
353 stringify!(OrderBookDelta) => self.handle_unsubscribe_book_deltas(&cmd),
354 stringify!(OrderBook) => self.handle_unsubscribe_book_snapshots(&cmd),
355 stringify!(Bar) => self.handle_unsubscribe_bars(&cmd),
356 _ => Ok(()), },
358 };
359
360 if let Err(e) = result {
361 log::error!("{e}");
362 return;
363 }
364
365 if let Some(client) = self.get_client_mut(&cmd.client_id, &cmd.venue) {
366 client.execute(cmd);
367 } else {
368 log::error!(
369 "Cannot handle command: no client found for {}",
370 cmd.client_id
371 );
372 }
373 }
374
375 pub fn request(&self, req: DataRequest) {
377 if let Some(client) = self.get_client(&req.client_id, &req.venue) {
378 client.through_request(req);
379 } else {
380 log::error!(
381 "Cannot handle request: no client found for {}",
382 req.client_id
383 );
384 }
385 }
386
387 pub fn process(&mut self, data: &dyn Any) {
388 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
389 self.handle_instrument(instrument.clone());
390 } else {
391 log::error!("Cannot process data {data:?}, type is unrecognized");
392 }
393 }
394
/// Dispatches a [`Data`] value to the handler matching its variant.
pub fn process_data(&mut self, data: Data) {
    match data {
        Data::Delta(delta) => self.handle_delta(delta),
        // The `Deltas` payload is unwrapped with `into_inner` before handling.
        Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
        Data::Depth10(depth) => self.handle_depth10(depth),
        Data::Quote(quote) => self.handle_quote(quote),
        Data::Trade(trade) => self.handle_trade(trade),
        Data::Bar(bar) => self.handle_bar(bar),
    }
}
405
406 pub fn response(&self, resp: DataResponse) {
407 log::debug!("{}", format!("{RECV}{RES} {resp:?}"));
408
409 match resp.data_type.type_name() {
410 stringify!(InstrumentAny) => {
411 let instruments = Arc::downcast::<Vec<InstrumentAny>>(resp.data.clone())
412 .expect("Invalid response data");
413 self.handle_instruments(instruments);
414 }
415 stringify!(QuoteTick) => {
416 let quotes = Arc::downcast::<Vec<QuoteTick>>(resp.data.clone())
417 .expect("Invalid response data");
418 self.handle_quotes(quotes);
419 }
420 stringify!(TradeTick) => {
421 let trades = Arc::downcast::<Vec<TradeTick>>(resp.data.clone())
422 .expect("Invalid response data");
423 self.handle_trades(trades);
424 }
425 stringify!(Bar) => {
426 let bars =
427 Arc::downcast::<Vec<Bar>>(resp.data.clone()).expect("Invalid response data");
428 self.handle_bars(bars);
429 }
430 type_name => log::error!("Cannot handle request, type {type_name} is unrecognized"),
431 }
432
433 self.msgbus.as_ref().borrow().send_response(resp);
434 }
435
436 fn handle_instrument(&mut self, instrument: InstrumentAny) {
439 if let Err(e) = self
440 .cache
441 .as_ref()
442 .borrow_mut()
443 .add_instrument(instrument.clone())
444 {
445 log::error!("Error on cache insert: {e}");
446 }
447
448 let mut msgbus = self.msgbus.borrow_mut();
449 let topic = msgbus.switchboard.get_instrument_topic(instrument.id());
450 msgbus.publish(&topic, &instrument as &dyn Any); }
452
453 fn handle_delta(&mut self, delta: OrderBookDelta) {
454 let deltas = if self.config.buffer_deltas {
455 let buffer_deltas = self
456 .buffered_deltas_map
457 .entry(delta.instrument_id)
458 .or_default();
459 buffer_deltas.push(delta);
460
461 if !RecordFlag::F_LAST.matches(delta.flags) {
462 return; }
464
465 let deltas = self
467 .buffered_deltas_map
468 .remove(&delta.instrument_id)
469 .unwrap();
470 OrderBookDeltas::new(delta.instrument_id, deltas)
471 } else {
472 OrderBookDeltas::new(delta.instrument_id, vec![delta])
473 };
474
475 let mut msgbus = self.msgbus.borrow_mut();
476 let topic = msgbus.switchboard.get_deltas_topic(deltas.instrument_id);
477 msgbus.publish(&topic, &deltas as &dyn Any);
478 }
479
480 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
481 let deltas = if self.config.buffer_deltas {
482 let buffer_deltas = self
483 .buffered_deltas_map
484 .entry(deltas.instrument_id)
485 .or_default();
486 buffer_deltas.extend(deltas.deltas);
487
488 let mut is_last_delta = false;
489 for delta in buffer_deltas.iter_mut() {
490 if RecordFlag::F_LAST.matches(delta.flags) {
491 is_last_delta = true;
492 }
493 }
494
495 if !is_last_delta {
496 return;
497 }
498
499 let buffer_deltas = self
501 .buffered_deltas_map
502 .remove(&deltas.instrument_id)
503 .unwrap();
504 OrderBookDeltas::new(deltas.instrument_id, buffer_deltas)
505 } else {
506 deltas
507 };
508
509 let mut msgbus = self.msgbus.borrow_mut();
510 let topic = msgbus.switchboard.get_deltas_topic(deltas.instrument_id);
511 msgbus.publish(&topic, &deltas as &dyn Any); }
513
514 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
515 let mut msgbus = self.msgbus.borrow_mut();
516 let topic = msgbus.switchboard.get_depth_topic(depth.instrument_id);
517 msgbus.publish(&topic, &depth as &dyn Any); }
519
520 fn handle_quote(&mut self, quote: QuoteTick) {
521 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
522 log::error!("Error on cache insert: {e}");
523 }
524
525 let mut msgbus = self.msgbus.borrow_mut();
528 let topic = msgbus.switchboard.get_quotes_topic(quote.instrument_id);
529 msgbus.publish(&topic, "e as &dyn Any); }
531
532 fn handle_trade(&mut self, trade: TradeTick) {
533 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
534 log::error!("Error on cache insert: {e}");
535 }
536
537 let mut msgbus = self.msgbus.borrow_mut();
540 let topic = msgbus.switchboard.get_trades_topic(trade.instrument_id);
541 msgbus.publish(&topic, &trade as &dyn Any); }
543
544 fn handle_bar(&mut self, bar: Bar) {
545 if self.config.validate_data_sequence {
547 if let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type) {
548 if bar.ts_event < last_bar.ts_event {
549 log::warn!(
550 "Bar {bar} was prior to last bar `ts_event` {}",
551 last_bar.ts_event
552 );
553 return; }
555 if bar.ts_init < last_bar.ts_init {
556 log::warn!(
557 "Bar {bar} was prior to last bar `ts_init` {}",
558 last_bar.ts_init
559 );
560 return; }
562 }
564 }
565
566 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
567 log::error!("Error on cache insert: {e}");
568 }
569
570 let mut msgbus = self.msgbus.borrow_mut();
571 let topic = msgbus.switchboard.get_bars_topic(bar.bar_type);
572 msgbus.publish(&topic, &bar as &dyn Any); }
574
575 fn handle_subscribe_book_deltas(
578 &mut self,
579 command: &SubscriptionCommand,
580 ) -> anyhow::Result<()> {
581 let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
582 anyhow::anyhow!(
583 "Invalid order book deltas subscription: did not contain an 'instrument_id', {}",
584 command.data_type
585 )
586 })?;
587
588 if instrument_id.is_synthetic() {
589 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
590 }
591
592 if !self.subscribed_order_book_deltas().contains(&instrument_id) {
593 return Ok(());
594 }
595
596 let data_type = command.data_type.clone();
597 let book_type = data_type.book_type();
598 let depth = data_type.depth();
599 let managed = data_type.managed();
600
601 self.setup_order_book(&instrument_id, book_type, depth, true, managed)?;
602
603 Ok(())
604 }
605
606 fn handle_subscribe_book_snapshots(
607 &mut self,
608 command: &SubscriptionCommand,
609 ) -> anyhow::Result<()> {
610 let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
611 anyhow::anyhow!(
612 "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
613 command.data_type
614 )
615 })?;
616
617 if self.subscribed_order_book_deltas().contains(&instrument_id) {
618 return Ok(());
619 }
620
621 if instrument_id.is_synthetic() {
622 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
623 }
624
625 let data_type = command.data_type.clone();
626 let book_type = data_type.book_type();
627 let depth = data_type.depth();
628 let interval_ms = data_type.interval_ms();
629 let managed = data_type.managed();
630
631 {
632 if !self.book_intervals.contains_key(&interval_ms) {
633 let interval_ns = millis_to_nanos(interval_ms.get() as f64);
634 let mut msgbus = self.msgbus.borrow_mut();
635 let topic = msgbus.switchboard.get_book_snapshots_topic(instrument_id);
636
637 let snap_info = BookSnapshotInfo {
638 instrument_id,
639 venue: instrument_id.venue,
640 is_composite: instrument_id.symbol.is_composite(),
641 root: Ustr::from(instrument_id.symbol.root()),
642 topic,
643 interval_ms,
644 };
645
646 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
647 let mut start_time_ns = now_ns - (now_ns % interval_ns);
648
649 if start_time_ns - NANOSECONDS_IN_MILLISECOND <= now_ns {
650 start_time_ns += NANOSECONDS_IN_SECOND; }
652
653 let snapshotter = Rc::new(BookSnapshotter::new(
654 snap_info,
655 self.cache.clone(),
656 self.msgbus.clone(),
657 ));
658 self.book_snapshotters
659 .insert(instrument_id, snapshotter.clone());
660 let timer_name = snapshotter.timer_name;
661
662 let callback =
663 TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
664
665 self.clock
666 .borrow_mut()
667 .set_timer_ns(
668 &timer_name,
669 interval_ns,
670 start_time_ns.into(),
671 None,
672 Some(callback),
673 )
674 .expect(FAILED);
675 }
676 }
677
678 self.setup_order_book(&instrument_id, book_type, depth, false, managed)?;
679
680 Ok(())
681 }
682
683 fn handle_subscribe_bars(&mut self, command: &SubscriptionCommand) -> anyhow::Result<()> {
684 let bar_type = command.data_type.bar_type();
685
686 match bar_type.aggregation_source() {
687 AggregationSource::Internal => {
688 if !self.bar_aggregators.contains_key(&bar_type.standard()) {
689 self.start_bar_aggregator(bar_type)?;
690 }
691 }
692 AggregationSource::External => {
693 if bar_type.instrument_id().is_synthetic() {
694 anyhow::bail!(
695 "Cannot subscribe for externally aggregated synthetic instrument bar data"
696 );
697 }
698 }
699 }
700
701 Ok(())
702 }
703
704 fn handle_unsubscribe_book_deltas(
705 &mut self,
706 command: &SubscriptionCommand,
707 ) -> anyhow::Result<()> {
708 let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
709 anyhow::anyhow!(
710 "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
711 command.data_type
712 )
713 })?;
714
715 if !self.subscribed_order_book_deltas().contains(&instrument_id) {
716 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
717 return Ok(());
718 }
719
720 let topics = {
721 let mut msgbus = self.msgbus.borrow_mut();
722 vec![
723 msgbus.switchboard.get_deltas_topic(instrument_id),
724 msgbus.switchboard.get_depth_topic(instrument_id),
725 msgbus.switchboard.get_book_snapshots_topic(instrument_id),
726 ]
727 };
728
729 self.maintain_book_updater(&instrument_id, &topics);
730 self.maintain_book_snapshotter(&instrument_id);
731
732 Ok(())
733 }
734
735 fn handle_unsubscribe_book_snapshots(
736 &mut self,
737 command: &SubscriptionCommand,
738 ) -> anyhow::Result<()> {
739 let instrument_id = command.data_type.instrument_id().ok_or_else(|| {
740 anyhow::anyhow!(
741 "Invalid order book snapshots subscription: did not contain an 'instrument_id', {}",
742 command.data_type
743 )
744 })?;
745
746 if !self.subscribed_order_book_deltas().contains(&instrument_id) {
747 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
748 return Ok(());
749 }
750
751 let topics = {
752 let mut msgbus = self.msgbus.borrow_mut();
753 vec![
754 msgbus.switchboard.get_deltas_topic(instrument_id),
755 msgbus.switchboard.get_depth_topic(instrument_id),
756 msgbus.switchboard.get_book_snapshots_topic(instrument_id),
757 ]
758 };
759
760 self.maintain_book_updater(&instrument_id, &topics);
761 self.maintain_book_snapshotter(&instrument_id);
762
763 Ok(())
764 }
765
/// Placeholder for bar unsubscription handling.
///
/// Currently ignores `command`, changes nothing and always returns `Ok(())`.
const fn handle_unsubscribe_bars(
    &mut self,
    command: &SubscriptionCommand,
) -> anyhow::Result<()> {
    Ok(())
}
773
774 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[Ustr]) {
775 if let Some(updater) = self.book_updaters.get(instrument_id) {
776 let handler = ShareableMessageHandler(updater.clone());
777 let mut msgbus = self.msgbus.borrow_mut();
778
779 for topic in topics {
781 if msgbus.subscriptions_count(*topic) == 1
782 && msgbus.is_subscribed(*topic, handler.clone())
783 {
784 log::debug!("Unsubscribing BookUpdater from {topic}");
785 msgbus.unsubscribe(*topic, handler.clone());
786 }
787 }
788
789 let still_subscribed = topics
791 .iter()
792 .any(|topic| msgbus.is_subscribed(*topic, handler.clone()));
793 if !still_subscribed {
794 self.book_updaters.remove(instrument_id);
795 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
796 }
797 }
798 }
799
800 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
801 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
802 let mut msgbus = self.msgbus.borrow_mut();
803
804 let topic = msgbus.switchboard.get_book_snapshots_topic(*instrument_id);
805
806 if msgbus.subscriptions_count(topic) == 0 {
808 let timer_name = snapshotter.timer_name;
809 self.book_snapshotters.remove(instrument_id);
810 let mut clock = self.clock.borrow_mut();
811 if clock.timer_names().contains(&timer_name.as_str()) {
812 clock.cancel_timer(&timer_name);
813 }
814 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
815 }
816 }
817 }
818
819 fn handle_instruments(&self, instruments: Arc<Vec<InstrumentAny>>) {
822 let mut cache = self.cache.as_ref().borrow_mut();
824 for instrument in instruments.iter() {
825 if let Err(e) = cache.add_instrument(instrument.clone()) {
826 log::error!("Error on cache insert: {e}");
827 }
828 }
829 }
830
831 fn handle_quotes(&self, quotes: Arc<Vec<QuoteTick>>) {
832 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes("es) {
833 log::error!("Error on cache insert: {e}");
834 }
835 }
836
837 fn handle_trades(&self, trades: Arc<Vec<TradeTick>>) {
838 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(&trades) {
839 log::error!("Error on cache insert: {e}");
840 }
841 }
842
843 fn handle_bars(&self, bars: Arc<Vec<Bar>>) {
844 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(&bars) {
845 log::error!("Error on cache insert: {e}");
846 }
847 }
848
849 #[allow(clippy::too_many_arguments)]
852 fn setup_order_book(
853 &mut self,
854 instrument_id: &InstrumentId,
855 book_type: BookType,
856 depth: Option<usize>,
857 only_deltas: bool,
858 managed: bool,
859 ) -> anyhow::Result<()> {
860 let mut cache = self.cache.borrow_mut();
861 if managed && !cache.has_order_book(instrument_id) {
862 let book = OrderBook::new(*instrument_id, book_type);
863 log::debug!("Created {book}");
864 cache.add_order_book(book)?;
865 }
866
867 let mut msgbus = self.msgbus.borrow_mut();
868
869 let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
871 self.book_updaters.insert(*instrument_id, updater.clone());
872
873 let handler = ShareableMessageHandler(updater);
874
875 let topic = msgbus.switchboard.get_deltas_topic(*instrument_id);
876 if !msgbus.is_subscribed(topic, handler.clone()) {
877 msgbus.subscribe(topic, handler.clone(), Some(self.msgbus_priority));
878 }
879
880 let topic = msgbus.switchboard.get_depth_topic(*instrument_id);
881 if !only_deltas && !msgbus.is_subscribed(topic, handler.clone()) {
882 msgbus.subscribe(topic, handler, Some(self.msgbus_priority));
883 }
884
885 Ok(())
886 }
887
888 fn create_bar_aggregator(
889 &mut self,
890 instrument: &InstrumentAny,
891 bar_type: BarType,
892 ) -> Box<dyn BarAggregator> {
893 let cache = self.cache.clone();
894 let msgbus = self.msgbus.clone();
895
896 let handler = move |bar: Bar| {
897 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
898 log::error!("Error on cache insert: {e}");
899 }
900
901 let mut msgbus = msgbus.borrow_mut();
902 let topic = msgbus.switchboard.get_bars_topic(bar.bar_type);
903 msgbus.publish(&topic, &bar as &dyn Any);
904 };
905
906 let clock = self.clock.clone();
907 let config = self.config.clone();
908
909 let price_precision = instrument.price_precision();
910 let size_precision = instrument.size_precision();
911
912 if bar_type.spec().is_time_aggregated() {
913 Box::new(TimeBarAggregator::new(
914 bar_type,
915 price_precision,
916 size_precision,
917 clock,
918 handler,
919 false,
920 config.time_bars_build_with_no_updates,
921 config.time_bars_timestamp_on_close,
922 config.time_bars_interval_type,
923 None, 20, ))
926 } else {
927 match bar_type.spec().aggregation {
928 BarAggregation::Tick => Box::new(TickBarAggregator::new(
929 bar_type,
930 price_precision,
931 size_precision,
932 handler,
933 false,
934 )) as Box<dyn BarAggregator>,
935 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
936 bar_type,
937 price_precision,
938 size_precision,
939 handler,
940 false,
941 )) as Box<dyn BarAggregator>,
942 BarAggregation::Value => Box::new(ValueBarAggregator::new(
943 bar_type,
944 price_precision,
945 size_precision,
946 handler,
947 false,
948 )) as Box<dyn BarAggregator>,
949 _ => panic!(
950 "Cannot create aggregator: {} aggregation not currently supported",
951 bar_type.spec().aggregation
952 ),
953 }
954 }
955 }
956
957 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
958 let instrument = {
959 let cache = self.cache.borrow();
960 cache
961 .instrument(&bar_type.instrument_id())
962 .ok_or_else(|| {
963 anyhow::anyhow!(
964 "Cannot start bar aggregation: no instrument found for {}",
965 bar_type.instrument_id(),
966 )
967 })?
968 .clone()
969 };
970
971 let aggregator = if let Some(aggregator) = self.bar_aggregators.get_mut(&bar_type) {
972 aggregator
973 } else {
974 let aggregator = self.create_bar_aggregator(&instrument, bar_type);
975 self.bar_aggregators.insert(bar_type, aggregator);
976 self.bar_aggregators.get_mut(&bar_type).unwrap()
977 };
978
979 aggregator.set_is_running(true);
982
983 Ok(())
984 }
985
/// Removes the aggregator stored under the standard form of `bar_type`.
///
/// The implementation is unfinished: the removed aggregator is not used further, the
/// composite branch only computes the composite bar type, and both non-composite
/// branches are `todo!()`.
///
/// # Errors
///
/// Returns an error if no aggregator is stored for `bar_type.standard()`.
///
/// # Panics
///
/// Panics via `todo!()` for every non-composite bar type, after the aggregator has
/// already been removed.
fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
    let aggregator = self
        .bar_aggregators
        .remove(&bar_type.standard())
        .ok_or_else(|| {
            anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
        })?;

    if bar_type.is_composite() {
        // Computed but not yet used.
        let composite_bar_type = bar_type.composite();
    } else if bar_type.spec().price_type == PriceType::Last {
        todo!()
    } else {
        todo!()
    }

    Ok(())
}
1015}
1016
/// Message handler that hands received messages to a shared [`DataEngine`] for queuing.
pub struct SubscriptionCommandHandler {
    /// Identifier returned from `MessageHandler::id`.
    pub id: Ustr,
    /// Shared handle to the engine whose `enqueue` method receives the messages.
    pub engine_ref: Rc<RefCell<DataEngine>>,
}
1021
// Forwards generic messages to the engine's command queue; responses and data are ignored.
impl MessageHandler for SubscriptionCommandHandler {
    /// Returns the handler's identifier.
    fn id(&self) -> Ustr {
        self.id
    }

    /// Passes the message to `DataEngine::enqueue`.
    ///
    /// Mutably borrows the engine, so this panics if the engine is already borrowed
    /// when the message arrives (standard `RefCell` behaviour).
    fn handle(&self, msg: &dyn Any) {
        self.engine_ref.borrow_mut().enqueue(msg);
    }
    /// Intentionally a no-op.
    fn handle_response(&self, _resp: DataResponse) {}
    /// Intentionally a no-op.
    fn handle_data(&self, _data: Data) {}
    /// Returns `self` as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}