1pub mod book;
32pub mod config;
33mod handlers;
34#[cfg(feature = "defi")]
35pub mod pool;
36
37use std::{
38 any::Any,
39 cell::{Ref, RefCell},
40 collections::hash_map::Entry,
41 fmt::Display,
42 num::NonZeroUsize,
43 rc::Rc,
44};
45
46use ahash::{AHashMap, AHashSet};
47use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
48use config::DataEngineConfig;
49use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
50use indexmap::IndexMap;
51#[cfg(feature = "defi")]
52use nautilus_common::messages::defi::{DefiSubscribeCommand, DefiUnsubscribeCommand};
53use nautilus_common::{
54 cache::Cache,
55 clock::Clock,
56 logging::{RECV, RES},
57 messages::data::{
58 DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61 UnsubscribeCommand,
62 },
63 msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
64 timer::TimeEventCallback,
65};
66use nautilus_core::{
67 correctness::{
68 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
69 },
70 datetime::millis_to_nanos,
71};
72#[cfg(feature = "defi")]
73use nautilus_model::defi::Blockchain;
74#[cfg(feature = "defi")]
75use nautilus_model::defi::DefiData;
76use nautilus_model::{
77 data::{
78 Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
79 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
80 },
81 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
82 identifiers::{ClientId, InstrumentId, Venue},
83 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
84 orderbook::OrderBook,
85};
86use nautilus_persistence::backend::catalog::ParquetDataCatalog;
87use ustr::Ustr;
88
89#[cfg(feature = "defi")]
90use crate::engine::pool::PoolUpdater;
91use crate::{
92 aggregation::{
93 BarAggregator, RenkoBarAggregator, TickBarAggregator, TimeBarAggregator,
94 ValueBarAggregator, VolumeBarAggregator,
95 },
96 client::DataClientAdapter,
97};
98
99#[derive(Debug)]
101pub struct DataEngine {
102 clock: Rc<RefCell<dyn Clock>>,
103 cache: Rc<RefCell<Cache>>,
104 clients: IndexMap<ClientId, DataClientAdapter>,
105 default_client: Option<DataClientAdapter>,
106 external_clients: AHashSet<ClientId>,
107 catalogs: AHashMap<Ustr, ParquetDataCatalog>,
108 routing_map: IndexMap<Venue, ClientId>,
109 book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
110 book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
111 book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
112 bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
113 bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
114 _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
115 _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
116 buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
117 msgbus_priority: u8,
118 config: DataEngineConfig,
119 #[cfg(feature = "defi")]
120 pool_updaters: AHashMap<InstrumentId, Rc<crate::engine::pool::PoolUpdater>>,
121}
122
123impl DataEngine {
124 #[must_use]
126 pub fn new(
127 clock: Rc<RefCell<dyn Clock>>,
128 cache: Rc<RefCell<Cache>>,
129 config: Option<DataEngineConfig>,
130 ) -> Self {
131 let config = config.unwrap_or_default();
132
133 let external_clients: AHashSet<ClientId> = config
134 .external_clients
135 .clone()
136 .unwrap_or_default()
137 .into_iter()
138 .collect();
139
140 Self {
141 clock,
142 cache,
143 clients: IndexMap::new(),
144 default_client: None,
145 external_clients,
146 catalogs: AHashMap::new(),
147 routing_map: IndexMap::new(),
148 book_intervals: AHashMap::new(),
149 book_updaters: AHashMap::new(),
150 book_snapshotters: AHashMap::new(),
151 bar_aggregators: AHashMap::new(),
152 bar_aggregator_handlers: AHashMap::new(),
153 _synthetic_quote_feeds: AHashMap::new(),
154 _synthetic_trade_feeds: AHashMap::new(),
155 buffered_deltas_map: AHashMap::new(),
156 msgbus_priority: 10, config,
158 #[cfg(feature = "defi")]
159 pool_updaters: AHashMap::new(),
160 }
161 }
162
163 #[must_use]
165 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
166 self.clock.borrow()
167 }
168
169 #[must_use]
171 pub fn get_cache(&self) -> Ref<'_, Cache> {
172 self.cache.borrow()
173 }
174
175 pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
181 let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
182
183 check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
184
185 self.catalogs.insert(name, catalog);
186 log::info!("Registered catalog <{name}>");
187 }
188
189 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
196 let client_id = client.client_id();
197
198 if let Some(default_client) = &self.default_client {
199 check_predicate_false(
200 default_client.client_id() == client.client_id(),
201 "client_id already registered as default client",
202 )
203 .expect(FAILED);
204 }
205
206 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
207
208 if let Some(routing) = routing {
209 self.routing_map.insert(routing, client_id);
210 log::info!("Set client {client_id} routing for {routing}");
211 }
212
213 if client.venue.is_none() && self.default_client.is_none() {
214 self.default_client = Some(client);
215 log::info!("Registered client {client_id} for default routing");
216 } else {
217 self.clients.insert(client_id, client);
218 log::info!("Registered client {client_id}");
219 }
220 }
221
222 pub fn deregister_client(&mut self, client_id: &ClientId) {
228 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
229
230 self.clients.shift_remove(client_id);
231 log::info!("Deregistered client {client_id}");
232 }
233
234 pub fn register_default_client(&mut self, client: DataClientAdapter) {
246 check_predicate_true(
247 self.default_client.is_none(),
248 "default client already registered",
249 )
250 .expect(FAILED);
251
252 let client_id = client.client_id();
253
254 self.default_client = Some(client);
255 log::info!("Registered default client {client_id}");
256 }
257
258 pub fn start(&mut self) {
260 for client in self.get_clients_mut() {
261 if let Err(e) = client.start() {
262 log::error!("{e}");
263 }
264 }
265 }
266
267 pub fn stop(&mut self) {
269 for client in self.get_clients_mut() {
270 if let Err(e) = client.stop() {
271 log::error!("{e}");
272 }
273 }
274 }
275
276 pub fn reset(&mut self) {
278 for client in self.get_clients_mut() {
279 if let Err(e) = client.reset() {
280 log::error!("{e}");
281 }
282 }
283 }
284
285 pub fn dispose(&mut self) {
287 for client in self.get_clients_mut() {
288 if let Err(e) = client.dispose() {
289 log::error!("{e}");
290 }
291 }
292
293 self.clock.borrow_mut().cancel_timers();
294 }
295
296 #[must_use]
298 pub fn check_connected(&self) -> bool {
299 self.get_clients()
300 .iter()
301 .all(|client| client.is_connected())
302 }
303
304 #[must_use]
306 pub fn check_disconnected(&self) -> bool {
307 self.get_clients()
308 .iter()
309 .all(|client| !client.is_connected())
310 }
311
312 #[must_use]
314 pub fn registered_clients(&self) -> Vec<ClientId> {
315 self.get_clients()
316 .into_iter()
317 .map(|client| client.client_id())
318 .collect()
319 }
320
321 fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
324 where
325 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
326 T: Clone,
327 {
328 self.get_clients()
329 .into_iter()
330 .flat_map(get_subs)
331 .cloned()
332 .collect()
333 }
334
335 #[must_use]
336 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
337 let (default_opt, clients_map) = (&self.default_client, &self.clients);
338 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
339
340 if let Some(default) = default_opt {
341 clients.push(default);
342 }
343
344 clients
345 }
346
347 #[must_use]
348 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
349 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
350 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
351
352 if let Some(default) = default_opt {
353 clients.push(default);
354 }
355
356 clients
357 }
358
359 pub fn get_client(
360 &mut self,
361 client_id: Option<&ClientId>,
362 venue: Option<&Venue>,
363 ) -> Option<&mut DataClientAdapter> {
364 if let Some(client_id) = client_id {
365 if let Some(client) = self.clients.get_mut(client_id) {
367 return Some(client);
368 }
369
370 if let Some(default) = self.default_client.as_mut()
372 && default.client_id() == *client_id
373 {
374 return Some(default);
375 }
376
377 return None;
379 }
380
381 if let Some(v) = venue {
382 if let Some(client_id) = self.routing_map.get(v) {
384 return self.clients.get_mut(client_id);
385 }
386 }
387
388 self.get_default_client()
390 }
391
392 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
393 self.default_client.as_mut()
394 }
395
396 #[must_use]
398 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
399 self.collect_subscriptions(|client| &client.subscriptions_custom)
400 }
401
402 #[must_use]
404 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
405 self.collect_subscriptions(|client| &client.subscriptions_instrument)
406 }
407
408 #[must_use]
410 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
411 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
412 }
413
414 #[must_use]
416 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
417 self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
418 }
419
420 #[must_use]
422 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
423 self.collect_subscriptions(|client| &client.subscriptions_quotes)
424 }
425
426 #[must_use]
428 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
429 self.collect_subscriptions(|client| &client.subscriptions_trades)
430 }
431
432 #[must_use]
434 pub fn subscribed_bars(&self) -> Vec<BarType> {
435 self.collect_subscriptions(|client| &client.subscriptions_bars)
436 }
437
438 #[must_use]
440 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
441 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
442 }
443
444 #[must_use]
446 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
447 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
448 }
449
450 #[must_use]
452 pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
453 self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
454 }
455
456 #[must_use]
458 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
459 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
460 }
461
462 #[must_use]
464 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
465 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
466 }
467
/// Returns the blockchains with block subscriptions across all clients.
#[cfg(feature = "defi")]
#[must_use]
pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
    self.collect_subscriptions(|adapter| &adapter.subscriptions_blocks)
}
474
/// Returns the instrument IDs with pool subscriptions across all clients.
#[cfg(feature = "defi")]
#[must_use]
pub fn subscribed_pools(&self) -> Vec<InstrumentId> {
    self.collect_subscriptions(|adapter| &adapter.subscriptions_pools)
}
481
/// Returns the instrument IDs with pool swap subscriptions across all clients.
#[cfg(feature = "defi")]
#[must_use]
pub fn subscribed_pool_swaps(&self) -> Vec<InstrumentId> {
    self.collect_subscriptions(|adapter| &adapter.subscriptions_pool_swaps)
}
488
/// Returns the instrument IDs with pool liquidity update subscriptions across all clients.
#[cfg(feature = "defi")]
#[must_use]
pub fn subscribed_pool_liquidity_updates(&self) -> Vec<InstrumentId> {
    self.collect_subscriptions(|adapter| &adapter.subscriptions_pool_liquidity_updates)
}
495
496 pub fn execute(&mut self, cmd: &DataCommand) {
502 if let Err(e) = match cmd {
503 DataCommand::Subscribe(c) => self.execute_subscribe(c),
504 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
505 DataCommand::Request(c) => self.execute_request(c),
506 #[cfg(feature = "defi")]
507 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
508 #[cfg(feature = "defi")]
509 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
510 _ => {
511 log::warn!("Unhandled DataCommand variant: {cmd:?}");
512 Ok(())
513 }
514 } {
515 log::error!("{e}");
516 }
517 }
518
519 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
526 match &cmd {
528 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
529 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
530 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
531 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
532 _ => {} }
534
535 if let Some(client_id) = cmd.client_id()
536 && self.external_clients.contains(client_id)
537 {
538 if self.config.debug {
539 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
540 }
541 return Ok(());
542 }
543
544 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
545 client.execute_subscribe(cmd);
546 } else {
547 log::error!(
548 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
549 cmd.client_id(),
550 cmd.venue(),
551 );
552 }
553
554 Ok(())
555 }
556
/// Handles a DeFi subscribe command.
///
/// Commands naming an external client are skipped entirely. Otherwise a pool
/// updater is set up for pool-related subscriptions and the command is
/// forwarded to the resolved client.
///
/// # Errors
///
/// The visible implementation always returns `Ok(())`.
#[cfg(feature = "defi")]
pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) -> anyhow::Result<()> {
    let external_target = cmd
        .client_id()
        .filter(|id| self.external_clients.contains(*id));
    if let Some(client_id) = external_target {
        if self.config.debug {
            log::debug!("Skipping defi subscribe for external client {client_id}: {cmd:?}",);
        }
        return Ok(());
    }

    match cmd {
        DefiSubscribeCommand::Pool(sub) => {
            self.setup_pool_updater(&sub.instrument_id);
        }
        DefiSubscribeCommand::PoolSwaps(sub) => {
            self.setup_pool_updater(&sub.instrument_id);
        }
        DefiSubscribeCommand::PoolLiquidityUpdates(sub) => {
            self.setup_pool_updater(&sub.instrument_id);
        }
        _ => {}
    }

    match self.get_client(cmd.client_id(), cmd.venue()) {
        Some(client) => client.execute_defi_subscribe(cmd),
        None => log::error!(
            "Cannot handle command: no client found for client_id={:?}, venue={:?}",
            cmd.client_id(),
            cmd.venue(),
        ),
    }

    Ok(())
}
596
597 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
603 match &cmd {
604 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
605 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
606 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
607 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
608 _ => {} }
610
611 if let Some(client_id) = cmd.client_id()
612 && self.external_clients.contains(client_id)
613 {
614 if self.config.debug {
615 log::debug!(
616 "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
617 );
618 }
619 return Ok(());
620 }
621
622 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
623 client.execute_unsubscribe(cmd);
624 } else {
625 log::error!(
626 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
627 cmd.client_id(),
628 cmd.venue(),
629 );
630 }
631
632 Ok(())
633 }
634
/// Handles a DeFi unsubscribe command by forwarding it to the resolved
/// client, unless the command names an external client.
///
/// # Errors
///
/// The visible implementation always returns `Ok(())`.
#[cfg(feature = "defi")]
pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
    let external_target = cmd
        .client_id()
        .filter(|id| self.external_clients.contains(*id));
    if let Some(client_id) = external_target {
        if self.config.debug {
            log::debug!("Skipping defi unsubscribe for external client {client_id}: {cmd:?}",);
        }
        return Ok(());
    }

    match self.get_client(cmd.client_id(), cmd.venue()) {
        Some(client) => client.execute_defi_unsubscribe(cmd),
        None => log::error!(
            "Cannot handle command: no client found for client_id={:?}, venue={:?}",
            cmd.client_id(),
            cmd.venue(),
        ),
    }

    Ok(())
}
663
664 pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
671 if let Some(cid) = req.client_id()
673 && self.external_clients.contains(cid)
674 {
675 if self.config.debug {
676 log::debug!("Skipping data request for external client {cid}: {req:?}");
677 }
678 return Ok(());
679 }
680 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
681 match req {
682 RequestCommand::Data(req) => client.request_data(req),
683 RequestCommand::Instrument(req) => client.request_instrument(req),
684 RequestCommand::Instruments(req) => client.request_instruments(req),
685 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
686 RequestCommand::BookDepth(req) => client.request_book_depth(req),
687 RequestCommand::Quotes(req) => client.request_quotes(req),
688 RequestCommand::Trades(req) => client.request_trades(req),
689 RequestCommand::Bars(req) => client.request_bars(req),
690 }
691 } else {
692 anyhow::bail!(
693 "Cannot handle request: no client found for {:?} {:?}",
694 req.client_id(),
695 req.venue()
696 );
697 }
698 }
699
700 pub fn process(&mut self, data: &dyn Any) {
704 if let Some(data) = data.downcast_ref::<Data>() {
706 self.process_data(data.clone()); return;
708 }
709
710 #[cfg(feature = "defi")]
711 if let Some(data) = data.downcast_ref::<DefiData>() {
712 self.process_defi_data(data.clone()); return;
714 }
715
716 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
717 self.handle_instrument(instrument.clone());
718 } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
719 self.handle_funding_rate(*funding_rate);
720 } else {
721 log::error!("Cannot process data {data:?}, type is unrecognized");
722 }
723 }
724
725 pub fn process_data(&mut self, data: Data) {
727 match data {
728 Data::Delta(delta) => self.handle_delta(delta),
729 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
730 Data::Depth10(depth) => self.handle_depth10(*depth),
731 Data::Quote(quote) => self.handle_quote(quote),
732 Data::Trade(trade) => self.handle_trade(trade),
733 Data::Bar(bar) => self.handle_bar(bar),
734 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
735 Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
736 Data::InstrumentClose(close) => self.handle_instrument_close(close),
737 }
738 }
739
/// Publishes a [`DefiData`] value on the topic for its variant.
///
/// Pools are additionally added to the cache before being published; a cache
/// failure is logged and does not prevent publishing.
#[cfg(feature = "defi")]
pub fn process_defi_data(&mut self, data: DefiData) {
    match data {
        DefiData::Block(block) => {
            msgbus::publish(
                switchboard::get_defi_blocks_topic(block.chain()),
                &block as &dyn Any,
            );
        }
        DefiData::Pool(pool) => {
            let added = self.cache.borrow_mut().add_pool(pool.clone());
            if let Err(err) = added {
                log::error!("Failed to add Pool to cache: {err}");
            }

            msgbus::publish(
                switchboard::get_defi_pool_topic(pool.instrument_id),
                &pool as &dyn Any,
            );
        }
        DefiData::PoolSwap(swap) => {
            msgbus::publish(
                switchboard::get_defi_pool_swaps_topic(swap.instrument_id),
                &swap as &dyn Any,
            );
        }
        DefiData::PoolLiquidityUpdate(update) => {
            msgbus::publish(
                switchboard::get_defi_liquidity_topic(update.instrument_id),
                &update as &dyn Any,
            );
        }
        DefiData::PoolFeeCollect(collect) => {
            msgbus::publish(
                switchboard::get_defi_collect_topic(collect.instrument_id),
                &collect as &dyn Any,
            );
        }
    }
}
770
771 pub fn response(&self, resp: DataResponse) {
773 log::debug!("{RECV}{RES} {resp:?}");
774
775 match &resp {
776 DataResponse::Instrument(resp) => {
777 self.handle_instrument_response(resp.data.clone());
778 }
779 DataResponse::Instruments(resp) => {
780 self.handle_instruments(&resp.data);
781 }
782 DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
783 DataResponse::Trades(resp) => self.handle_trades(&resp.data),
784 DataResponse::Bars(resp) => self.handle_bars(&resp.data),
785 _ => todo!(),
786 }
787
788 msgbus::send_response(resp.correlation_id(), &resp);
789 }
790
791 fn handle_instrument(&mut self, instrument: InstrumentAny) {
794 if let Err(e) = self
795 .cache
796 .as_ref()
797 .borrow_mut()
798 .add_instrument(instrument.clone())
799 {
800 log_error_on_cache_insert(&e);
801 }
802
803 let topic = switchboard::get_instrument_topic(instrument.id());
804 msgbus::publish(topic, &instrument as &dyn Any);
805 }
806
807 fn handle_delta(&mut self, delta: OrderBookDelta) {
808 let deltas = if self.config.buffer_deltas {
809 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
810 buffered_deltas.deltas.push(delta);
811 } else {
812 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
813 self.buffered_deltas_map
814 .insert(delta.instrument_id, buffered_deltas);
815 }
816
817 if !RecordFlag::F_LAST.matches(delta.flags) {
818 return; }
820
821 self.buffered_deltas_map
823 .remove(&delta.instrument_id)
824 .unwrap()
825 } else {
826 OrderBookDeltas::new(delta.instrument_id, vec![delta])
827 };
828
829 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
830 msgbus::publish(topic, &deltas as &dyn Any);
831 }
832
833 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
834 let deltas = if self.config.buffer_deltas {
835 let mut is_last_delta = false;
836 for delta in &deltas.deltas {
837 if RecordFlag::F_LAST.matches(delta.flags) {
838 is_last_delta = true;
839 break;
840 }
841 }
842
843 let instrument_id = deltas.instrument_id;
844
845 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
846 buffered_deltas.deltas.extend(deltas.deltas);
847 } else {
848 self.buffered_deltas_map.insert(instrument_id, deltas);
849 }
850
851 if !is_last_delta {
852 return;
853 }
854
855 self.buffered_deltas_map.remove(&instrument_id).unwrap()
857 } else {
858 deltas
859 };
860
861 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
862 msgbus::publish(topic, &deltas as &dyn Any);
863 }
864
865 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
866 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
867 msgbus::publish(topic, &depth as &dyn Any);
868 }
869
870 fn handle_quote(&mut self, quote: QuoteTick) {
871 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
872 log_error_on_cache_insert(&e);
873 }
874
875 let topic = switchboard::get_quotes_topic(quote.instrument_id);
878 msgbus::publish(topic, "e as &dyn Any);
879 }
880
881 fn handle_trade(&mut self, trade: TradeTick) {
882 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
883 log_error_on_cache_insert(&e);
884 }
885
886 let topic = switchboard::get_trades_topic(trade.instrument_id);
889 msgbus::publish(topic, &trade as &dyn Any);
890 }
891
892 fn handle_bar(&mut self, bar: Bar) {
893 if self.config.validate_data_sequence
895 && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
896 {
897 if bar.ts_event < last_bar.ts_event {
898 log::warn!(
899 "Bar {bar} was prior to last bar `ts_event` {}",
900 last_bar.ts_event
901 );
902 return; }
904 if bar.ts_init < last_bar.ts_init {
905 log::warn!(
906 "Bar {bar} was prior to last bar `ts_init` {}",
907 last_bar.ts_init
908 );
909 return; }
911 }
913
914 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
915 log_error_on_cache_insert(&e);
916 }
917
918 let topic = switchboard::get_bars_topic(bar.bar_type);
919 msgbus::publish(topic, &bar as &dyn Any);
920 }
921
922 fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
923 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
924 log_error_on_cache_insert(&e);
925 }
926
927 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
928 msgbus::publish(topic, &mark_price as &dyn Any);
929 }
930
931 fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
932 if let Err(e) = self
933 .cache
934 .as_ref()
935 .borrow_mut()
936 .add_index_price(index_price)
937 {
938 log_error_on_cache_insert(&e);
939 }
940
941 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
942 msgbus::publish(topic, &index_price as &dyn Any);
943 }
944
945 pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
947 if let Err(e) = self
948 .cache
949 .as_ref()
950 .borrow_mut()
951 .add_funding_rate(funding_rate)
952 {
953 log_error_on_cache_insert(&e);
954 }
955
956 let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
957 msgbus::publish(topic, &funding_rate as &dyn Any);
958 }
959
960 fn handle_instrument_close(&mut self, close: InstrumentClose) {
961 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
962 msgbus::publish(topic, &close as &dyn Any);
963 }
964
965 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
968 if cmd.instrument_id.is_synthetic() {
969 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
970 }
971
972 self.setup_order_book(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
973
974 Ok(())
975 }
976
977 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
978 if cmd.instrument_id.is_synthetic() {
979 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
980 }
981
982 self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
983
984 Ok(())
985 }
986
987 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
988 if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
989 return Ok(());
990 }
991
992 if cmd.instrument_id.is_synthetic() {
993 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
994 }
995
996 let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
998 Entry::Vacant(e) => {
999 let mut set = AHashSet::new();
1000 set.insert(cmd.instrument_id);
1001 e.insert(set);
1002 true
1003 }
1004 Entry::Occupied(mut e) => {
1005 e.get_mut().insert(cmd.instrument_id);
1006 false
1007 }
1008 };
1009
1010 if first_for_interval {
1011 let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
1013 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
1014
1015 let snap_info = BookSnapshotInfo {
1016 instrument_id: cmd.instrument_id,
1017 venue: cmd.instrument_id.venue,
1018 is_composite: cmd.instrument_id.symbol.is_composite(),
1019 root: Ustr::from(cmd.instrument_id.symbol.root()),
1020 topic,
1021 interval_ms: cmd.interval_ms,
1022 };
1023
1024 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
1026 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
1027
1028 let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
1029 self.book_snapshotters
1030 .insert(cmd.instrument_id, snapshotter.clone());
1031 let timer_name = snapshotter.timer_name;
1032
1033 let callback =
1034 TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
1035
1036 self.clock
1037 .borrow_mut()
1038 .set_timer_ns(
1039 &timer_name,
1040 interval_ns,
1041 Some(start_time_ns.into()),
1042 None,
1043 Some(callback),
1044 None,
1045 None,
1046 )
1047 .expect(FAILED);
1048 }
1049
1050 self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, true)?;
1051
1052 Ok(())
1053 }
1054
1055 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1056 match cmd.bar_type.aggregation_source() {
1057 AggregationSource::Internal => {
1058 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1059 self.start_bar_aggregator(cmd.bar_type)?;
1060 }
1061 }
1062 AggregationSource::External => {
1063 if cmd.bar_type.instrument_id().is_synthetic() {
1064 anyhow::bail!(
1065 "Cannot subscribe for externally aggregated synthetic instrument bar data"
1066 );
1067 }
1068 }
1069 }
1070
1071 Ok(())
1072 }
1073
1074 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1075 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1076 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1077 return Ok(());
1078 }
1079
1080 let topics = vec![
1081 switchboard::get_book_deltas_topic(cmd.instrument_id),
1082 switchboard::get_book_depth10_topic(cmd.instrument_id),
1083 ];
1085
1086 self.maintain_book_updater(&cmd.instrument_id, &topics);
1087 self.maintain_book_snapshotter(&cmd.instrument_id);
1088
1089 Ok(())
1090 }
1091
1092 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1093 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1094 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1095 return Ok(());
1096 }
1097
1098 let topics = vec![
1099 switchboard::get_book_deltas_topic(cmd.instrument_id),
1100 switchboard::get_book_depth10_topic(cmd.instrument_id),
1101 ];
1103
1104 self.maintain_book_updater(&cmd.instrument_id, &topics);
1105 self.maintain_book_snapshotter(&cmd.instrument_id);
1106
1107 Ok(())
1108 }
1109
1110 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1111 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1112 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1113 return Ok(());
1114 }
1115
1116 let mut to_remove = Vec::new();
1118 for (interval, set) in &mut self.book_intervals {
1119 if set.remove(&cmd.instrument_id) && set.is_empty() {
1120 to_remove.push(*interval);
1121 }
1122 }
1123
1124 for interval in to_remove {
1125 self.book_intervals.remove(&interval);
1126 }
1127
1128 let topics = vec![
1129 switchboard::get_book_deltas_topic(cmd.instrument_id),
1130 switchboard::get_book_depth10_topic(cmd.instrument_id),
1131 ];
1133
1134 self.maintain_book_updater(&cmd.instrument_id, &topics);
1135 self.maintain_book_snapshotter(&cmd.instrument_id);
1136
1137 Ok(())
1138 }
1139
1140 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1142 let bar_type = cmd.bar_type;
1144 if self.bar_aggregators.contains_key(&bar_type.standard()) {
1145 if let Err(err) = self.stop_bar_aggregator(bar_type) {
1146 log::error!("Error stopping bar aggregator for {bar_type}: {err}");
1147 }
1148 self.bar_aggregators.remove(&bar_type.standard());
1149 log::debug!("Removed bar aggregator for {bar_type}");
1150 }
1151 Ok(())
1152 }
1153
1154 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1155 if let Some(updater) = self.book_updaters.get(instrument_id) {
1156 let handler = ShareableMessageHandler(updater.clone());
1157
1158 for topic in topics {
1160 if msgbus::subscriptions_count(topic.as_str()) == 1
1161 && msgbus::is_subscribed(topic.as_str(), handler.clone())
1162 {
1163 log::debug!("Unsubscribing BookUpdater from {topic}");
1164 msgbus::unsubscribe_topic(*topic, handler.clone());
1165 }
1166 }
1167
1168 let still_subscribed = topics
1170 .iter()
1171 .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1172 if !still_subscribed {
1173 self.book_updaters.remove(instrument_id);
1174 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1175 }
1176 }
1177 }
1178
1179 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1180 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1181 let topic = switchboard::get_book_snapshots_topic(
1182 *instrument_id,
1183 snapshotter.snap_info.interval_ms,
1184 );
1185
1186 if msgbus::subscriptions_count(topic.as_str()) == 0 {
1188 let timer_name = snapshotter.timer_name;
1189 self.book_snapshotters.remove(instrument_id);
1190 let mut clock = self.clock.borrow_mut();
1191 if clock.timer_names().contains(&timer_name.as_str()) {
1192 clock.cancel_timer(&timer_name);
1193 }
1194 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1195 }
1196 }
1197 }
1198
1199 fn handle_instrument_response(&self, instrument: InstrumentAny) {
1202 let mut cache = self.cache.as_ref().borrow_mut();
1203 if let Err(e) = cache.add_instrument(instrument) {
1204 log_error_on_cache_insert(&e);
1205 }
1206 }
1207
1208 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1209 let mut cache = self.cache.as_ref().borrow_mut();
1211 for instrument in instruments {
1212 if let Err(e) = cache.add_instrument(instrument.clone()) {
1213 log_error_on_cache_insert(&e);
1214 }
1215 }
1216 }
1217
1218 fn handle_quotes(&self, quotes: &[QuoteTick]) {
1219 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1220 log_error_on_cache_insert(&e);
1221 }
1222 }
1223
1224 fn handle_trades(&self, trades: &[TradeTick]) {
1225 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1226 log_error_on_cache_insert(&e);
1227 }
1228 }
1229
1230 fn handle_bars(&self, bars: &[Bar]) {
1231 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1232 log_error_on_cache_insert(&e);
1233 }
1234 }
1235
1236 #[allow(clippy::too_many_arguments)]
1239 fn setup_order_book(
1240 &mut self,
1241 instrument_id: &InstrumentId,
1242 book_type: BookType,
1243 only_deltas: bool,
1244 managed: bool,
1245 ) -> anyhow::Result<()> {
1246 let mut cache = self.cache.borrow_mut();
1247 if managed && !cache.has_order_book(instrument_id) {
1248 let book = OrderBook::new(*instrument_id, book_type);
1249 log::debug!("Created {book}");
1250 cache.add_order_book(book)?;
1251 }
1252
1253 let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1255 self.book_updaters.insert(*instrument_id, updater.clone());
1256
1257 let handler = ShareableMessageHandler(updater);
1258
1259 let topic = switchboard::get_book_deltas_topic(*instrument_id);
1260 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1261 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1262 }
1263
1264 let topic = switchboard::get_book_depth10_topic(*instrument_id);
1265 if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1266 msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1267 }
1268
1269 Ok(())
1270 }
1271
1272 #[cfg(feature = "defi")]
1273 fn setup_pool_updater(&mut self, instrument_id: &InstrumentId) {
1274 if self.pool_updaters.contains_key(instrument_id) {
1275 return;
1276 }
1277
1278 let updater = Rc::new(PoolUpdater::new(instrument_id, self.cache.clone()));
1279 let handler = ShareableMessageHandler(updater.clone());
1280
1281 let swap_topic = switchboard::get_defi_pool_swaps_topic(*instrument_id);
1283 if !msgbus::is_subscribed(swap_topic.as_str(), handler.clone()) {
1284 msgbus::subscribe(
1285 swap_topic.into(),
1286 handler.clone(),
1287 Some(self.msgbus_priority),
1288 );
1289 }
1290
1291 let liquidity_topic = switchboard::get_defi_liquidity_topic(*instrument_id);
1292 if !msgbus::is_subscribed(liquidity_topic.as_str(), handler.clone()) {
1293 msgbus::subscribe(liquidity_topic.into(), handler, Some(self.msgbus_priority));
1294 }
1295
1296 self.pool_updaters.insert(*instrument_id, updater);
1297 log::debug!("Created PoolUpdater for instrument ID {instrument_id}");
1298 }
1299
1300 fn create_bar_aggregator(
1301 &mut self,
1302 instrument: &InstrumentAny,
1303 bar_type: BarType,
1304 ) -> Box<dyn BarAggregator> {
1305 let cache = self.cache.clone();
1306
1307 let handler = move |bar: Bar| {
1308 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1309 log_error_on_cache_insert(&e);
1310 }
1311
1312 let topic = switchboard::get_bars_topic(bar.bar_type);
1313 msgbus::publish(topic, &bar as &dyn Any);
1314 };
1315
1316 let clock = self.clock.clone();
1317 let config = self.config.clone();
1318
1319 let price_precision = instrument.price_precision();
1320 let size_precision = instrument.size_precision();
1321
1322 if bar_type.spec().is_time_aggregated() {
1323 let time_bars_origin_offset = config
1325 .time_bars_origins
1326 .get(&bar_type.spec().aggregation)
1327 .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1328
1329 Box::new(TimeBarAggregator::new(
1330 bar_type,
1331 price_precision,
1332 size_precision,
1333 clock,
1334 handler,
1335 false, config.time_bars_build_with_no_updates,
1337 config.time_bars_timestamp_on_close,
1338 config.time_bars_interval_type,
1339 time_bars_origin_offset,
1340 20, false, ))
1343 } else {
1344 match bar_type.spec().aggregation {
1345 BarAggregation::Tick => Box::new(TickBarAggregator::new(
1346 bar_type,
1347 price_precision,
1348 size_precision,
1349 handler,
1350 false,
1351 )) as Box<dyn BarAggregator>,
1352 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1353 bar_type,
1354 price_precision,
1355 size_precision,
1356 handler,
1357 false,
1358 )) as Box<dyn BarAggregator>,
1359 BarAggregation::Value => Box::new(ValueBarAggregator::new(
1360 bar_type,
1361 price_precision,
1362 size_precision,
1363 handler,
1364 false,
1365 )) as Box<dyn BarAggregator>,
1366 BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1367 bar_type,
1368 price_precision,
1369 size_precision,
1370 instrument.price_increment(),
1371 handler,
1372 false,
1373 )) as Box<dyn BarAggregator>,
1374 _ => panic!(
1375 "Cannot create aggregator: {} aggregation not currently supported",
1376 bar_type.spec().aggregation
1377 ),
1378 }
1379 }
1380 }
1381
1382 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1383 let instrument = {
1385 let cache = self.cache.borrow();
1386 cache
1387 .instrument(&bar_type.instrument_id())
1388 .ok_or_else(|| {
1389 anyhow::anyhow!(
1390 "Cannot start bar aggregation: no instrument found for {}",
1391 bar_type.instrument_id(),
1392 )
1393 })?
1394 .clone()
1395 };
1396
1397 let bar_key = bar_type.standard();
1399
1400 let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1402 rc.clone()
1403 } else {
1404 let agg = self.create_bar_aggregator(&instrument, bar_type);
1405 let rc = Rc::new(RefCell::new(agg));
1406 self.bar_aggregators.insert(bar_key, rc.clone());
1407 rc
1408 };
1409
1410 let mut handlers = Vec::new();
1412
1413 if bar_type.is_composite() {
1414 let topic = switchboard::get_bars_topic(bar_type.composite());
1415 let handler =
1416 ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1417
1418 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1419 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1420 }
1421
1422 handlers.push((topic, handler));
1423 } else if bar_type.spec().price_type == PriceType::Last {
1424 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1425 let handler =
1426 ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1427
1428 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1429 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1430 }
1431
1432 handlers.push((topic, handler));
1433 } else {
1434 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1435 let handler =
1436 ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1437
1438 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1439 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1440 }
1441
1442 handlers.push((topic, handler));
1443 }
1444
1445 self.bar_aggregator_handlers.insert(bar_key, handlers);
1446 aggregator.borrow_mut().set_is_running(true);
1447
1448 Ok(())
1449 }
1450
1451 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1452 let aggregator = self
1453 .bar_aggregators
1454 .remove(&bar_type.standard())
1455 .ok_or_else(|| {
1456 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1457 })?;
1458
1459 aggregator.borrow_mut().stop();
1460
1461 let bar_key = bar_type.standard();
1463 if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1464 for (topic, handler) in subs {
1465 if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1466 msgbus::unsubscribe_topic(topic, handler);
1467 }
1468 }
1469 }
1470
1471 Ok(())
1472 }
1473}
1474
1475#[inline(always)]
1476fn log_error_on_cache_insert<T: Display>(e: &T) {
1477 log::error!("Error on cache insert: {e}");
1478}