1pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39 any::Any,
40 cell::{Ref, RefCell},
41 collections::hash_map::Entry,
42 fmt::Display,
43 num::NonZeroUsize,
44 rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use futures::future::join_all;
51use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
52use indexmap::IndexMap;
53use nautilus_common::{
54 cache::Cache,
55 clock::Clock,
56 logging::{RECV, RES},
57 messages::data::{
58 DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61 UnsubscribeCommand,
62 },
63 msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
64 timer::{TimeEvent, TimeEventCallback},
65};
66use nautilus_core::{
67 UUID4,
68 correctness::{
69 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
70 },
71 datetime::millis_to_nanos_unchecked,
72};
73#[cfg(feature = "defi")]
74use nautilus_model::defi::DefiData;
75use nautilus_model::{
76 data::{
77 Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
78 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
79 },
80 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
81 identifiers::{ClientId, InstrumentId, Venue},
82 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
83 orderbook::OrderBook,
84};
85#[cfg(feature = "streaming")]
86use nautilus_persistence::backend::catalog::ParquetDataCatalog;
87use ustr::Ustr;
88
89#[cfg(feature = "defi")]
90#[allow(unused_imports)] use crate::defi::engine as _;
92#[cfg(feature = "defi")]
93use crate::engine::pool::PoolUpdater;
94use crate::{
95 aggregation::{
96 BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
97 TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
98 ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
99 VolumeRunsBarAggregator,
100 },
101 client::DataClientAdapter,
102};
103
104#[derive(Debug)]
106pub struct DataEngine {
107 pub(crate) clock: Rc<RefCell<dyn Clock>>,
108 pub(crate) cache: Rc<RefCell<Cache>>,
109 pub(crate) external_clients: AHashSet<ClientId>,
110 clients: IndexMap<ClientId, DataClientAdapter>,
111 default_client: Option<DataClientAdapter>,
112 #[cfg(feature = "streaming")]
113 catalogs: AHashMap<Ustr, ParquetDataCatalog>,
114 routing_map: IndexMap<Venue, ClientId>,
115 book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
116 book_deltas_subs: AHashSet<InstrumentId>,
117 book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
118 book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
119 bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
120 bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
121 _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
122 _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
123 buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
124 pub(crate) msgbus_priority: u8,
125 pub(crate) config: DataEngineConfig,
126 #[cfg(feature = "defi")]
127 pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
128 #[cfg(feature = "defi")]
129 pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
130 #[cfg(feature = "defi")]
131 pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
132 #[cfg(feature = "defi")]
133 pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
134}
135
136impl DataEngine {
137 #[must_use]
139 pub fn new(
140 clock: Rc<RefCell<dyn Clock>>,
141 cache: Rc<RefCell<Cache>>,
142 config: Option<DataEngineConfig>,
143 ) -> Self {
144 let config = config.unwrap_or_default();
145
146 let external_clients: AHashSet<ClientId> = config
147 .external_clients
148 .clone()
149 .unwrap_or_default()
150 .into_iter()
151 .collect();
152
153 Self {
154 clock,
155 cache,
156 external_clients,
157 clients: IndexMap::new(),
158 default_client: None,
159 #[cfg(feature = "streaming")]
160 catalogs: AHashMap::new(),
161 routing_map: IndexMap::new(),
162 book_intervals: AHashMap::new(),
163 book_deltas_subs: AHashSet::new(),
164 book_updaters: AHashMap::new(),
165 book_snapshotters: AHashMap::new(),
166 bar_aggregators: AHashMap::new(),
167 bar_aggregator_handlers: AHashMap::new(),
168 _synthetic_quote_feeds: AHashMap::new(),
169 _synthetic_trade_feeds: AHashMap::new(),
170 buffered_deltas_map: AHashMap::new(),
171 msgbus_priority: 10, config,
173 #[cfg(feature = "defi")]
174 pool_updaters: AHashMap::new(),
175 #[cfg(feature = "defi")]
176 pool_updaters_pending: AHashSet::new(),
177 #[cfg(feature = "defi")]
178 pool_snapshot_pending: AHashSet::new(),
179 #[cfg(feature = "defi")]
180 pool_event_buffers: AHashMap::new(),
181 }
182 }
183
184 #[must_use]
186 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
187 self.clock.borrow()
188 }
189
190 #[must_use]
192 pub fn get_cache(&self) -> Ref<'_, Cache> {
193 self.cache.borrow()
194 }
195
196 #[must_use]
198 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
199 Rc::clone(&self.cache)
200 }
201
/// Registers `catalog` under `name`, or under `"catalog_0"` when no name is given.
///
/// # Panics
///
/// Panics if the `check_key_not_in_map` validation of the name against the registered
/// catalogs fails.
#[cfg(feature = "streaming")]
pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
    let name = match name.as_deref() {
        Some(given) => Ustr::from(given),
        None => Ustr::from("catalog_0"),
    };

    check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);

    self.catalogs.insert(name, catalog);
    log::info!("Registered catalog <{name}>");
}
216
217 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
224 let client_id = client.client_id();
225
226 if let Some(default_client) = &self.default_client {
227 check_predicate_false(
228 default_client.client_id() == client.client_id(),
229 "client_id already registered as default client",
230 )
231 .expect(FAILED);
232 }
233
234 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
235
236 if let Some(routing) = routing {
237 self.routing_map.insert(routing, client_id);
238 log::debug!("Set client {client_id} routing for {routing}");
239 }
240
241 if client.venue.is_none() && self.default_client.is_none() {
242 self.default_client = Some(client);
243 log::debug!("Registered client {client_id} for default routing");
244 } else {
245 self.clients.insert(client_id, client);
246 log::debug!("Registered client {client_id}");
247 }
248 }
249
250 pub fn deregister_client(&mut self, client_id: &ClientId) {
256 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
257
258 self.clients.shift_remove(client_id);
259 log::info!("Deregistered client {client_id}");
260 }
261
262 pub fn register_default_client(&mut self, client: DataClientAdapter) {
274 check_predicate_true(
275 self.default_client.is_none(),
276 "default client already registered",
277 )
278 .expect(FAILED);
279
280 let client_id = client.client_id();
281
282 self.default_client = Some(client);
283 log::debug!("Registered default client {client_id}");
284 }
285
286 pub fn start(&mut self) {
288 for client in self.get_clients_mut() {
289 if let Err(e) = client.start() {
290 log::error!("{e}");
291 }
292 }
293 }
294
295 pub fn stop(&mut self) {
297 for client in self.get_clients_mut() {
298 if let Err(e) = client.stop() {
299 log::error!("{e}");
300 }
301 }
302 }
303
304 pub fn reset(&mut self) {
306 for client in self.get_clients_mut() {
307 if let Err(e) = client.reset() {
308 log::error!("{e}");
309 }
310 }
311 }
312
313 pub fn dispose(&mut self) {
315 for client in self.get_clients_mut() {
316 if let Err(e) = client.dispose() {
317 log::error!("{e}");
318 }
319 }
320
321 self.clock.borrow_mut().cancel_timers();
322 }
323
324 pub async fn connect(&mut self) -> anyhow::Result<()> {
330 let futures: Vec<_> = self
331 .get_clients_mut()
332 .into_iter()
333 .map(|client| client.connect())
334 .collect();
335
336 let results = join_all(futures).await;
337 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
338
339 if errors.is_empty() {
340 Ok(())
341 } else {
342 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
343 anyhow::bail!("Failed to connect data clients: {}", error_msgs.join("; "))
344 }
345 }
346
347 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
353 let futures: Vec<_> = self
354 .get_clients_mut()
355 .into_iter()
356 .map(|client| client.disconnect())
357 .collect();
358
359 let results = join_all(futures).await;
360 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
361
362 if errors.is_empty() {
363 Ok(())
364 } else {
365 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
366 anyhow::bail!(
367 "Failed to disconnect data clients: {}",
368 error_msgs.join("; ")
369 )
370 }
371 }
372
373 #[must_use]
375 pub fn check_connected(&self) -> bool {
376 self.get_clients()
377 .iter()
378 .all(|client| client.is_connected())
379 }
380
381 #[must_use]
383 pub fn check_disconnected(&self) -> bool {
384 self.get_clients()
385 .iter()
386 .all(|client| !client.is_connected())
387 }
388
389 #[must_use]
391 pub fn registered_clients(&self) -> Vec<ClientId> {
392 self.get_clients()
393 .into_iter()
394 .map(|client| client.client_id())
395 .collect()
396 }
397
398 pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
401 where
402 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
403 T: Clone,
404 {
405 self.get_clients()
406 .into_iter()
407 .flat_map(get_subs)
408 .cloned()
409 .collect()
410 }
411
412 #[must_use]
413 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
414 let (default_opt, clients_map) = (&self.default_client, &self.clients);
415 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
416
417 if let Some(default) = default_opt {
418 clients.push(default);
419 }
420
421 clients
422 }
423
424 #[must_use]
425 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
426 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
427 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
428
429 if let Some(default) = default_opt {
430 clients.push(default);
431 }
432
433 clients
434 }
435
436 pub fn get_client(
437 &mut self,
438 client_id: Option<&ClientId>,
439 venue: Option<&Venue>,
440 ) -> Option<&mut DataClientAdapter> {
441 if let Some(client_id) = client_id {
442 if let Some(client) = self.clients.get_mut(client_id) {
444 return Some(client);
445 }
446
447 if let Some(default) = self.default_client.as_mut()
449 && default.client_id() == *client_id
450 {
451 return Some(default);
452 }
453
454 return None;
456 }
457
458 if let Some(v) = venue {
459 if let Some(client_id) = self.routing_map.get(v) {
461 return self.clients.get_mut(client_id);
462 }
463 }
464
465 self.get_default_client()
467 }
468
469 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
470 self.default_client.as_mut()
471 }
472
473 #[must_use]
475 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
476 self.collect_subscriptions(|client| &client.subscriptions_custom)
477 }
478
479 #[must_use]
481 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
482 self.collect_subscriptions(|client| &client.subscriptions_instrument)
483 }
484
485 #[must_use]
487 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
488 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
489 }
490
491 #[must_use]
493 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
494 self.book_intervals
495 .values()
496 .flat_map(|set| set.iter().copied())
497 .collect()
498 }
499
500 #[must_use]
502 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
503 self.collect_subscriptions(|client| &client.subscriptions_quotes)
504 }
505
506 #[must_use]
508 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
509 self.collect_subscriptions(|client| &client.subscriptions_trades)
510 }
511
512 #[must_use]
514 pub fn subscribed_bars(&self) -> Vec<BarType> {
515 self.collect_subscriptions(|client| &client.subscriptions_bars)
516 }
517
518 #[must_use]
520 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
521 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
522 }
523
524 #[must_use]
526 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
527 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
528 }
529
530 #[must_use]
532 pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
533 self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
534 }
535
536 #[must_use]
538 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
539 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
540 }
541
542 #[must_use]
544 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
545 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
546 }
547
548 pub fn execute(&mut self, cmd: &DataCommand) {
554 if let Err(e) = match cmd {
555 DataCommand::Subscribe(c) => self.execute_subscribe(c),
556 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
557 DataCommand::Request(c) => self.execute_request(c),
558 #[cfg(feature = "defi")]
559 DataCommand::DefiRequest(c) => self.execute_defi_request(c),
560 #[cfg(feature = "defi")]
561 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
562 #[cfg(feature = "defi")]
563 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
564 _ => {
565 log::warn!("Unhandled DataCommand variant: {cmd:?}");
566 Ok(())
567 }
568 } {
569 log::error!("{e}");
570 }
571 }
572
573 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
580 match &cmd {
582 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
583 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
584 SubscribeCommand::BookSnapshots(cmd) => {
585 return self.subscribe_book_snapshots(cmd);
587 }
588 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
589 _ => {} }
591
592 if let Some(client_id) = cmd.client_id()
593 && self.external_clients.contains(client_id)
594 {
595 if self.config.debug {
596 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
597 }
598 return Ok(());
599 }
600
601 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
602 client.execute_subscribe(cmd);
603 } else {
604 log::error!(
605 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
606 cmd.client_id(),
607 cmd.venue(),
608 );
609 }
610
611 Ok(())
612 }
613
614 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
620 match &cmd {
621 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
622 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
623 UnsubscribeCommand::BookSnapshots(cmd) => {
624 return self.unsubscribe_book_snapshots(cmd);
626 }
627 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
628 _ => {} }
630
631 if let Some(client_id) = cmd.client_id()
632 && self.external_clients.contains(client_id)
633 {
634 if self.config.debug {
635 log::debug!(
636 "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
637 );
638 }
639 return Ok(());
640 }
641
642 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
643 client.execute_unsubscribe(cmd);
644 } else {
645 log::error!(
646 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
647 cmd.client_id(),
648 cmd.venue(),
649 );
650 }
651
652 Ok(())
653 }
654
655 pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
662 if let Some(cid) = req.client_id()
664 && self.external_clients.contains(cid)
665 {
666 if self.config.debug {
667 log::debug!("Skipping data request for external client {cid}: {req:?}");
668 }
669 return Ok(());
670 }
671 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
672 match req {
673 RequestCommand::Data(req) => client.request_data(req),
674 RequestCommand::Instrument(req) => client.request_instrument(req),
675 RequestCommand::Instruments(req) => client.request_instruments(req),
676 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
677 RequestCommand::BookDepth(req) => client.request_book_depth(req),
678 RequestCommand::Quotes(req) => client.request_quotes(req),
679 RequestCommand::Trades(req) => client.request_trades(req),
680 RequestCommand::Bars(req) => client.request_bars(req),
681 }
682 } else {
683 anyhow::bail!(
684 "Cannot handle request: no client found for {:?} {:?}",
685 req.client_id(),
686 req.venue()
687 );
688 }
689 }
690
691 pub fn process(&mut self, data: &dyn Any) {
695 if let Some(data) = data.downcast_ref::<Data>() {
697 self.process_data(data.clone()); return;
699 }
700
701 #[cfg(feature = "defi")]
702 if let Some(data) = data.downcast_ref::<DefiData>() {
703 self.process_defi_data(data.clone()); return;
705 }
706
707 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
708 self.handle_instrument(instrument.clone());
709 } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
710 self.handle_funding_rate(*funding_rate);
711 } else {
712 log::error!("Cannot process data {data:?}, type is unrecognized");
713 }
714 }
715
716 pub fn process_data(&mut self, data: Data) {
718 match data {
719 Data::Delta(delta) => self.handle_delta(delta),
720 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
721 Data::Depth10(depth) => self.handle_depth10(*depth),
722 Data::Quote(quote) => self.handle_quote(quote),
723 Data::Trade(trade) => self.handle_trade(trade),
724 Data::Bar(bar) => self.handle_bar(bar),
725 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
726 Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
727 Data::InstrumentClose(close) => self.handle_instrument_close(close),
728 }
729 }
730
731 pub fn response(&self, resp: DataResponse) {
733 log::debug!("{RECV}{RES} {resp:?}");
734
735 match &resp {
736 DataResponse::Instrument(resp) => {
737 self.handle_instrument_response(resp.data.clone());
738 }
739 DataResponse::Instruments(resp) => {
740 self.handle_instruments(&resp.data);
741 }
742 DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
743 DataResponse::Trades(resp) => self.handle_trades(&resp.data),
744 DataResponse::Bars(resp) => self.handle_bars(&resp.data),
745 DataResponse::Book(resp) => self.handle_book_response(&resp.data),
746 _ => todo!("Handle other response types"),
747 }
748
749 msgbus::send_response(resp.correlation_id(), &resp);
750 }
751
752 fn handle_instrument(&mut self, instrument: InstrumentAny) {
755 if let Err(e) = self
756 .cache
757 .as_ref()
758 .borrow_mut()
759 .add_instrument(instrument.clone())
760 {
761 log_error_on_cache_insert(&e);
762 }
763
764 let topic = switchboard::get_instrument_topic(instrument.id());
765 msgbus::publish(topic, &instrument as &dyn Any);
766 }
767
768 fn handle_delta(&mut self, delta: OrderBookDelta) {
769 let deltas = if self.config.buffer_deltas {
770 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
771 buffered_deltas.deltas.push(delta);
772 buffered_deltas.flags = delta.flags;
773 buffered_deltas.sequence = delta.sequence;
774 buffered_deltas.ts_event = delta.ts_event;
775 buffered_deltas.ts_init = delta.ts_init;
776 } else {
777 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
778 self.buffered_deltas_map
779 .insert(delta.instrument_id, buffered_deltas);
780 }
781
782 if !RecordFlag::F_LAST.matches(delta.flags) {
783 return; }
785
786 self.buffered_deltas_map
788 .remove(&delta.instrument_id)
789 .unwrap()
790 } else {
791 OrderBookDeltas::new(delta.instrument_id, vec![delta])
792 };
793
794 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
795 msgbus::publish(topic, &deltas as &dyn Any);
796 }
797
798 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
799 let deltas = if self.config.buffer_deltas {
800 let mut is_last_delta = false;
801 for delta in &deltas.deltas {
802 if RecordFlag::F_LAST.matches(delta.flags) {
803 is_last_delta = true;
804 break;
805 }
806 }
807
808 let instrument_id = deltas.instrument_id;
809
810 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
811 buffered_deltas.deltas.extend(deltas.deltas);
812
813 if let Some(last_delta) = buffered_deltas.deltas.last() {
814 buffered_deltas.flags = last_delta.flags;
815 buffered_deltas.sequence = last_delta.sequence;
816 buffered_deltas.ts_event = last_delta.ts_event;
817 buffered_deltas.ts_init = last_delta.ts_init;
818 }
819 } else {
820 self.buffered_deltas_map.insert(instrument_id, deltas);
821 }
822
823 if !is_last_delta {
824 return;
825 }
826
827 self.buffered_deltas_map.remove(&instrument_id).unwrap()
829 } else {
830 deltas
831 };
832
833 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
834 msgbus::publish(topic, &deltas as &dyn Any);
835 }
836
837 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
838 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
839 msgbus::publish(topic, &depth as &dyn Any);
840 }
841
842 fn handle_quote(&mut self, quote: QuoteTick) {
843 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
844 log_error_on_cache_insert(&e);
845 }
846
847 let topic = switchboard::get_quotes_topic(quote.instrument_id);
850 msgbus::publish(topic, "e as &dyn Any);
851 }
852
853 fn handle_trade(&mut self, trade: TradeTick) {
854 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
855 log_error_on_cache_insert(&e);
856 }
857
858 let topic = switchboard::get_trades_topic(trade.instrument_id);
861 msgbus::publish(topic, &trade as &dyn Any);
862 }
863
864 fn handle_bar(&mut self, bar: Bar) {
865 if self.config.validate_data_sequence
867 && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
868 {
869 if bar.ts_event < last_bar.ts_event {
870 log::warn!(
871 "Bar {bar} was prior to last bar `ts_event` {}",
872 last_bar.ts_event
873 );
874 return; }
876 if bar.ts_init < last_bar.ts_init {
877 log::warn!(
878 "Bar {bar} was prior to last bar `ts_init` {}",
879 last_bar.ts_init
880 );
881 return; }
883 }
885
886 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
887 log_error_on_cache_insert(&e);
888 }
889
890 let topic = switchboard::get_bars_topic(bar.bar_type);
891 msgbus::publish(topic, &bar as &dyn Any);
892 }
893
894 fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
895 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
896 log_error_on_cache_insert(&e);
897 }
898
899 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
900 msgbus::publish(topic, &mark_price as &dyn Any);
901 }
902
903 fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
904 if let Err(e) = self
905 .cache
906 .as_ref()
907 .borrow_mut()
908 .add_index_price(index_price)
909 {
910 log_error_on_cache_insert(&e);
911 }
912
913 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
914 msgbus::publish(topic, &index_price as &dyn Any);
915 }
916
917 pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
919 if let Err(e) = self
920 .cache
921 .as_ref()
922 .borrow_mut()
923 .add_funding_rate(funding_rate)
924 {
925 log_error_on_cache_insert(&e);
926 }
927
928 let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
929 msgbus::publish(topic, &funding_rate as &dyn Any);
930 }
931
932 fn handle_instrument_close(&mut self, close: InstrumentClose) {
933 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
934 msgbus::publish(topic, &close as &dyn Any);
935 }
936
937 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
940 if cmd.instrument_id.is_synthetic() {
941 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
942 }
943
944 self.book_deltas_subs.insert(cmd.instrument_id);
945 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
946
947 Ok(())
948 }
949
950 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
951 if cmd.instrument_id.is_synthetic() {
952 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
953 }
954
955 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
956
957 Ok(())
958 }
959
960 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
961 if cmd.instrument_id.is_synthetic() {
962 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
963 }
964
965 let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
967 Entry::Vacant(e) => {
968 let mut set = AHashSet::new();
969 set.insert(cmd.instrument_id);
970 e.insert(set);
971 true
972 }
973 Entry::Occupied(mut e) => {
974 e.get_mut().insert(cmd.instrument_id);
975 false
976 }
977 };
978
979 if first_for_interval {
980 let interval_ns = millis_to_nanos_unchecked(cmd.interval_ms.get() as f64);
982 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
983
984 let snap_info = BookSnapshotInfo {
985 instrument_id: cmd.instrument_id,
986 venue: cmd.instrument_id.venue,
987 is_composite: cmd.instrument_id.symbol.is_composite(),
988 root: Ustr::from(cmd.instrument_id.symbol.root()),
989 topic,
990 interval_ms: cmd.interval_ms,
991 };
992
993 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
995 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
996
997 let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
998 self.book_snapshotters
999 .insert(cmd.instrument_id, snapshotter.clone());
1000 let timer_name = snapshotter.timer_name;
1001
1002 let callback_fn: Rc<dyn Fn(TimeEvent)> =
1003 Rc::new(move |event| snapshotter.snapshot(event));
1004 let callback = TimeEventCallback::from(callback_fn);
1005
1006 self.clock
1007 .borrow_mut()
1008 .set_timer_ns(
1009 &timer_name,
1010 interval_ns,
1011 Some(start_time_ns.into()),
1012 None,
1013 Some(callback),
1014 None,
1015 None,
1016 )
1017 .expect(FAILED);
1018 }
1019
1020 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1022 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1023 }
1024
1025 if let Some(client_id) = cmd.client_id.as_ref()
1026 && self.external_clients.contains(client_id)
1027 {
1028 if self.config.debug {
1029 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
1030 }
1031 return Ok(());
1032 }
1033
1034 log::debug!(
1035 "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
1036 cmd.instrument_id,
1037 cmd.client_id,
1038 cmd.venue,
1039 );
1040
1041 if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1042 let deltas_cmd = SubscribeBookDeltas::new(
1043 cmd.instrument_id,
1044 cmd.book_type,
1045 cmd.client_id,
1046 cmd.venue,
1047 UUID4::new(),
1048 cmd.ts_init,
1049 cmd.depth,
1050 true, Some(cmd.command_id),
1052 cmd.params.clone(),
1053 );
1054 log::debug!(
1055 "Calling client.execute_subscribe for BookDeltas: {}",
1056 cmd.instrument_id
1057 );
1058 client.execute_subscribe(&SubscribeCommand::BookDeltas(deltas_cmd));
1059 } else {
1060 log::error!(
1061 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1062 cmd.client_id,
1063 cmd.venue,
1064 );
1065 }
1066
1067 Ok(())
1068 }
1069
1070 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1071 match cmd.bar_type.aggregation_source() {
1072 AggregationSource::Internal => {
1073 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1074 self.start_bar_aggregator(cmd.bar_type)?;
1075 }
1076 }
1077 AggregationSource::External => {
1078 if cmd.bar_type.instrument_id().is_synthetic() {
1079 anyhow::bail!(
1080 "Cannot subscribe for externally aggregated synthetic instrument bar data"
1081 );
1082 }
1083 }
1084 }
1085
1086 Ok(())
1087 }
1088
1089 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1090 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1091 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1092 return Ok(());
1093 }
1094
1095 self.book_deltas_subs.remove(&cmd.instrument_id);
1096
1097 let topics = vec![
1098 switchboard::get_book_deltas_topic(cmd.instrument_id),
1099 switchboard::get_book_depth10_topic(cmd.instrument_id),
1100 ];
1102
1103 self.maintain_book_updater(&cmd.instrument_id, &topics);
1104 self.maintain_book_snapshotter(&cmd.instrument_id);
1105
1106 Ok(())
1107 }
1108
1109 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1110 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1111 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1112 return Ok(());
1113 }
1114
1115 let topics = vec![
1116 switchboard::get_book_deltas_topic(cmd.instrument_id),
1117 switchboard::get_book_depth10_topic(cmd.instrument_id),
1118 ];
1120
1121 self.maintain_book_updater(&cmd.instrument_id, &topics);
1122 self.maintain_book_snapshotter(&cmd.instrument_id);
1123
1124 Ok(())
1125 }
1126
1127 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1128 let is_subscribed = self
1129 .book_intervals
1130 .values()
1131 .any(|set| set.contains(&cmd.instrument_id));
1132
1133 if !is_subscribed {
1134 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1135 return Ok(());
1136 }
1137
1138 let mut to_remove = Vec::new();
1140 for (interval, set) in &mut self.book_intervals {
1141 if set.remove(&cmd.instrument_id) && set.is_empty() {
1142 to_remove.push(*interval);
1143 }
1144 }
1145
1146 for interval in to_remove {
1147 self.book_intervals.remove(&interval);
1148 }
1149
1150 let topics = vec![
1151 switchboard::get_book_deltas_topic(cmd.instrument_id),
1152 switchboard::get_book_depth10_topic(cmd.instrument_id),
1153 ];
1154
1155 self.maintain_book_updater(&cmd.instrument_id, &topics);
1156 self.maintain_book_snapshotter(&cmd.instrument_id);
1157
1158 let still_in_intervals = self
1159 .book_intervals
1160 .values()
1161 .any(|set| set.contains(&cmd.instrument_id));
1162
1163 if !still_in_intervals && !self.book_deltas_subs.contains(&cmd.instrument_id) {
1164 if let Some(client_id) = cmd.client_id.as_ref()
1165 && self.external_clients.contains(client_id)
1166 {
1167 return Ok(());
1168 }
1169
1170 if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1171 let deltas_cmd = UnsubscribeBookDeltas::new(
1172 cmd.instrument_id,
1173 cmd.client_id,
1174 cmd.venue,
1175 UUID4::new(),
1176 cmd.ts_init,
1177 Some(cmd.command_id),
1178 cmd.params.clone(),
1179 );
1180 client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
1181 }
1182 }
1183
1184 Ok(())
1185 }
1186
1187 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1189 let bar_type = cmd.bar_type;
1191 if self.bar_aggregators.contains_key(&bar_type.standard()) {
1192 if let Err(e) = self.stop_bar_aggregator(bar_type) {
1193 log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1194 }
1195 self.bar_aggregators.remove(&bar_type.standard());
1196 log::debug!("Removed bar aggregator for {bar_type}");
1197 }
1198 Ok(())
1199 }
1200
1201 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1202 if let Some(updater) = self.book_updaters.get(instrument_id) {
1203 let handler = ShareableMessageHandler(updater.clone());
1204
1205 for topic in topics {
1207 if msgbus::subscriptions_count(topic.as_str()) == 1
1208 && msgbus::is_subscribed(topic.as_str(), handler.clone())
1209 {
1210 log::debug!("Unsubscribing BookUpdater from {topic}");
1211 msgbus::unsubscribe_topic(*topic, handler.clone());
1212 }
1213 }
1214
1215 let still_subscribed = topics
1217 .iter()
1218 .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1219 if !still_subscribed {
1220 self.book_updaters.remove(instrument_id);
1221 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1222 }
1223 }
1224 }
1225
1226 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1227 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1228 let topic = switchboard::get_book_snapshots_topic(
1229 *instrument_id,
1230 snapshotter.snap_info.interval_ms,
1231 );
1232
1233 if msgbus::subscriptions_count(topic.as_str()) == 0 {
1235 let timer_name = snapshotter.timer_name;
1236 self.book_snapshotters.remove(instrument_id);
1237 let mut clock = self.clock.borrow_mut();
1238 if clock.timer_exists(&timer_name) {
1239 clock.cancel_timer(&timer_name);
1240 }
1241 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1242 }
1243 }
1244 }
1245
1246 fn handle_instrument_response(&self, instrument: InstrumentAny) {
1249 let mut cache = self.cache.as_ref().borrow_mut();
1250 if let Err(e) = cache.add_instrument(instrument) {
1251 log_error_on_cache_insert(&e);
1252 }
1253 }
1254
1255 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1256 let mut cache = self.cache.as_ref().borrow_mut();
1258 for instrument in instruments {
1259 if let Err(e) = cache.add_instrument(instrument.clone()) {
1260 log_error_on_cache_insert(&e);
1261 }
1262 }
1263 }
1264
1265 fn handle_quotes(&self, quotes: &[QuoteTick]) {
1266 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1267 log_error_on_cache_insert(&e);
1268 }
1269 }
1270
1271 fn handle_trades(&self, trades: &[TradeTick]) {
1272 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1273 log_error_on_cache_insert(&e);
1274 }
1275 }
1276
1277 fn handle_bars(&self, bars: &[Bar]) {
1278 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1279 log_error_on_cache_insert(&e);
1280 }
1281 }
1282
1283 fn handle_book_response(&self, book: &OrderBook) {
1284 log::debug!("Adding order book {} to cache", book.instrument_id);
1285 if let Err(e) = self
1286 .cache
1287 .as_ref()
1288 .borrow_mut()
1289 .add_order_book(book.clone())
1290 {
1291 log_error_on_cache_insert(&e);
1292 }
1293 }
1294
1295 #[allow(clippy::too_many_arguments)]
1298 fn setup_book_updater(
1299 &mut self,
1300 instrument_id: &InstrumentId,
1301 book_type: BookType,
1302 only_deltas: bool,
1303 managed: bool,
1304 ) -> anyhow::Result<()> {
1305 let mut cache = self.cache.borrow_mut();
1306 if managed && !cache.has_order_book(instrument_id) {
1307 let book = OrderBook::new(*instrument_id, book_type);
1308 log::debug!("Created {book}");
1309 cache.add_order_book(book)?;
1310 }
1311
1312 let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1314 self.book_updaters.insert(*instrument_id, updater.clone());
1315
1316 let handler = ShareableMessageHandler(updater);
1317
1318 let topic = switchboard::get_book_deltas_topic(*instrument_id);
1319 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1320 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1321 }
1322
1323 let topic = switchboard::get_book_depth10_topic(*instrument_id);
1324 if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1325 msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1326 }
1327
1328 Ok(())
1329 }
1330
1331 fn create_bar_aggregator(
1332 &mut self,
1333 instrument: &InstrumentAny,
1334 bar_type: BarType,
1335 ) -> Box<dyn BarAggregator> {
1336 let cache = self.cache.clone();
1337
1338 let handler = move |bar: Bar| {
1339 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1340 log_error_on_cache_insert(&e);
1341 }
1342
1343 let topic = switchboard::get_bars_topic(bar.bar_type);
1344 msgbus::publish(topic, &bar as &dyn Any);
1345 };
1346
1347 let clock = self.clock.clone();
1348 let config = self.config.clone();
1349
1350 let price_precision = instrument.price_precision();
1351 let size_precision = instrument.size_precision();
1352
1353 if bar_type.spec().is_time_aggregated() {
1354 let time_bars_origin_offset = config
1356 .time_bars_origins
1357 .get(&bar_type.spec().aggregation)
1358 .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1359
1360 Box::new(TimeBarAggregator::new(
1361 bar_type,
1362 price_precision,
1363 size_precision,
1364 clock,
1365 handler,
1366 config.time_bars_build_with_no_updates,
1367 config.time_bars_timestamp_on_close,
1368 config.time_bars_interval_type,
1369 time_bars_origin_offset,
1370 20, false, ))
1373 } else {
1374 match bar_type.spec().aggregation {
1375 BarAggregation::Tick => Box::new(TickBarAggregator::new(
1376 bar_type,
1377 price_precision,
1378 size_precision,
1379 handler,
1380 )) as Box<dyn BarAggregator>,
1381 BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
1382 bar_type,
1383 price_precision,
1384 size_precision,
1385 handler,
1386 )) as Box<dyn BarAggregator>,
1387 BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
1388 bar_type,
1389 price_precision,
1390 size_precision,
1391 handler,
1392 )) as Box<dyn BarAggregator>,
1393 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1394 bar_type,
1395 price_precision,
1396 size_precision,
1397 handler,
1398 )) as Box<dyn BarAggregator>,
1399 BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
1400 bar_type,
1401 price_precision,
1402 size_precision,
1403 handler,
1404 )) as Box<dyn BarAggregator>,
1405 BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
1406 bar_type,
1407 price_precision,
1408 size_precision,
1409 handler,
1410 )) as Box<dyn BarAggregator>,
1411 BarAggregation::Value => Box::new(ValueBarAggregator::new(
1412 bar_type,
1413 price_precision,
1414 size_precision,
1415 handler,
1416 )) as Box<dyn BarAggregator>,
1417 BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
1418 bar_type,
1419 price_precision,
1420 size_precision,
1421 handler,
1422 )) as Box<dyn BarAggregator>,
1423 BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
1424 bar_type,
1425 price_precision,
1426 size_precision,
1427 handler,
1428 )) as Box<dyn BarAggregator>,
1429 BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1430 bar_type,
1431 price_precision,
1432 size_precision,
1433 instrument.price_increment(),
1434 handler,
1435 )) as Box<dyn BarAggregator>,
1436 _ => panic!(
1437 "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
1438 bar_type.spec().aggregation
1439 ),
1440 }
1441 }
1442 }
1443
1444 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1445 let instrument = {
1447 let cache = self.cache.borrow();
1448 cache
1449 .instrument(&bar_type.instrument_id())
1450 .ok_or_else(|| {
1451 anyhow::anyhow!(
1452 "Cannot start bar aggregation: no instrument found for {}",
1453 bar_type.instrument_id(),
1454 )
1455 })?
1456 .clone()
1457 };
1458
1459 let bar_key = bar_type.standard();
1461
1462 let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1464 rc.clone()
1465 } else {
1466 let agg = self.create_bar_aggregator(&instrument, bar_type);
1467 let rc = Rc::new(RefCell::new(agg));
1468 self.bar_aggregators.insert(bar_key, rc.clone());
1469 rc
1470 };
1471
1472 let mut handlers = Vec::new();
1474
1475 if bar_type.is_composite() {
1476 let topic = switchboard::get_bars_topic(bar_type.composite());
1477 let handler =
1478 ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1479
1480 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1481 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1482 }
1483
1484 handlers.push((topic, handler));
1485 } else if bar_type.spec().price_type == PriceType::Last {
1486 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1487 let handler =
1488 ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1489
1490 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1491 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1492 }
1493
1494 handlers.push((topic, handler));
1495 } else {
1496 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1497 let handler =
1498 ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1499
1500 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1501 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1502 }
1503
1504 handlers.push((topic, handler));
1505 }
1506
1507 self.bar_aggregator_handlers.insert(bar_key, handlers);
1508
1509 self.setup_bar_aggregator(bar_type, false)?;
1511
1512 aggregator.borrow_mut().set_is_running(true);
1513
1514 Ok(())
1515 }
1516
1517 fn setup_bar_aggregator(&mut self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
1521 let bar_key = bar_type.standard();
1522 let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
1523 anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
1524 })?;
1525
1526 let handler: Box<dyn FnMut(Bar)> = if historical {
1528 let cache = self.cache.clone();
1530 Box::new(move |bar: Bar| {
1531 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1532 log_error_on_cache_insert(&e);
1533 }
1534 })
1536 } else {
1537 let cache = self.cache.clone();
1539 Box::new(move |bar: Bar| {
1540 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1541 log_error_on_cache_insert(&e);
1542 }
1543 let topic = switchboard::get_bars_topic(bar.bar_type);
1544 msgbus::publish(topic, &bar as &dyn Any);
1545 })
1546 };
1547
1548 aggregator
1549 .borrow_mut()
1550 .set_historical_mode(historical, handler);
1551
1552 if bar_type.spec().is_time_aggregated() {
1554 use nautilus_common::clock::TestClock;
1555
1556 if historical {
1557 let test_clock = Rc::new(RefCell::new(TestClock::new()));
1559 aggregator.borrow_mut().set_clock(test_clock);
1560 let aggregator_weak = Rc::downgrade(aggregator);
1563 aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
1564 } else {
1565 aggregator.borrow_mut().set_clock(self.clock.clone());
1566 aggregator
1567 .borrow_mut()
1568 .start_timer(Some(aggregator.clone()));
1569 }
1570 }
1571
1572 Ok(())
1573 }
1574
1575 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1576 let aggregator = self
1577 .bar_aggregators
1578 .remove(&bar_type.standard())
1579 .ok_or_else(|| {
1580 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1581 })?;
1582
1583 aggregator.borrow_mut().stop();
1584
1585 let bar_key = bar_type.standard();
1587 if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1588 for (topic, handler) in subs {
1589 if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1590 msgbus::unsubscribe_topic(topic, handler);
1591 }
1592 }
1593 }
1594
1595 Ok(())
1596 }
1597}
1598
1599#[inline(always)]
1600fn log_error_on_cache_insert<T: Display>(e: &T) {
1601 log::error!("Error on cache insert: {e}");
1602}