1pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39 any::{Any, type_name},
40 cell::{Ref, RefCell},
41 collections::hash_map::Entry,
42 fmt::{Debug, Display},
43 num::NonZeroUsize,
44 rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use futures::future::join_all;
51use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
52use indexmap::IndexMap;
53use nautilus_common::{
54 cache::Cache,
55 clock::Clock,
56 logging::{RECV, RES},
57 messages::data::{
58 DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61 UnsubscribeCommand,
62 },
63 msgbus::{
64 self, MStr, ShareableMessageHandler, Topic, TypedHandler, TypedIntoHandler,
65 switchboard::{self, MessagingSwitchboard},
66 },
67 runner::get_data_cmd_sender,
68 timer::{TimeEvent, TimeEventCallback},
69};
70use nautilus_core::{
71 UUID4, WeakCell,
72 correctness::{
73 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
74 },
75 datetime::millis_to_nanos_unchecked,
76};
77#[cfg(feature = "defi")]
78use nautilus_model::defi::DefiData;
79use nautilus_model::{
80 data::{
81 Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
82 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
83 },
84 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
85 identifiers::{ClientId, InstrumentId, Venue},
86 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
87 orderbook::OrderBook,
88};
89#[cfg(feature = "streaming")]
90use nautilus_persistence::backend::catalog::ParquetDataCatalog;
91use ustr::Ustr;
92
93#[cfg(feature = "defi")]
94#[allow(unused_imports)] use crate::defi::engine as _;
96#[cfg(feature = "defi")]
97use crate::engine::pool::PoolUpdater;
98use crate::{
99 aggregation::{
100 BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
101 TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
102 ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
103 VolumeRunsBarAggregator,
104 },
105 client::DataClientAdapter,
106};
107
108#[derive(Clone)]
113pub enum BarAggregatorSubscription {
114 Bar {
115 topic: MStr<Topic>,
116 handler: TypedHandler<Bar>,
117 },
118 Trade {
119 topic: MStr<Topic>,
120 handler: TypedHandler<TradeTick>,
121 },
122 Quote {
123 topic: MStr<Topic>,
124 handler: TypedHandler<QuoteTick>,
125 },
126}
127
128impl Debug for BarAggregatorSubscription {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 match self {
131 Self::Bar { topic, handler } => f
132 .debug_struct(stringify!(Bar))
133 .field("topic", topic)
134 .field("handler_id", &handler.id())
135 .finish(),
136 Self::Trade { topic, handler } => f
137 .debug_struct(stringify!(Trade))
138 .field("topic", topic)
139 .field("handler_id", &handler.id())
140 .finish(),
141 Self::Quote { topic, handler } => f
142 .debug_struct(stringify!(Quote))
143 .field("topic", topic)
144 .field("handler_id", &handler.id())
145 .finish(),
146 }
147 }
148}
149
150#[derive(Debug)]
152pub struct DataEngine {
153 pub(crate) clock: Rc<RefCell<dyn Clock>>,
154 pub(crate) cache: Rc<RefCell<Cache>>,
155 pub(crate) external_clients: AHashSet<ClientId>,
156 clients: IndexMap<ClientId, DataClientAdapter>,
157 default_client: Option<DataClientAdapter>,
158 #[cfg(feature = "streaming")]
159 catalogs: AHashMap<Ustr, ParquetDataCatalog>,
160 routing_map: IndexMap<Venue, ClientId>,
161 book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
162 book_deltas_subs: AHashSet<InstrumentId>,
163 book_depth10_subs: AHashSet<InstrumentId>,
164 book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
165 book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
166 bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
167 bar_aggregator_handlers: AHashMap<BarType, Vec<BarAggregatorSubscription>>,
168 _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
169 _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
170 buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
171 pub(crate) msgbus_priority: u8,
172 pub(crate) config: DataEngineConfig,
173 #[cfg(feature = "defi")]
174 pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
175 #[cfg(feature = "defi")]
176 pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
177 #[cfg(feature = "defi")]
178 pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
179 #[cfg(feature = "defi")]
180 pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
181}
182
183impl DataEngine {
184 #[must_use]
186 pub fn new(
187 clock: Rc<RefCell<dyn Clock>>,
188 cache: Rc<RefCell<Cache>>,
189 config: Option<DataEngineConfig>,
190 ) -> Self {
191 let config = config.unwrap_or_default();
192
193 let external_clients: AHashSet<ClientId> = config
194 .external_clients
195 .clone()
196 .unwrap_or_default()
197 .into_iter()
198 .collect();
199
200 Self {
201 clock,
202 cache,
203 external_clients,
204 clients: IndexMap::new(),
205 default_client: None,
206 #[cfg(feature = "streaming")]
207 catalogs: AHashMap::new(),
208 routing_map: IndexMap::new(),
209 book_intervals: AHashMap::new(),
210 book_deltas_subs: AHashSet::new(),
211 book_depth10_subs: AHashSet::new(),
212 book_updaters: AHashMap::new(),
213 book_snapshotters: AHashMap::new(),
214 bar_aggregators: AHashMap::new(),
215 bar_aggregator_handlers: AHashMap::new(),
216 _synthetic_quote_feeds: AHashMap::new(),
217 _synthetic_trade_feeds: AHashMap::new(),
218 buffered_deltas_map: AHashMap::new(),
219 msgbus_priority: 10, config,
221 #[cfg(feature = "defi")]
222 pool_updaters: AHashMap::new(),
223 #[cfg(feature = "defi")]
224 pool_updaters_pending: AHashSet::new(),
225 #[cfg(feature = "defi")]
226 pool_snapshot_pending: AHashSet::new(),
227 #[cfg(feature = "defi")]
228 pool_event_buffers: AHashMap::new(),
229 }
230 }
231
232 pub fn register_msgbus_handlers(engine: Rc<RefCell<Self>>) {
234 let weak = WeakCell::from(Rc::downgrade(&engine));
235
236 let weak1 = weak.clone();
237 msgbus::register_data_command_endpoint(
238 MessagingSwitchboard::data_engine_execute(),
239 TypedIntoHandler::from(move |cmd: DataCommand| {
240 if let Some(rc) = weak1.upgrade() {
241 rc.borrow_mut().execute(cmd);
242 }
243 }),
244 );
245
246 msgbus::register_data_command_endpoint(
247 MessagingSwitchboard::data_engine_queue_execute(),
248 TypedIntoHandler::from(move |cmd: DataCommand| {
249 get_data_cmd_sender().clone().execute(cmd);
250 }),
251 );
252
253 let weak2 = weak.clone();
255 msgbus::register_any(
256 MessagingSwitchboard::data_engine_process(),
257 ShareableMessageHandler::from_any(move |data: &dyn Any| {
258 if let Some(rc) = weak2.upgrade() {
259 rc.borrow_mut().process(data);
260 }
261 }),
262 );
263
264 let weak3 = weak.clone();
266 msgbus::register_data_endpoint(
267 MessagingSwitchboard::data_engine_process_data(),
268 TypedIntoHandler::from(move |data: Data| {
269 if let Some(rc) = weak3.upgrade() {
270 rc.borrow_mut().process_data(data);
271 }
272 }),
273 );
274
275 #[cfg(feature = "defi")]
277 {
278 let weak4 = weak.clone();
279 msgbus::register_defi_data_endpoint(
280 MessagingSwitchboard::data_engine_process_defi_data(),
281 TypedIntoHandler::from(move |data: DefiData| {
282 if let Some(rc) = weak4.upgrade() {
283 rc.borrow_mut().process_defi_data(data);
284 }
285 }),
286 );
287 }
288
289 let weak5 = weak;
290 msgbus::register_data_response_endpoint(
291 MessagingSwitchboard::data_engine_response(),
292 TypedIntoHandler::from(move |resp: DataResponse| {
293 if let Some(rc) = weak5.upgrade() {
294 rc.borrow_mut().response(resp);
295 }
296 }),
297 );
298 }
299
300 #[must_use]
302 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
303 self.clock.borrow()
304 }
305
306 #[must_use]
308 pub fn get_cache(&self) -> Ref<'_, Cache> {
309 self.cache.borrow()
310 }
311
312 #[must_use]
314 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
315 Rc::clone(&self.cache)
316 }
317
318 #[cfg(feature = "streaming")]
324 pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
325 let name = Ustr::from(name.as_deref().unwrap_or("catalog_0"));
326
327 check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
328
329 self.catalogs.insert(name, catalog);
330 log::info!("Registered catalog <{name}>");
331 }
332
333 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
340 let client_id = client.client_id();
341
342 if let Some(default_client) = &self.default_client {
343 check_predicate_false(
344 default_client.client_id() == client.client_id(),
345 "client_id already registered as default client",
346 )
347 .expect(FAILED);
348 }
349
350 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
351
352 if let Some(routing) = routing {
353 self.routing_map.insert(routing, client_id);
354 log::debug!("Set client {client_id} routing for {routing}");
355 }
356
357 if client.venue.is_none() && self.default_client.is_none() {
358 self.default_client = Some(client);
359 log::debug!("Registered client {client_id} for default routing");
360 } else {
361 self.clients.insert(client_id, client);
362 log::debug!("Registered client {client_id}");
363 }
364 }
365
366 pub fn deregister_client(&mut self, client_id: &ClientId) {
372 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
373
374 self.clients.shift_remove(client_id);
375 log::info!("Deregistered client {client_id}");
376 }
377
378 pub fn register_default_client(&mut self, client: DataClientAdapter) {
390 check_predicate_true(
391 self.default_client.is_none(),
392 "default client already registered",
393 )
394 .expect(FAILED);
395
396 let client_id = client.client_id();
397
398 self.default_client = Some(client);
399 log::debug!("Registered default client {client_id}");
400 }
401
402 pub fn start(&mut self) {
404 for client in self.get_clients_mut() {
405 if let Err(e) = client.start() {
406 log::error!("{e}");
407 }
408 }
409
410 for aggregator in self.bar_aggregators.values() {
411 if aggregator.borrow().bar_type().spec().is_time_aggregated() {
412 aggregator
413 .borrow_mut()
414 .start_timer(Some(aggregator.clone()));
415 }
416 }
417 }
418
419 pub fn stop(&mut self) {
421 for client in self.get_clients_mut() {
422 if let Err(e) = client.stop() {
423 log::error!("{e}");
424 }
425 }
426
427 for aggregator in self.bar_aggregators.values() {
428 aggregator.borrow_mut().stop();
429 }
430 }
431
432 pub fn reset(&mut self) {
434 for client in self.get_clients_mut() {
435 if let Err(e) = client.reset() {
436 log::error!("{e}");
437 }
438 }
439
440 let bar_types: Vec<BarType> = self.bar_aggregators.keys().copied().collect();
441 for bar_type in bar_types {
442 if let Err(e) = self.stop_bar_aggregator(bar_type) {
443 log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
444 }
445 }
446 }
447
448 pub fn dispose(&mut self) {
450 for client in self.get_clients_mut() {
451 if let Err(e) = client.dispose() {
452 log::error!("{e}");
453 }
454 }
455
456 self.clock.borrow_mut().cancel_timers();
457 }
458
459 pub async fn connect(&mut self) {
463 let futures: Vec<_> = self
464 .get_clients_mut()
465 .into_iter()
466 .map(|client| client.connect())
467 .collect();
468
469 let results = join_all(futures).await;
470
471 for error in results.into_iter().filter_map(Result::err) {
472 log::error!("Failed to connect data client: {error}");
473 }
474 }
475
476 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
482 let futures: Vec<_> = self
483 .get_clients_mut()
484 .into_iter()
485 .map(|client| client.disconnect())
486 .collect();
487
488 let results = join_all(futures).await;
489 let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
490
491 if errors.is_empty() {
492 Ok(())
493 } else {
494 let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
495 anyhow::bail!(
496 "Failed to disconnect data clients: {}",
497 error_msgs.join("; ")
498 )
499 }
500 }
501
502 #[must_use]
504 pub fn check_connected(&self) -> bool {
505 self.get_clients()
506 .iter()
507 .all(|client| client.is_connected())
508 }
509
510 #[must_use]
512 pub fn check_disconnected(&self) -> bool {
513 self.get_clients()
514 .iter()
515 .all(|client| !client.is_connected())
516 }
517
518 #[must_use]
520 pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
521 self.get_clients()
522 .into_iter()
523 .map(|client| (client.client_id(), client.is_connected()))
524 .collect()
525 }
526
527 #[must_use]
529 pub fn registered_clients(&self) -> Vec<ClientId> {
530 self.get_clients()
531 .into_iter()
532 .map(|client| client.client_id())
533 .collect()
534 }
535
536 pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
539 where
540 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
541 T: Clone,
542 {
543 self.get_clients()
544 .into_iter()
545 .flat_map(get_subs)
546 .cloned()
547 .collect()
548 }
549
550 #[must_use]
551 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
552 let (default_opt, clients_map) = (&self.default_client, &self.clients);
553 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
554
555 if let Some(default) = default_opt {
556 clients.push(default);
557 }
558
559 clients
560 }
561
562 #[must_use]
563 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
564 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
565 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
566
567 if let Some(default) = default_opt {
568 clients.push(default);
569 }
570
571 clients
572 }
573
574 pub fn get_client(
575 &mut self,
576 client_id: Option<&ClientId>,
577 venue: Option<&Venue>,
578 ) -> Option<&mut DataClientAdapter> {
579 if let Some(client_id) = client_id {
580 if let Some(client) = self.clients.get_mut(client_id) {
582 return Some(client);
583 }
584
585 if let Some(default) = self.default_client.as_mut()
587 && default.client_id() == *client_id
588 {
589 return Some(default);
590 }
591
592 return None;
594 }
595
596 if let Some(v) = venue {
597 if let Some(client_id) = self.routing_map.get(v) {
599 return self.clients.get_mut(client_id);
600 }
601 }
602
603 self.get_default_client()
605 }
606
607 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
608 self.default_client.as_mut()
609 }
610
611 #[must_use]
613 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
614 self.collect_subscriptions(|client| &client.subscriptions_custom)
615 }
616
617 #[must_use]
619 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
620 self.collect_subscriptions(|client| &client.subscriptions_instrument)
621 }
622
623 #[must_use]
625 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
626 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
627 }
628
629 #[must_use]
631 pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
632 self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
633 }
634
635 #[must_use]
637 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
638 self.book_intervals
639 .values()
640 .flat_map(|set| set.iter().copied())
641 .collect()
642 }
643
644 #[must_use]
646 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
647 self.collect_subscriptions(|client| &client.subscriptions_quotes)
648 }
649
650 #[must_use]
652 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
653 self.collect_subscriptions(|client| &client.subscriptions_trades)
654 }
655
656 #[must_use]
658 pub fn subscribed_bars(&self) -> Vec<BarType> {
659 self.collect_subscriptions(|client| &client.subscriptions_bars)
660 }
661
662 #[must_use]
664 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
665 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
666 }
667
668 #[must_use]
670 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
671 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
672 }
673
674 #[must_use]
676 pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
677 self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
678 }
679
680 #[must_use]
682 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
683 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
684 }
685
686 #[must_use]
688 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
689 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
690 }
691
692 pub fn execute(&mut self, cmd: DataCommand) {
698 if let Err(e) = match cmd {
699 DataCommand::Subscribe(c) => self.execute_subscribe(&c),
700 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
701 DataCommand::Request(c) => self.execute_request(c),
702 #[cfg(feature = "defi")]
703 DataCommand::DefiRequest(c) => self.execute_defi_request(c),
704 #[cfg(feature = "defi")]
705 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(&c),
706 #[cfg(feature = "defi")]
707 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
708 _ => {
709 log::warn!("Unhandled DataCommand variant");
710 Ok(())
711 }
712 } {
713 log::error!("{e}");
714 }
715 }
716
717 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
724 match &cmd {
726 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
727 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
728 SubscribeCommand::BookSnapshots(cmd) => {
729 return self.subscribe_book_snapshots(cmd);
731 }
732 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
733 _ => {} }
735
736 if let Some(client_id) = cmd.client_id()
737 && self.external_clients.contains(client_id)
738 {
739 if self.config.debug {
740 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
741 }
742 return Ok(());
743 }
744
745 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
746 client.execute_subscribe(cmd);
747 } else {
748 log::error!(
749 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
750 cmd.client_id(),
751 cmd.venue(),
752 );
753 }
754
755 Ok(())
756 }
757
758 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
764 match &cmd {
765 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
766 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
767 UnsubscribeCommand::BookSnapshots(cmd) => {
768 return self.unsubscribe_book_snapshots(cmd);
770 }
771 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
772 _ => {} }
774
775 if let Some(client_id) = cmd.client_id()
776 && self.external_clients.contains(client_id)
777 {
778 if self.config.debug {
779 log::debug!(
780 "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
781 );
782 }
783 return Ok(());
784 }
785
786 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
787 client.execute_unsubscribe(cmd);
788 } else {
789 log::error!(
790 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
791 cmd.client_id(),
792 cmd.venue(),
793 );
794 }
795
796 Ok(())
797 }
798
799 pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
806 if let Some(cid) = req.client_id()
808 && self.external_clients.contains(cid)
809 {
810 if self.config.debug {
811 log::debug!("Skipping data request for external client {cid}: {req:?}");
812 }
813 return Ok(());
814 }
815 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
816 match req {
817 RequestCommand::Data(req) => client.request_data(req),
818 RequestCommand::Instrument(req) => client.request_instrument(req),
819 RequestCommand::Instruments(req) => client.request_instruments(req),
820 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
821 RequestCommand::BookDepth(req) => client.request_book_depth(req),
822 RequestCommand::Quotes(req) => client.request_quotes(req),
823 RequestCommand::Trades(req) => client.request_trades(req),
824 RequestCommand::FundingRates(req) => client.request_funding_rates(req),
825 RequestCommand::Bars(req) => client.request_bars(req),
826 }
827 } else {
828 anyhow::bail!(
829 "Cannot handle request: no client found for {:?} {:?}",
830 req.client_id(),
831 req.venue()
832 );
833 }
834 }
835
836 pub fn process(&mut self, data: &dyn Any) {
840 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
842 self.handle_instrument(instrument.clone());
843 } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
844 self.handle_funding_rate(*funding_rate);
845 } else {
846 log::error!("Cannot process data {data:?}, type is unrecognized");
847 }
848
849 }
851
852 pub fn process_data(&mut self, data: Data) {
854 match data {
855 Data::Delta(delta) => self.handle_delta(delta),
856 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
857 Data::Depth10(depth) => self.handle_depth10(*depth),
858 Data::Quote(quote) => self.handle_quote(quote),
859 Data::Trade(trade) => self.handle_trade(trade),
860 Data::Bar(bar) => self.handle_bar(bar),
861 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
862 Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
863 Data::InstrumentClose(close) => self.handle_instrument_close(close),
864 }
865 }
866
867 pub fn response(&self, resp: DataResponse) {
869 log::debug!("{RECV}{RES} {resp:?}");
870
871 let correlation_id = *resp.correlation_id();
872
873 match &resp {
874 DataResponse::Instrument(r) => {
875 self.handle_instrument_response(r.data.clone());
876 }
877 DataResponse::Instruments(r) => {
878 self.handle_instruments(&r.data);
879 }
880 DataResponse::Quotes(r) => {
881 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
882 self.handle_quotes(&r.data);
883 }
884 }
885 DataResponse::Trades(r) => {
886 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
887 self.handle_trades(&r.data);
888 }
889 }
890 DataResponse::FundingRates(r) => {
891 if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
892 self.handle_funding_rates(&r.data);
893 }
894 }
895 DataResponse::Bars(r) => {
896 if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
897 self.handle_bars(&r.data);
898 }
899 }
900 DataResponse::Book(r) => self.handle_book_response(&r.data),
901 _ => todo!("Handle other response types"),
902 }
903
904 msgbus::send_response(&correlation_id, resp);
905 }
906
907 fn handle_instrument(&mut self, instrument: InstrumentAny) {
910 log::debug!("Handling instrument: {}", instrument.id());
911
912 if let Err(e) = self
913 .cache
914 .as_ref()
915 .borrow_mut()
916 .add_instrument(instrument.clone())
917 {
918 log_error_on_cache_insert(&e);
919 }
920
921 let topic = switchboard::get_instrument_topic(instrument.id());
922 log::debug!("Publishing instrument to topic: {topic}");
923 msgbus::publish_any(topic, &instrument);
924 }
925
926 fn handle_delta(&mut self, delta: OrderBookDelta) {
927 let deltas = if self.config.buffer_deltas {
928 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
929 buffered_deltas.deltas.push(delta);
930 buffered_deltas.flags = delta.flags;
931 buffered_deltas.sequence = delta.sequence;
932 buffered_deltas.ts_event = delta.ts_event;
933 buffered_deltas.ts_init = delta.ts_init;
934 } else {
935 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
936 self.buffered_deltas_map
937 .insert(delta.instrument_id, buffered_deltas);
938 }
939
940 if !RecordFlag::F_LAST.matches(delta.flags) {
941 return; }
943
944 self.buffered_deltas_map
946 .remove(&delta.instrument_id)
947 .unwrap()
948 } else {
949 OrderBookDeltas::new(delta.instrument_id, vec![delta])
950 };
951
952 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
953 msgbus::publish_deltas(topic, &deltas);
954 }
955
956 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
957 if self.config.buffer_deltas {
958 let instrument_id = deltas.instrument_id;
959
960 for delta in deltas.deltas {
961 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
962 buffered_deltas.deltas.push(delta);
963 buffered_deltas.flags = delta.flags;
964 buffered_deltas.sequence = delta.sequence;
965 buffered_deltas.ts_event = delta.ts_event;
966 buffered_deltas.ts_init = delta.ts_init;
967 } else {
968 let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
969 self.buffered_deltas_map
970 .insert(instrument_id, buffered_deltas);
971 }
972
973 if RecordFlag::F_LAST.matches(delta.flags) {
974 let deltas_to_publish =
976 self.buffered_deltas_map.remove(&instrument_id).unwrap();
977 let topic = switchboard::get_book_deltas_topic(instrument_id);
978 msgbus::publish_deltas(topic, &deltas_to_publish);
979 }
980 }
981 } else {
982 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
983 msgbus::publish_deltas(topic, &deltas);
984 };
985 }
986
987 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
988 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
989 msgbus::publish_depth10(topic, &depth);
990 }
991
992 fn handle_quote(&mut self, quote: QuoteTick) {
993 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
994 log_error_on_cache_insert(&e);
995 }
996
997 let topic = switchboard::get_quotes_topic(quote.instrument_id);
1000 msgbus::publish_quote(topic, "e);
1001 }
1002
1003 fn handle_trade(&mut self, trade: TradeTick) {
1004 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
1005 log_error_on_cache_insert(&e);
1006 }
1007
1008 let topic = switchboard::get_trades_topic(trade.instrument_id);
1011 msgbus::publish_trade(topic, &trade);
1012 }
1013
1014 fn handle_bar(&mut self, bar: Bar) {
1015 if self.config.validate_data_sequence
1017 && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
1018 {
1019 if bar.ts_event < last_bar.ts_event {
1020 log::warn!(
1021 "Bar {bar} was prior to last bar `ts_event` {}",
1022 last_bar.ts_event
1023 );
1024 return; }
1026 if bar.ts_init < last_bar.ts_init {
1027 log::warn!(
1028 "Bar {bar} was prior to last bar `ts_init` {}",
1029 last_bar.ts_init
1030 );
1031 return; }
1033 }
1035
1036 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
1037 log_error_on_cache_insert(&e);
1038 }
1039
1040 let topic = switchboard::get_bars_topic(bar.bar_type);
1041 msgbus::publish_bar(topic, &bar);
1042 }
1043
1044 fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
1045 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
1046 log_error_on_cache_insert(&e);
1047 }
1048
1049 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
1050 msgbus::publish_mark_price(topic, &mark_price);
1051 }
1052
1053 fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
1054 if let Err(e) = self
1055 .cache
1056 .as_ref()
1057 .borrow_mut()
1058 .add_index_price(index_price)
1059 {
1060 log_error_on_cache_insert(&e);
1061 }
1062
1063 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
1064 msgbus::publish_index_price(topic, &index_price);
1065 }
1066
1067 pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
1069 if let Err(e) = self
1070 .cache
1071 .as_ref()
1072 .borrow_mut()
1073 .add_funding_rate(funding_rate)
1074 {
1075 log_error_on_cache_insert(&e);
1076 }
1077
1078 let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
1079 msgbus::publish_funding_rate(topic, &funding_rate);
1080 }
1081
1082 fn handle_instrument_close(&mut self, close: InstrumentClose) {
1083 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
1084 msgbus::publish_any(topic, &close);
1085 }
1086
1087 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
1090 if cmd.instrument_id.is_synthetic() {
1091 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1092 }
1093
1094 self.book_deltas_subs.insert(cmd.instrument_id);
1095 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
1096
1097 Ok(())
1098 }
1099
1100 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
1101 if cmd.instrument_id.is_synthetic() {
1102 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
1103 }
1104
1105 self.book_depth10_subs.insert(cmd.instrument_id);
1106 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
1107
1108 Ok(())
1109 }
1110
1111 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
1112 if cmd.instrument_id.is_synthetic() {
1113 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
1114 }
1115
1116 let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
1118 Entry::Vacant(e) => {
1119 let mut set = AHashSet::new();
1120 set.insert(cmd.instrument_id);
1121 e.insert(set);
1122 true
1123 }
1124 Entry::Occupied(mut e) => {
1125 e.get_mut().insert(cmd.instrument_id);
1126 false
1127 }
1128 };
1129
1130 if first_for_interval {
1131 let interval_ns = millis_to_nanos_unchecked(cmd.interval_ms.get() as f64);
1133 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1134
1135 let snap_info = BookSnapshotInfo {
1136 instrument_id: cmd.instrument_id,
1137 venue: cmd.instrument_id.venue,
1138 is_composite: cmd.instrument_id.symbol.is_composite(),
1139 root: Ustr::from(cmd.instrument_id.symbol.root()),
1140 topic,
1141 interval_ms: cmd.interval_ms,
1142 };
1143
1144 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1146 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1147
1148 let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
1149 self.book_snapshotters
1150 .insert(cmd.instrument_id, snapshotter.clone());
1151 let timer_name = snapshotter.timer_name;
1152
1153 let callback_fn: Rc<dyn Fn(TimeEvent)> =
1154 Rc::new(move |event| snapshotter.snapshot(event));
1155 let callback = TimeEventCallback::from(callback_fn);
1156
1157 self.clock
1158 .borrow_mut()
1159 .set_timer_ns(
1160 &timer_name,
1161 interval_ns,
1162 Some(start_time_ns.into()),
1163 None,
1164 Some(callback),
1165 None,
1166 None,
1167 )
1168 .expect(FAILED);
1169 }
1170
1171 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1173 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
1174 }
1175
1176 if let Some(client_id) = cmd.client_id.as_ref()
1177 && self.external_clients.contains(client_id)
1178 {
1179 if self.config.debug {
1180 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
1181 }
1182 return Ok(());
1183 }
1184
1185 log::debug!(
1186 "Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
1187 cmd.instrument_id,
1188 cmd.client_id,
1189 cmd.venue,
1190 );
1191
1192 if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1193 let deltas_cmd = SubscribeBookDeltas::new(
1194 cmd.instrument_id,
1195 cmd.book_type,
1196 cmd.client_id,
1197 cmd.venue,
1198 UUID4::new(),
1199 cmd.ts_init,
1200 cmd.depth,
1201 true, Some(cmd.command_id),
1203 cmd.params.clone(),
1204 );
1205 log::debug!(
1206 "Calling client.execute_subscribe for BookDeltas: {}",
1207 cmd.instrument_id
1208 );
1209 client.execute_subscribe(&SubscribeCommand::BookDeltas(deltas_cmd));
1210 } else {
1211 log::error!(
1212 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
1213 cmd.client_id,
1214 cmd.venue,
1215 );
1216 }
1217
1218 Ok(())
1219 }
1220
1221 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1222 match cmd.bar_type.aggregation_source() {
1223 AggregationSource::Internal => {
1224 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1225 self.start_bar_aggregator(cmd.bar_type)?;
1226 }
1227 }
1228 AggregationSource::External => {
1229 if cmd.bar_type.instrument_id().is_synthetic() {
1230 anyhow::bail!(
1231 "Cannot subscribe for externally aggregated synthetic instrument bar data"
1232 );
1233 }
1234 }
1235 }
1236
1237 Ok(())
1238 }
1239
1240 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1241 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1242 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1243 return Ok(());
1244 }
1245
1246 self.book_deltas_subs.remove(&cmd.instrument_id);
1247
1248 let topics = vec![
1249 switchboard::get_book_deltas_topic(cmd.instrument_id),
1250 switchboard::get_book_depth10_topic(cmd.instrument_id),
1251 ];
1253
1254 self.maintain_book_updater(&cmd.instrument_id, &topics);
1255 self.maintain_book_snapshotter(&cmd.instrument_id);
1256
1257 Ok(())
1258 }
1259
1260 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1261 if !self.book_depth10_subs.contains(&cmd.instrument_id) {
1262 log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
1263 return Ok(());
1264 }
1265
1266 self.book_depth10_subs.remove(&cmd.instrument_id);
1267
1268 let topics = vec![
1269 switchboard::get_book_deltas_topic(cmd.instrument_id),
1270 switchboard::get_book_depth10_topic(cmd.instrument_id),
1271 ];
1272
1273 self.maintain_book_updater(&cmd.instrument_id, &topics);
1274 self.maintain_book_snapshotter(&cmd.instrument_id);
1275
1276 Ok(())
1277 }
1278
1279 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1280 let is_subscribed = self
1281 .book_intervals
1282 .values()
1283 .any(|set| set.contains(&cmd.instrument_id));
1284
1285 if !is_subscribed {
1286 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1287 return Ok(());
1288 }
1289
1290 let mut to_remove = Vec::new();
1292 for (interval, set) in &mut self.book_intervals {
1293 if set.remove(&cmd.instrument_id) && set.is_empty() {
1294 to_remove.push(*interval);
1295 }
1296 }
1297
1298 for interval in to_remove {
1299 self.book_intervals.remove(&interval);
1300 }
1301
1302 let topics = vec![
1303 switchboard::get_book_deltas_topic(cmd.instrument_id),
1304 switchboard::get_book_depth10_topic(cmd.instrument_id),
1305 ];
1306
1307 self.maintain_book_updater(&cmd.instrument_id, &topics);
1308 self.maintain_book_snapshotter(&cmd.instrument_id);
1309
1310 let still_in_intervals = self
1311 .book_intervals
1312 .values()
1313 .any(|set| set.contains(&cmd.instrument_id));
1314
1315 if !still_in_intervals && !self.book_deltas_subs.contains(&cmd.instrument_id) {
1316 if let Some(client_id) = cmd.client_id.as_ref()
1317 && self.external_clients.contains(client_id)
1318 {
1319 return Ok(());
1320 }
1321
1322 if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
1323 let deltas_cmd = UnsubscribeBookDeltas::new(
1324 cmd.instrument_id,
1325 cmd.client_id,
1326 cmd.venue,
1327 UUID4::new(),
1328 cmd.ts_init,
1329 Some(cmd.command_id),
1330 cmd.params.clone(),
1331 );
1332 client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
1333 }
1334 }
1335
1336 Ok(())
1337 }
1338
1339 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1340 let bar_type = cmd.bar_type;
1341
1342 let topic = switchboard::get_bars_topic(bar_type.standard());
1344 if msgbus::exact_subscriber_count_bars(topic) > 0 {
1345 return Ok(());
1346 }
1347
1348 if self.bar_aggregators.contains_key(&bar_type.standard())
1349 && let Err(e) = self.stop_bar_aggregator(bar_type)
1350 {
1351 log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1352 }
1353
1354 if bar_type.is_composite() {
1356 let source_type = bar_type.composite();
1357 let source_topic = switchboard::get_bars_topic(source_type);
1358 if msgbus::exact_subscriber_count_bars(source_topic) == 0
1359 && self.bar_aggregators.contains_key(&source_type)
1360 && let Err(e) = self.stop_bar_aggregator(source_type)
1361 {
1362 log::error!("Error stopping source bar aggregator for {source_type}: {e}");
1363 }
1364 }
1365
1366 Ok(())
1367 }
1368
1369 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, _topics: &[MStr<Topic>]) {
1370 let Some(updater) = self.book_updaters.get(instrument_id) else {
1371 return;
1372 };
1373
1374 let has_deltas = self.book_deltas_subs.contains(instrument_id);
1376 let has_depth10 = self.book_depth10_subs.contains(instrument_id);
1377
1378 let deltas_topic = switchboard::get_book_deltas_topic(*instrument_id);
1379 let depth_topic = switchboard::get_book_depth10_topic(*instrument_id);
1380 let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
1381 let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater.clone());
1382
1383 if !has_deltas {
1385 msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
1386 }
1387 if !has_depth10 {
1388 msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
1389 }
1390
1391 if !has_deltas && !has_depth10 {
1393 self.book_updaters.remove(instrument_id);
1394 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1395 }
1396 }
1397
1398 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1399 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1400 let topic = switchboard::get_book_snapshots_topic(
1401 *instrument_id,
1402 snapshotter.snap_info.interval_ms,
1403 );
1404
1405 if msgbus::subscriber_count_book_snapshots(topic) == 0 {
1407 let timer_name = snapshotter.timer_name;
1408 self.book_snapshotters.remove(instrument_id);
1409 let mut clock = self.clock.borrow_mut();
1410 if clock.timer_exists(&timer_name) {
1411 clock.cancel_timer(&timer_name);
1412 }
1413 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1414 }
1415 }
1416 }
1417
1418 fn handle_instrument_response(&self, instrument: InstrumentAny) {
1421 let mut cache = self.cache.as_ref().borrow_mut();
1422 if let Err(e) = cache.add_instrument(instrument) {
1423 log_error_on_cache_insert(&e);
1424 }
1425 }
1426
1427 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1428 let mut cache = self.cache.as_ref().borrow_mut();
1430 for instrument in instruments {
1431 if let Err(e) = cache.add_instrument(instrument.clone()) {
1432 log_error_on_cache_insert(&e);
1433 }
1434 }
1435 }
1436
1437 fn handle_quotes(&self, quotes: &[QuoteTick]) {
1438 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1439 log_error_on_cache_insert(&e);
1440 }
1441 }
1442
1443 fn handle_trades(&self, trades: &[TradeTick]) {
1444 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1445 log_error_on_cache_insert(&e);
1446 }
1447 }
1448
1449 fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
1450 if let Err(e) = self
1451 .cache
1452 .as_ref()
1453 .borrow_mut()
1454 .add_funding_rates(funding_rates)
1455 {
1456 log_error_on_cache_insert(&e);
1457 }
1458 }
1459
1460 fn handle_bars(&self, bars: &[Bar]) {
1461 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1462 log_error_on_cache_insert(&e);
1463 }
1464 }
1465
1466 fn handle_book_response(&self, book: &OrderBook) {
1467 log::debug!("Adding order book {} to cache", book.instrument_id);
1468 if let Err(e) = self
1469 .cache
1470 .as_ref()
1471 .borrow_mut()
1472 .add_order_book(book.clone())
1473 {
1474 log_error_on_cache_insert(&e);
1475 }
1476 }
1477
1478 #[allow(clippy::too_many_arguments)]
1481 fn setup_book_updater(
1482 &mut self,
1483 instrument_id: &InstrumentId,
1484 book_type: BookType,
1485 only_deltas: bool,
1486 managed: bool,
1487 ) -> anyhow::Result<()> {
1488 let mut cache = self.cache.borrow_mut();
1489 if managed && !cache.has_order_book(instrument_id) {
1490 let book = OrderBook::new(*instrument_id, book_type);
1491 log::debug!("Created {book}");
1492 cache.add_order_book(book)?;
1493 }
1494
1495 let updater = self
1497 .book_updaters
1498 .entry(*instrument_id)
1499 .or_insert_with(|| Rc::new(BookUpdater::new(instrument_id, self.cache.clone())))
1500 .clone();
1501
1502 let topic = switchboard::get_book_deltas_topic(*instrument_id);
1504 let deltas_handler = TypedHandler::new(updater.clone());
1505 msgbus::subscribe_book_deltas(topic.into(), deltas_handler, Some(self.msgbus_priority));
1506
1507 if !only_deltas {
1509 let topic = switchboard::get_book_depth10_topic(*instrument_id);
1510 let depth_handler = TypedHandler::new(updater);
1511 msgbus::subscribe_book_depth10(topic.into(), depth_handler, Some(self.msgbus_priority));
1512 }
1513
1514 Ok(())
1515 }
1516
1517 fn create_bar_aggregator(
1518 &mut self,
1519 instrument: &InstrumentAny,
1520 bar_type: BarType,
1521 ) -> Box<dyn BarAggregator> {
1522 let cache = self.cache.clone();
1523
1524 let handler = move |bar: Bar| {
1525 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1526 log_error_on_cache_insert(&e);
1527 }
1528
1529 let topic = switchboard::get_bars_topic(bar.bar_type);
1530 msgbus::publish_bar(topic, &bar);
1531 };
1532
1533 let clock = self.clock.clone();
1534 let config = self.config.clone();
1535
1536 let price_precision = instrument.price_precision();
1537 let size_precision = instrument.size_precision();
1538
1539 if bar_type.spec().is_time_aggregated() {
1540 let time_bars_origin_offset = config
1542 .time_bars_origins
1543 .get(&bar_type.spec().aggregation)
1544 .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1545
1546 Box::new(TimeBarAggregator::new(
1547 bar_type,
1548 price_precision,
1549 size_precision,
1550 clock,
1551 handler,
1552 config.time_bars_build_with_no_updates,
1553 config.time_bars_timestamp_on_close,
1554 config.time_bars_interval_type,
1555 time_bars_origin_offset,
1556 config.time_bars_build_delay,
1557 config.time_bars_skip_first_non_full_bar,
1558 ))
1559 } else {
1560 match bar_type.spec().aggregation {
1561 BarAggregation::Tick => Box::new(TickBarAggregator::new(
1562 bar_type,
1563 price_precision,
1564 size_precision,
1565 handler,
1566 )) as Box<dyn BarAggregator>,
1567 BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
1568 bar_type,
1569 price_precision,
1570 size_precision,
1571 handler,
1572 )) as Box<dyn BarAggregator>,
1573 BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
1574 bar_type,
1575 price_precision,
1576 size_precision,
1577 handler,
1578 )) as Box<dyn BarAggregator>,
1579 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1580 bar_type,
1581 price_precision,
1582 size_precision,
1583 handler,
1584 )) as Box<dyn BarAggregator>,
1585 BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
1586 bar_type,
1587 price_precision,
1588 size_precision,
1589 handler,
1590 )) as Box<dyn BarAggregator>,
1591 BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
1592 bar_type,
1593 price_precision,
1594 size_precision,
1595 handler,
1596 )) as Box<dyn BarAggregator>,
1597 BarAggregation::Value => Box::new(ValueBarAggregator::new(
1598 bar_type,
1599 price_precision,
1600 size_precision,
1601 handler,
1602 )) as Box<dyn BarAggregator>,
1603 BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
1604 bar_type,
1605 price_precision,
1606 size_precision,
1607 handler,
1608 )) as Box<dyn BarAggregator>,
1609 BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
1610 bar_type,
1611 price_precision,
1612 size_precision,
1613 handler,
1614 )) as Box<dyn BarAggregator>,
1615 BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1616 bar_type,
1617 price_precision,
1618 size_precision,
1619 instrument.price_increment(),
1620 handler,
1621 )) as Box<dyn BarAggregator>,
1622 _ => panic!(
1623 "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
1624 bar_type.spec().aggregation
1625 ),
1626 }
1627 }
1628 }
1629
1630 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1631 let instrument = {
1633 let cache = self.cache.borrow();
1634 cache
1635 .instrument(&bar_type.instrument_id())
1636 .ok_or_else(|| {
1637 anyhow::anyhow!(
1638 "Cannot start bar aggregation: no instrument found for {}",
1639 bar_type.instrument_id(),
1640 )
1641 })?
1642 .clone()
1643 };
1644
1645 let bar_key = bar_type.standard();
1647
1648 let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1650 rc.clone()
1651 } else {
1652 let agg = self.create_bar_aggregator(&instrument, bar_type);
1653 let rc = Rc::new(RefCell::new(agg));
1654 self.bar_aggregators.insert(bar_key, rc.clone());
1655 rc
1656 };
1657
1658 let mut subscriptions = Vec::new();
1660
1661 if bar_type.is_composite() {
1662 let topic = switchboard::get_bars_topic(bar_type.composite());
1663 let handler = TypedHandler::new(BarBarHandler::new(aggregator.clone(), bar_key));
1664 msgbus::subscribe_bars(topic.into(), handler.clone(), Some(self.msgbus_priority));
1665 subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
1666 } else if bar_type.spec().price_type == PriceType::Last {
1667 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1668 let handler = TypedHandler::new(BarTradeHandler::new(aggregator.clone(), bar_key));
1669 msgbus::subscribe_trades(topic.into(), handler.clone(), Some(self.msgbus_priority));
1670 subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
1671 } else {
1672 if matches!(
1674 bar_type.spec().aggregation,
1675 BarAggregation::TickImbalance
1676 | BarAggregation::VolumeImbalance
1677 | BarAggregation::ValueImbalance
1678 | BarAggregation::TickRuns
1679 | BarAggregation::VolumeRuns
1680 | BarAggregation::ValueRuns
1681 ) {
1682 log::warn!(
1683 "Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
1684 data with `aggressor_side`, but `price_type` is not LAST so it will receive \
1685 quote data: bars will not emit correctly",
1686 );
1687 }
1688
1689 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1690 let handler = TypedHandler::new(BarQuoteHandler::new(aggregator.clone(), bar_key));
1691 msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(self.msgbus_priority));
1692 subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
1693 }
1694
1695 self.bar_aggregator_handlers.insert(bar_key, subscriptions);
1696
1697 self.setup_bar_aggregator(bar_type, false)?;
1699
1700 aggregator.borrow_mut().set_is_running(true);
1701
1702 Ok(())
1703 }
1704
1705 fn setup_bar_aggregator(&mut self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
1709 let bar_key = bar_type.standard();
1710 let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
1711 anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
1712 })?;
1713
1714 let handler: Box<dyn FnMut(Bar)> = if historical {
1716 let cache = self.cache.clone();
1718 Box::new(move |bar: Bar| {
1719 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1720 log_error_on_cache_insert(&e);
1721 }
1722 })
1724 } else {
1725 let cache = self.cache.clone();
1727 Box::new(move |bar: Bar| {
1728 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1729 log_error_on_cache_insert(&e);
1730 }
1731 let topic = switchboard::get_bars_topic(bar.bar_type);
1732 msgbus::publish_bar(topic, &bar);
1733 })
1734 };
1735
1736 aggregator
1737 .borrow_mut()
1738 .set_historical_mode(historical, handler);
1739
1740 if bar_type.spec().is_time_aggregated() {
1742 use nautilus_common::clock::TestClock;
1743
1744 if historical {
1745 let test_clock = Rc::new(RefCell::new(TestClock::new()));
1747 aggregator.borrow_mut().set_clock(test_clock);
1748 let aggregator_weak = Rc::downgrade(aggregator);
1751 aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
1752 } else {
1753 aggregator.borrow_mut().set_clock(self.clock.clone());
1754 aggregator
1755 .borrow_mut()
1756 .start_timer(Some(aggregator.clone()));
1757 }
1758 }
1759
1760 Ok(())
1761 }
1762
1763 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1764 let aggregator = self
1765 .bar_aggregators
1766 .remove(&bar_type.standard())
1767 .ok_or_else(|| {
1768 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1769 })?;
1770
1771 aggregator.borrow_mut().stop();
1772
1773 let bar_key = bar_type.standard();
1775 if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1776 for sub in subs {
1777 match sub {
1778 BarAggregatorSubscription::Bar { topic, handler } => {
1779 msgbus::unsubscribe_bars(topic.into(), &handler);
1780 }
1781 BarAggregatorSubscription::Trade { topic, handler } => {
1782 msgbus::unsubscribe_trades(topic.into(), &handler);
1783 }
1784 BarAggregatorSubscription::Quote { topic, handler } => {
1785 msgbus::unsubscribe_quotes(topic.into(), &handler);
1786 }
1787 }
1788 }
1789 }
1790
1791 Ok(())
1792 }
1793}
1794
1795#[inline(always)]
1796fn log_error_on_cache_insert<T: Display>(e: &T) {
1797 log::error!("Error on cache insert: {e}");
1798}
1799
1800#[inline(always)]
1801fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
1802 if data.is_empty() {
1803 let name = type_name::<T>();
1804 let short_name = name.rsplit("::").next().unwrap_or(name);
1805 log::warn!("Received empty {short_name} response for {id} {correlation_id}");
1806 return true;
1807 }
1808 false
1809}