1pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39 any::Any,
40 cell::{Ref, RefCell},
41 collections::hash_map::Entry,
42 fmt::Display,
43 num::NonZeroUsize,
44 rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use futures::future::join_all;
51use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
52use indexmap::IndexMap;
53use nautilus_common::{
54 cache::Cache,
55 clock::Clock,
56 logging::{RECV, RES},
57 messages::data::{
58 DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61 UnsubscribeCommand,
62 },
63 msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
64 timer::{TimeEvent, TimeEventCallback},
65};
66use nautilus_core::{
67 correctness::{
68 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
69 },
70 datetime::millis_to_nanos_unchecked,
71};
72#[cfg(feature = "defi")]
73use nautilus_model::defi::DefiData;
74use nautilus_model::{
75 data::{
76 Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
77 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
78 },
79 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
80 identifiers::{ClientId, InstrumentId, Venue},
81 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
82 orderbook::OrderBook,
83};
84#[cfg(feature = "streaming")]
85use nautilus_persistence::backend::catalog::ParquetDataCatalog;
86use ustr::Ustr;
87
88#[cfg(feature = "defi")]
89#[allow(unused_imports)] use crate::defi::engine as _;
91#[cfg(feature = "defi")]
92use crate::engine::pool::PoolUpdater;
93use crate::{
94 aggregation::{
95 BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
96 TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
97 ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
98 VolumeRunsBarAggregator,
99 },
100 client::DataClientAdapter,
101};
102
103#[derive(Debug)]
105pub struct DataEngine {
106 pub(crate) clock: Rc<RefCell<dyn Clock>>,
107 pub(crate) cache: Rc<RefCell<Cache>>,
108 pub(crate) external_clients: AHashSet<ClientId>,
109 clients: IndexMap<ClientId, DataClientAdapter>,
110 default_client: Option<DataClientAdapter>,
111 #[cfg(feature = "streaming")]
112 catalogs: AHashMap<Ustr, ParquetDataCatalog>,
113 routing_map: IndexMap<Venue, ClientId>,
114 book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
115 book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
116 book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
117 bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
118 bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
119 _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
120 _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
121 buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
122 pub(crate) msgbus_priority: u8,
123 pub(crate) config: DataEngineConfig,
124 #[cfg(feature = "defi")]
125 pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
126 #[cfg(feature = "defi")]
127 pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
128 #[cfg(feature = "defi")]
129 pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
130 #[cfg(feature = "defi")]
131 pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
132}
133
134impl DataEngine {
135 #[must_use]
137 pub fn new(
138 clock: Rc<RefCell<dyn Clock>>,
139 cache: Rc<RefCell<Cache>>,
140 config: Option<DataEngineConfig>,
141 ) -> Self {
142 let config = config.unwrap_or_default();
143
144 let external_clients: AHashSet<ClientId> = config
145 .external_clients
146 .clone()
147 .unwrap_or_default()
148 .into_iter()
149 .collect();
150
151 Self {
152 clock,
153 cache,
154 external_clients,
155 clients: IndexMap::new(),
156 default_client: None,
157 #[cfg(feature = "streaming")]
158 catalogs: AHashMap::new(),
159 routing_map: IndexMap::new(),
160 book_intervals: AHashMap::new(),
161 book_updaters: AHashMap::new(),
162 book_snapshotters: AHashMap::new(),
163 bar_aggregators: AHashMap::new(),
164 bar_aggregator_handlers: AHashMap::new(),
165 _synthetic_quote_feeds: AHashMap::new(),
166 _synthetic_trade_feeds: AHashMap::new(),
167 buffered_deltas_map: AHashMap::new(),
168 msgbus_priority: 10, config,
170 #[cfg(feature = "defi")]
171 pool_updaters: AHashMap::new(),
172 #[cfg(feature = "defi")]
173 pool_updaters_pending: AHashSet::new(),
174 #[cfg(feature = "defi")]
175 pool_snapshot_pending: AHashSet::new(),
176 #[cfg(feature = "defi")]
177 pool_event_buffers: AHashMap::new(),
178 }
179 }
180
181 #[must_use]
183 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
184 self.clock.borrow()
185 }
186
187 #[must_use]
189 pub fn get_cache(&self) -> Ref<'_, Cache> {
190 self.cache.borrow()
191 }
192
193 #[must_use]
195 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
196 Rc::clone(&self.cache)
197 }
198
/// Registers `catalog` under `name`, defaulting the name to `"catalog_0"`.
///
/// # Panics
///
/// Panics if the `check_key_not_in_map` validation fails (presumably when a
/// catalog with the same name is already registered).
#[cfg(feature = "streaming")]
pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
    let name = match name.as_deref() {
        Some(given) => Ustr::from(given),
        None => Ustr::from("catalog_0"),
    };

    check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);

    self.catalogs.insert(name, catalog);
    log::info!("Registered catalog <{name}>");
}
213
214 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
221 let client_id = client.client_id();
222
223 if let Some(default_client) = &self.default_client {
224 check_predicate_false(
225 default_client.client_id() == client.client_id(),
226 "client_id already registered as default client",
227 )
228 .expect(FAILED);
229 }
230
231 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
232
233 if let Some(routing) = routing {
234 self.routing_map.insert(routing, client_id);
235 log::debug!("Set client {client_id} routing for {routing}");
236 }
237
238 if client.venue.is_none() && self.default_client.is_none() {
239 self.default_client = Some(client);
240 log::debug!("Registered client {client_id} for default routing");
241 } else {
242 self.clients.insert(client_id, client);
243 log::debug!("Registered client {client_id}");
244 }
245 }
246
247 pub fn deregister_client(&mut self, client_id: &ClientId) {
253 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
254
255 self.clients.shift_remove(client_id);
256 log::info!("Deregistered client {client_id}");
257 }
258
259 pub fn register_default_client(&mut self, client: DataClientAdapter) {
271 check_predicate_true(
272 self.default_client.is_none(),
273 "default client already registered",
274 )
275 .expect(FAILED);
276
277 let client_id = client.client_id();
278
279 self.default_client = Some(client);
280 log::debug!("Registered default client {client_id}");
281 }
282
283 pub fn start(&mut self) {
285 for client in self.get_clients_mut() {
286 if let Err(e) = client.start() {
287 log::error!("{e}");
288 }
289 }
290 }
291
292 pub fn stop(&mut self) {
294 for client in self.get_clients_mut() {
295 if let Err(e) = client.stop() {
296 log::error!("{e}");
297 }
298 }
299 }
300
301 pub fn reset(&mut self) {
303 for client in self.get_clients_mut() {
304 if let Err(e) = client.reset() {
305 log::error!("{e}");
306 }
307 }
308 }
309
310 pub fn dispose(&mut self) {
312 for client in self.get_clients_mut() {
313 if let Err(e) = client.dispose() {
314 log::error!("{e}");
315 }
316 }
317
318 self.clock.borrow_mut().cancel_timers();
319 }
320
321 pub async fn connect(&mut self) -> anyhow::Result<()> {
327 let futures: Vec<_> = self
328 .get_clients_mut()
329 .into_iter()
330 .map(|client| client.connect())
331 .collect();
332
333 let results = join_all(futures).await;
334 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
335
336 if errors.is_empty() {
337 Ok(())
338 } else {
339 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
340 anyhow::bail!("Failed to connect data clients: {}", error_msgs.join("; "))
341 }
342 }
343
344 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
350 let futures: Vec<_> = self
351 .get_clients_mut()
352 .into_iter()
353 .map(|client| client.disconnect())
354 .collect();
355
356 let results = join_all(futures).await;
357 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
358
359 if errors.is_empty() {
360 Ok(())
361 } else {
362 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
363 anyhow::bail!(
364 "Failed to disconnect data clients: {}",
365 error_msgs.join("; ")
366 )
367 }
368 }
369
370 #[must_use]
372 pub fn check_connected(&self) -> bool {
373 self.get_clients()
374 .iter()
375 .all(|client| client.is_connected())
376 }
377
378 #[must_use]
380 pub fn check_disconnected(&self) -> bool {
381 self.get_clients()
382 .iter()
383 .all(|client| !client.is_connected())
384 }
385
386 #[must_use]
388 pub fn registered_clients(&self) -> Vec<ClientId> {
389 self.get_clients()
390 .into_iter()
391 .map(|client| client.client_id())
392 .collect()
393 }
394
395 pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
398 where
399 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
400 T: Clone,
401 {
402 self.get_clients()
403 .into_iter()
404 .flat_map(get_subs)
405 .cloned()
406 .collect()
407 }
408
409 #[must_use]
410 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
411 let (default_opt, clients_map) = (&self.default_client, &self.clients);
412 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
413
414 if let Some(default) = default_opt {
415 clients.push(default);
416 }
417
418 clients
419 }
420
421 #[must_use]
422 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
423 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
424 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
425
426 if let Some(default) = default_opt {
427 clients.push(default);
428 }
429
430 clients
431 }
432
433 pub fn get_client(
434 &mut self,
435 client_id: Option<&ClientId>,
436 venue: Option<&Venue>,
437 ) -> Option<&mut DataClientAdapter> {
438 if let Some(client_id) = client_id {
439 if let Some(client) = self.clients.get_mut(client_id) {
441 return Some(client);
442 }
443
444 if let Some(default) = self.default_client.as_mut()
446 && default.client_id() == *client_id
447 {
448 return Some(default);
449 }
450
451 return None;
453 }
454
455 if let Some(v) = venue {
456 if let Some(client_id) = self.routing_map.get(v) {
458 return self.clients.get_mut(client_id);
459 }
460 }
461
462 self.get_default_client()
464 }
465
466 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
467 self.default_client.as_mut()
468 }
469
470 #[must_use]
472 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
473 self.collect_subscriptions(|client| &client.subscriptions_custom)
474 }
475
476 #[must_use]
478 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
479 self.collect_subscriptions(|client| &client.subscriptions_instrument)
480 }
481
482 #[must_use]
484 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
485 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
486 }
487
488 #[must_use]
490 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
491 self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
492 }
493
494 #[must_use]
496 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
497 self.collect_subscriptions(|client| &client.subscriptions_quotes)
498 }
499
500 #[must_use]
502 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
503 self.collect_subscriptions(|client| &client.subscriptions_trades)
504 }
505
506 #[must_use]
508 pub fn subscribed_bars(&self) -> Vec<BarType> {
509 self.collect_subscriptions(|client| &client.subscriptions_bars)
510 }
511
512 #[must_use]
514 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
515 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
516 }
517
518 #[must_use]
520 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
521 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
522 }
523
524 #[must_use]
526 pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
527 self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
528 }
529
530 #[must_use]
532 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
533 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
534 }
535
536 #[must_use]
538 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
539 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
540 }
541
542 pub fn execute(&mut self, cmd: &DataCommand) {
548 if let Err(e) = match cmd {
549 DataCommand::Subscribe(c) => self.execute_subscribe(c),
550 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
551 DataCommand::Request(c) => self.execute_request(c),
552 #[cfg(feature = "defi")]
553 DataCommand::DefiRequest(c) => self.execute_defi_request(c),
554 #[cfg(feature = "defi")]
555 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
556 #[cfg(feature = "defi")]
557 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
558 _ => {
559 log::warn!("Unhandled DataCommand variant: {cmd:?}");
560 Ok(())
561 }
562 } {
563 log::error!("{e}");
564 }
565 }
566
567 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
574 match &cmd {
576 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
577 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
578 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
579 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
580 _ => {} }
582
583 if let Some(client_id) = cmd.client_id()
584 && self.external_clients.contains(client_id)
585 {
586 if self.config.debug {
587 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
588 }
589 return Ok(());
590 }
591
592 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
593 client.execute_subscribe(cmd);
594 } else {
595 log::error!(
596 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
597 cmd.client_id(),
598 cmd.venue(),
599 );
600 }
601
602 Ok(())
603 }
604
605 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
611 match &cmd {
612 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
613 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
614 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
615 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
616 _ => {} }
618
619 if let Some(client_id) = cmd.client_id()
620 && self.external_clients.contains(client_id)
621 {
622 if self.config.debug {
623 log::debug!(
624 "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
625 );
626 }
627 return Ok(());
628 }
629
630 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
631 client.execute_unsubscribe(cmd);
632 } else {
633 log::error!(
634 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
635 cmd.client_id(),
636 cmd.venue(),
637 );
638 }
639
640 Ok(())
641 }
642
643 pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
650 if let Some(cid) = req.client_id()
652 && self.external_clients.contains(cid)
653 {
654 if self.config.debug {
655 log::debug!("Skipping data request for external client {cid}: {req:?}");
656 }
657 return Ok(());
658 }
659 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
660 match req {
661 RequestCommand::Data(req) => client.request_data(req),
662 RequestCommand::Instrument(req) => client.request_instrument(req),
663 RequestCommand::Instruments(req) => client.request_instruments(req),
664 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
665 RequestCommand::BookDepth(req) => client.request_book_depth(req),
666 RequestCommand::Quotes(req) => client.request_quotes(req),
667 RequestCommand::Trades(req) => client.request_trades(req),
668 RequestCommand::Bars(req) => client.request_bars(req),
669 }
670 } else {
671 anyhow::bail!(
672 "Cannot handle request: no client found for {:?} {:?}",
673 req.client_id(),
674 req.venue()
675 );
676 }
677 }
678
679 pub fn process(&mut self, data: &dyn Any) {
683 if let Some(data) = data.downcast_ref::<Data>() {
685 self.process_data(data.clone()); return;
687 }
688
689 #[cfg(feature = "defi")]
690 if let Some(data) = data.downcast_ref::<DefiData>() {
691 self.process_defi_data(data.clone()); return;
693 }
694
695 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
696 self.handle_instrument(instrument.clone());
697 } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
698 self.handle_funding_rate(*funding_rate);
699 } else {
700 log::error!("Cannot process data {data:?}, type is unrecognized");
701 }
702 }
703
704 pub fn process_data(&mut self, data: Data) {
706 match data {
707 Data::Delta(delta) => self.handle_delta(delta),
708 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
709 Data::Depth10(depth) => self.handle_depth10(*depth),
710 Data::Quote(quote) => self.handle_quote(quote),
711 Data::Trade(trade) => self.handle_trade(trade),
712 Data::Bar(bar) => self.handle_bar(bar),
713 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
714 Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
715 Data::InstrumentClose(close) => self.handle_instrument_close(close),
716 }
717 }
718
719 pub fn response(&self, resp: DataResponse) {
721 log::debug!("{RECV}{RES} {resp:?}");
722
723 match &resp {
724 DataResponse::Instrument(resp) => {
725 self.handle_instrument_response(resp.data.clone());
726 }
727 DataResponse::Instruments(resp) => {
728 self.handle_instruments(&resp.data);
729 }
730 DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
731 DataResponse::Trades(resp) => self.handle_trades(&resp.data),
732 DataResponse::Bars(resp) => self.handle_bars(&resp.data),
733 DataResponse::Book(resp) => self.handle_book_response(&resp.data),
734 _ => todo!("Handle other response types"),
735 }
736
737 msgbus::send_response(resp.correlation_id(), &resp);
738 }
739
740 fn handle_instrument(&mut self, instrument: InstrumentAny) {
743 if let Err(e) = self
744 .cache
745 .as_ref()
746 .borrow_mut()
747 .add_instrument(instrument.clone())
748 {
749 log_error_on_cache_insert(&e);
750 }
751
752 let topic = switchboard::get_instrument_topic(instrument.id());
753 msgbus::publish(topic, &instrument as &dyn Any);
754 }
755
756 fn handle_delta(&mut self, delta: OrderBookDelta) {
757 let deltas = if self.config.buffer_deltas {
758 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
759 buffered_deltas.deltas.push(delta);
760 buffered_deltas.flags = delta.flags;
761 buffered_deltas.sequence = delta.sequence;
762 buffered_deltas.ts_event = delta.ts_event;
763 buffered_deltas.ts_init = delta.ts_init;
764 } else {
765 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
766 self.buffered_deltas_map
767 .insert(delta.instrument_id, buffered_deltas);
768 }
769
770 if !RecordFlag::F_LAST.matches(delta.flags) {
771 return; }
773
774 self.buffered_deltas_map
776 .remove(&delta.instrument_id)
777 .unwrap()
778 } else {
779 OrderBookDeltas::new(delta.instrument_id, vec![delta])
780 };
781
782 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
783 msgbus::publish(topic, &deltas as &dyn Any);
784 }
785
786 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
787 let deltas = if self.config.buffer_deltas {
788 let mut is_last_delta = false;
789 for delta in &deltas.deltas {
790 if RecordFlag::F_LAST.matches(delta.flags) {
791 is_last_delta = true;
792 break;
793 }
794 }
795
796 let instrument_id = deltas.instrument_id;
797
798 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
799 buffered_deltas.deltas.extend(deltas.deltas);
800
801 if let Some(last_delta) = buffered_deltas.deltas.last() {
802 buffered_deltas.flags = last_delta.flags;
803 buffered_deltas.sequence = last_delta.sequence;
804 buffered_deltas.ts_event = last_delta.ts_event;
805 buffered_deltas.ts_init = last_delta.ts_init;
806 }
807 } else {
808 self.buffered_deltas_map.insert(instrument_id, deltas);
809 }
810
811 if !is_last_delta {
812 return;
813 }
814
815 self.buffered_deltas_map.remove(&instrument_id).unwrap()
817 } else {
818 deltas
819 };
820
821 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
822 msgbus::publish(topic, &deltas as &dyn Any);
823 }
824
825 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
826 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
827 msgbus::publish(topic, &depth as &dyn Any);
828 }
829
830 fn handle_quote(&mut self, quote: QuoteTick) {
831 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
832 log_error_on_cache_insert(&e);
833 }
834
835 let topic = switchboard::get_quotes_topic(quote.instrument_id);
838 msgbus::publish(topic, "e as &dyn Any);
839 }
840
841 fn handle_trade(&mut self, trade: TradeTick) {
842 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
843 log_error_on_cache_insert(&e);
844 }
845
846 let topic = switchboard::get_trades_topic(trade.instrument_id);
849 msgbus::publish(topic, &trade as &dyn Any);
850 }
851
852 fn handle_bar(&mut self, bar: Bar) {
853 if self.config.validate_data_sequence
855 && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
856 {
857 if bar.ts_event < last_bar.ts_event {
858 log::warn!(
859 "Bar {bar} was prior to last bar `ts_event` {}",
860 last_bar.ts_event
861 );
862 return; }
864 if bar.ts_init < last_bar.ts_init {
865 log::warn!(
866 "Bar {bar} was prior to last bar `ts_init` {}",
867 last_bar.ts_init
868 );
869 return; }
871 }
873
874 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
875 log_error_on_cache_insert(&e);
876 }
877
878 let topic = switchboard::get_bars_topic(bar.bar_type);
879 msgbus::publish(topic, &bar as &dyn Any);
880 }
881
882 fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
883 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
884 log_error_on_cache_insert(&e);
885 }
886
887 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
888 msgbus::publish(topic, &mark_price as &dyn Any);
889 }
890
891 fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
892 if let Err(e) = self
893 .cache
894 .as_ref()
895 .borrow_mut()
896 .add_index_price(index_price)
897 {
898 log_error_on_cache_insert(&e);
899 }
900
901 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
902 msgbus::publish(topic, &index_price as &dyn Any);
903 }
904
905 pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
907 if let Err(e) = self
908 .cache
909 .as_ref()
910 .borrow_mut()
911 .add_funding_rate(funding_rate)
912 {
913 log_error_on_cache_insert(&e);
914 }
915
916 let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
917 msgbus::publish(topic, &funding_rate as &dyn Any);
918 }
919
920 fn handle_instrument_close(&mut self, close: InstrumentClose) {
921 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
922 msgbus::publish(topic, &close as &dyn Any);
923 }
924
925 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
928 if cmd.instrument_id.is_synthetic() {
929 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
930 }
931
932 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
933
934 Ok(())
935 }
936
937 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
938 if cmd.instrument_id.is_synthetic() {
939 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
940 }
941
942 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
943
944 Ok(())
945 }
946
947 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
948 if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
949 return Ok(());
950 }
951
952 if cmd.instrument_id.is_synthetic() {
953 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
954 }
955
956 let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
958 Entry::Vacant(e) => {
959 let mut set = AHashSet::new();
960 set.insert(cmd.instrument_id);
961 e.insert(set);
962 true
963 }
964 Entry::Occupied(mut e) => {
965 e.get_mut().insert(cmd.instrument_id);
966 false
967 }
968 };
969
970 if first_for_interval {
971 let interval_ns = millis_to_nanos_unchecked(cmd.interval_ms.get() as f64);
973 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
974
975 let snap_info = BookSnapshotInfo {
976 instrument_id: cmd.instrument_id,
977 venue: cmd.instrument_id.venue,
978 is_composite: cmd.instrument_id.symbol.is_composite(),
979 root: Ustr::from(cmd.instrument_id.symbol.root()),
980 topic,
981 interval_ms: cmd.interval_ms,
982 };
983
984 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
986 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
987
988 let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
989 self.book_snapshotters
990 .insert(cmd.instrument_id, snapshotter.clone());
991 let timer_name = snapshotter.timer_name;
992
993 let callback_fn: Rc<dyn Fn(TimeEvent)> =
994 Rc::new(move |event| snapshotter.snapshot(event));
995 let callback = TimeEventCallback::from(callback_fn);
996
997 self.clock
998 .borrow_mut()
999 .set_timer_ns(
1000 &timer_name,
1001 interval_ns,
1002 Some(start_time_ns.into()),
1003 None,
1004 Some(callback),
1005 None,
1006 None,
1007 )
1008 .expect(FAILED);
1009 }
1010
1011 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1012
1013 Ok(())
1014 }
1015
1016 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1017 match cmd.bar_type.aggregation_source() {
1018 AggregationSource::Internal => {
1019 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1020 self.start_bar_aggregator(cmd.bar_type)?;
1021 }
1022 }
1023 AggregationSource::External => {
1024 if cmd.bar_type.instrument_id().is_synthetic() {
1025 anyhow::bail!(
1026 "Cannot subscribe for externally aggregated synthetic instrument bar data"
1027 );
1028 }
1029 }
1030 }
1031
1032 Ok(())
1033 }
1034
1035 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1036 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1037 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1038 return Ok(());
1039 }
1040
1041 let topics = vec![
1042 switchboard::get_book_deltas_topic(cmd.instrument_id),
1043 switchboard::get_book_depth10_topic(cmd.instrument_id),
1044 ];
1046
1047 self.maintain_book_updater(&cmd.instrument_id, &topics);
1048 self.maintain_book_snapshotter(&cmd.instrument_id);
1049
1050 Ok(())
1051 }
1052
1053 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1054 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1055 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1056 return Ok(());
1057 }
1058
1059 let topics = vec![
1060 switchboard::get_book_deltas_topic(cmd.instrument_id),
1061 switchboard::get_book_depth10_topic(cmd.instrument_id),
1062 ];
1064
1065 self.maintain_book_updater(&cmd.instrument_id, &topics);
1066 self.maintain_book_snapshotter(&cmd.instrument_id);
1067
1068 Ok(())
1069 }
1070
1071 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1072 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1073 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1074 return Ok(());
1075 }
1076
1077 let mut to_remove = Vec::new();
1079 for (interval, set) in &mut self.book_intervals {
1080 if set.remove(&cmd.instrument_id) && set.is_empty() {
1081 to_remove.push(*interval);
1082 }
1083 }
1084
1085 for interval in to_remove {
1086 self.book_intervals.remove(&interval);
1087 }
1088
1089 let topics = vec![
1090 switchboard::get_book_deltas_topic(cmd.instrument_id),
1091 switchboard::get_book_depth10_topic(cmd.instrument_id),
1092 ];
1094
1095 self.maintain_book_updater(&cmd.instrument_id, &topics);
1096 self.maintain_book_snapshotter(&cmd.instrument_id);
1097
1098 Ok(())
1099 }
1100
1101 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1103 let bar_type = cmd.bar_type;
1105 if self.bar_aggregators.contains_key(&bar_type.standard()) {
1106 if let Err(e) = self.stop_bar_aggregator(bar_type) {
1107 log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1108 }
1109 self.bar_aggregators.remove(&bar_type.standard());
1110 log::debug!("Removed bar aggregator for {bar_type}");
1111 }
1112 Ok(())
1113 }
1114
1115 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1116 if let Some(updater) = self.book_updaters.get(instrument_id) {
1117 let handler = ShareableMessageHandler(updater.clone());
1118
1119 for topic in topics {
1121 if msgbus::subscriptions_count(topic.as_str()) == 1
1122 && msgbus::is_subscribed(topic.as_str(), handler.clone())
1123 {
1124 log::debug!("Unsubscribing BookUpdater from {topic}");
1125 msgbus::unsubscribe_topic(*topic, handler.clone());
1126 }
1127 }
1128
1129 let still_subscribed = topics
1131 .iter()
1132 .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1133 if !still_subscribed {
1134 self.book_updaters.remove(instrument_id);
1135 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1136 }
1137 }
1138 }
1139
1140 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1141 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1142 let topic = switchboard::get_book_snapshots_topic(
1143 *instrument_id,
1144 snapshotter.snap_info.interval_ms,
1145 );
1146
1147 if msgbus::subscriptions_count(topic.as_str()) == 0 {
1149 let timer_name = snapshotter.timer_name;
1150 self.book_snapshotters.remove(instrument_id);
1151 let mut clock = self.clock.borrow_mut();
1152 if clock.timer_exists(&timer_name) {
1153 clock.cancel_timer(&timer_name);
1154 }
1155 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1156 }
1157 }
1158 }
1159
1160 fn handle_instrument_response(&self, instrument: InstrumentAny) {
1163 let mut cache = self.cache.as_ref().borrow_mut();
1164 if let Err(e) = cache.add_instrument(instrument) {
1165 log_error_on_cache_insert(&e);
1166 }
1167 }
1168
1169 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1170 let mut cache = self.cache.as_ref().borrow_mut();
1172 for instrument in instruments {
1173 if let Err(e) = cache.add_instrument(instrument.clone()) {
1174 log_error_on_cache_insert(&e);
1175 }
1176 }
1177 }
1178
1179 fn handle_quotes(&self, quotes: &[QuoteTick]) {
1180 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1181 log_error_on_cache_insert(&e);
1182 }
1183 }
1184
1185 fn handle_trades(&self, trades: &[TradeTick]) {
1186 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1187 log_error_on_cache_insert(&e);
1188 }
1189 }
1190
1191 fn handle_bars(&self, bars: &[Bar]) {
1192 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1193 log_error_on_cache_insert(&e);
1194 }
1195 }
1196
1197 fn handle_book_response(&self, book: &OrderBook) {
1198 log::debug!("Adding order book {} to cache", book.instrument_id);
1199 if let Err(e) = self
1200 .cache
1201 .as_ref()
1202 .borrow_mut()
1203 .add_order_book(book.clone())
1204 {
1205 log_error_on_cache_insert(&e);
1206 }
1207 }
1208
1209 #[allow(clippy::too_many_arguments)]
1212 fn setup_book_updater(
1213 &mut self,
1214 instrument_id: &InstrumentId,
1215 book_type: BookType,
1216 only_deltas: bool,
1217 managed: bool,
1218 ) -> anyhow::Result<()> {
1219 let mut cache = self.cache.borrow_mut();
1220 if managed && !cache.has_order_book(instrument_id) {
1221 let book = OrderBook::new(*instrument_id, book_type);
1222 log::debug!("Created {book}");
1223 cache.add_order_book(book)?;
1224 }
1225
1226 let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1228 self.book_updaters.insert(*instrument_id, updater.clone());
1229
1230 let handler = ShareableMessageHandler(updater);
1231
1232 let topic = switchboard::get_book_deltas_topic(*instrument_id);
1233 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1234 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1235 }
1236
1237 let topic = switchboard::get_book_depth10_topic(*instrument_id);
1238 if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1239 msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1240 }
1241
1242 Ok(())
1243 }
1244
1245 fn create_bar_aggregator(
1246 &mut self,
1247 instrument: &InstrumentAny,
1248 bar_type: BarType,
1249 ) -> Box<dyn BarAggregator> {
1250 let cache = self.cache.clone();
1251
1252 let handler = move |bar: Bar| {
1253 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1254 log_error_on_cache_insert(&e);
1255 }
1256
1257 let topic = switchboard::get_bars_topic(bar.bar_type);
1258 msgbus::publish(topic, &bar as &dyn Any);
1259 };
1260
1261 let clock = self.clock.clone();
1262 let config = self.config.clone();
1263
1264 let price_precision = instrument.price_precision();
1265 let size_precision = instrument.size_precision();
1266
1267 if bar_type.spec().is_time_aggregated() {
1268 let time_bars_origin_offset = config
1270 .time_bars_origins
1271 .get(&bar_type.spec().aggregation)
1272 .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1273
1274 Box::new(TimeBarAggregator::new(
1275 bar_type,
1276 price_precision,
1277 size_precision,
1278 clock,
1279 handler,
1280 config.time_bars_build_with_no_updates,
1281 config.time_bars_timestamp_on_close,
1282 config.time_bars_interval_type,
1283 time_bars_origin_offset,
1284 20, false, ))
1287 } else {
1288 match bar_type.spec().aggregation {
1289 BarAggregation::Tick => Box::new(TickBarAggregator::new(
1290 bar_type,
1291 price_precision,
1292 size_precision,
1293 handler,
1294 )) as Box<dyn BarAggregator>,
1295 BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
1296 bar_type,
1297 price_precision,
1298 size_precision,
1299 handler,
1300 )) as Box<dyn BarAggregator>,
1301 BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
1302 bar_type,
1303 price_precision,
1304 size_precision,
1305 handler,
1306 )) as Box<dyn BarAggregator>,
1307 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1308 bar_type,
1309 price_precision,
1310 size_precision,
1311 handler,
1312 )) as Box<dyn BarAggregator>,
1313 BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
1314 bar_type,
1315 price_precision,
1316 size_precision,
1317 handler,
1318 )) as Box<dyn BarAggregator>,
1319 BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
1320 bar_type,
1321 price_precision,
1322 size_precision,
1323 handler,
1324 )) as Box<dyn BarAggregator>,
1325 BarAggregation::Value => Box::new(ValueBarAggregator::new(
1326 bar_type,
1327 price_precision,
1328 size_precision,
1329 handler,
1330 )) as Box<dyn BarAggregator>,
1331 BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
1332 bar_type,
1333 price_precision,
1334 size_precision,
1335 handler,
1336 )) as Box<dyn BarAggregator>,
1337 BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
1338 bar_type,
1339 price_precision,
1340 size_precision,
1341 handler,
1342 )) as Box<dyn BarAggregator>,
1343 BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1344 bar_type,
1345 price_precision,
1346 size_precision,
1347 instrument.price_increment(),
1348 handler,
1349 )) as Box<dyn BarAggregator>,
1350 _ => panic!(
1351 "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
1352 bar_type.spec().aggregation
1353 ),
1354 }
1355 }
1356 }
1357
1358 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1359 let instrument = {
1361 let cache = self.cache.borrow();
1362 cache
1363 .instrument(&bar_type.instrument_id())
1364 .ok_or_else(|| {
1365 anyhow::anyhow!(
1366 "Cannot start bar aggregation: no instrument found for {}",
1367 bar_type.instrument_id(),
1368 )
1369 })?
1370 .clone()
1371 };
1372
1373 let bar_key = bar_type.standard();
1375
1376 let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1378 rc.clone()
1379 } else {
1380 let agg = self.create_bar_aggregator(&instrument, bar_type);
1381 let rc = Rc::new(RefCell::new(agg));
1382 self.bar_aggregators.insert(bar_key, rc.clone());
1383 rc
1384 };
1385
1386 let mut handlers = Vec::new();
1388
1389 if bar_type.is_composite() {
1390 let topic = switchboard::get_bars_topic(bar_type.composite());
1391 let handler =
1392 ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1393
1394 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1395 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1396 }
1397
1398 handlers.push((topic, handler));
1399 } else if bar_type.spec().price_type == PriceType::Last {
1400 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1401 let handler =
1402 ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1403
1404 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1405 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1406 }
1407
1408 handlers.push((topic, handler));
1409 } else {
1410 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1411 let handler =
1412 ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1413
1414 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1415 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1416 }
1417
1418 handlers.push((topic, handler));
1419 }
1420
1421 self.bar_aggregator_handlers.insert(bar_key, handlers);
1422
1423 self.setup_bar_aggregator(bar_type, false)?;
1425
1426 aggregator.borrow_mut().set_is_running(true);
1427
1428 Ok(())
1429 }
1430
1431 fn setup_bar_aggregator(&mut self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
1435 let bar_key = bar_type.standard();
1436 let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
1437 anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
1438 })?;
1439
1440 let handler: Box<dyn FnMut(Bar)> = if historical {
1442 let cache = self.cache.clone();
1444 Box::new(move |bar: Bar| {
1445 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1446 log_error_on_cache_insert(&e);
1447 }
1448 })
1450 } else {
1451 let cache = self.cache.clone();
1453 Box::new(move |bar: Bar| {
1454 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1455 log_error_on_cache_insert(&e);
1456 }
1457 let topic = switchboard::get_bars_topic(bar.bar_type);
1458 msgbus::publish(topic, &bar as &dyn Any);
1459 })
1460 };
1461
1462 aggregator
1463 .borrow_mut()
1464 .set_historical_mode(historical, handler);
1465
1466 if bar_type.spec().is_time_aggregated() {
1468 use nautilus_common::clock::TestClock;
1469
1470 if historical {
1471 let test_clock = Rc::new(RefCell::new(TestClock::new()));
1473 aggregator.borrow_mut().set_clock(test_clock);
1474 let aggregator_weak = Rc::downgrade(aggregator);
1477 aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
1478 } else {
1479 aggregator.borrow_mut().set_clock(self.clock.clone());
1480 aggregator
1481 .borrow_mut()
1482 .start_timer(Some(aggregator.clone()));
1483 }
1484 }
1485
1486 Ok(())
1487 }
1488
1489 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1490 let aggregator = self
1491 .bar_aggregators
1492 .remove(&bar_type.standard())
1493 .ok_or_else(|| {
1494 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1495 })?;
1496
1497 aggregator.borrow_mut().stop();
1498
1499 let bar_key = bar_type.standard();
1501 if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1502 for (topic, handler) in subs {
1503 if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1504 msgbus::unsubscribe_topic(topic, handler);
1505 }
1506 }
1507 }
1508
1509 Ok(())
1510 }
1511}
1512
1513#[inline(always)]
1514fn log_error_on_cache_insert<T: Display>(e: &T) {
1515 log::error!("Error on cache insert: {e}");
1516}