1pub mod book;
32pub mod config;
33mod handlers;
34
35use std::{
36 any::Any,
37 cell::{Ref, RefCell},
38 collections::hash_map::Entry,
39 fmt::Display,
40 num::NonZeroUsize,
41 rc::Rc,
42};
43
44use ahash::{AHashMap, AHashSet};
45#[cfg(feature = "defi")]
46use alloy_primitives::Address;
47use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
48use config::DataEngineConfig;
49use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
50use indexmap::IndexMap;
51#[cfg(feature = "defi")]
52use nautilus_common::messages::defi::{DefiSubscribeCommand, DefiUnsubscribeCommand};
53use nautilus_common::{
54 cache::Cache,
55 clock::Clock,
56 logging::{RECV, RES},
57 messages::data::{
58 DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
59 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
60 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
61 UnsubscribeCommand,
62 },
63 msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
64 timer::TimeEventCallback,
65};
66use nautilus_core::{
67 correctness::{
68 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
69 },
70 datetime::millis_to_nanos,
71};
72#[cfg(feature = "defi")]
73use nautilus_model::defi::Blockchain;
74#[cfg(feature = "defi")]
75use nautilus_model::defi::DefiData;
76use nautilus_model::{
77 data::{
78 Bar, BarType, Data, DataType, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick,
79 TradeTick,
80 close::InstrumentClose,
81 prices::{IndexPriceUpdate, MarkPriceUpdate},
82 },
83 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
84 identifiers::{ClientId, InstrumentId, Venue},
85 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
86 orderbook::OrderBook,
87};
88use nautilus_persistence::backend::catalog::ParquetDataCatalog;
89use ustr::Ustr;
90
91use crate::{
92 aggregation::{
93 BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
94 VolumeBarAggregator,
95 },
96 client::DataClientAdapter,
97};
98
/// Engine that owns the registered data clients and moves commands, requests,
/// responses, and inbound data between them, the cache, and the message bus.
#[derive(Debug)]
pub struct DataEngine {
    /// Shared clock; used for book snapshot timers and handed to time bar aggregators.
    clock: Rc<RefCell<dyn Clock>>,
    /// Shared cache that received instruments, quotes, trades, bars, and prices are added to.
    cache: Rc<RefCell<Cache>>,
    /// Registered (non-default) clients keyed by client ID, in insertion order.
    clients: IndexMap<ClientId, DataClientAdapter>,
    /// Fallback client returned when neither a client ID nor a routed venue resolves.
    default_client: Option<DataClientAdapter>,
    /// Client IDs taken from the config; commands naming one of these are not forwarded.
    external_clients: AHashSet<ClientId>,
    /// Registered catalogs keyed by name.
    catalogs: AHashMap<Ustr, ParquetDataCatalog>,
    /// Venue to client ID routing, populated by `register_client`.
    routing_map: IndexMap<Venue, ClientId>,
    /// Snapshot interval (milliseconds) to the instruments subscribed at that interval.
    book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
    /// Per-instrument handler subscribed to the book deltas / depth topics.
    book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
    /// Per-instrument snapshotter invoked from a clock timer.
    book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
    /// Internal bar aggregators, looked up by the standard form of the bar type.
    bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
    /// Topic/handler pairs per bar type.
    // NOTE(review): presumably the msgbus subscriptions feeding each aggregator — not
    // used in the visible part of the file; confirm against `start_bar_aggregator`.
    bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
    /// Not read or written in the visible part of the file.
    _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Not read or written in the visible part of the file.
    _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
    /// Deltas accumulated per instrument until one flagged `F_LAST` arrives
    /// (only used when `config.buffer_deltas` is set).
    buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
    /// Priority passed to `msgbus::subscribe` for book updaters; initialised to 10.
    msgbus_priority: u8,
    /// Engine configuration.
    config: DataEngineConfig,
}
120
121impl DataEngine {
122 #[must_use]
124 pub fn new(
125 clock: Rc<RefCell<dyn Clock>>,
126 cache: Rc<RefCell<Cache>>,
127 config: Option<DataEngineConfig>,
128 ) -> Self {
129 let config = config.unwrap_or_default();
130
131 let external_clients: AHashSet<ClientId> = config
132 .external_clients
133 .clone()
134 .unwrap_or_default()
135 .into_iter()
136 .collect();
137
138 Self {
139 clock,
140 cache,
141 clients: IndexMap::new(),
142 default_client: None,
143 external_clients,
144 catalogs: AHashMap::new(),
145 routing_map: IndexMap::new(),
146 book_intervals: AHashMap::new(),
147 book_updaters: AHashMap::new(),
148 book_snapshotters: AHashMap::new(),
149 bar_aggregators: AHashMap::new(),
150 bar_aggregator_handlers: AHashMap::new(),
151 _synthetic_quote_feeds: AHashMap::new(),
152 _synthetic_trade_feeds: AHashMap::new(),
153 buffered_deltas_map: AHashMap::new(),
154 msgbus_priority: 10, config,
156 }
157 }
158
159 #[must_use]
161 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
162 self.clock.borrow()
163 }
164
165 #[must_use]
167 pub fn get_cache(&self) -> Ref<'_, Cache> {
168 self.cache.borrow()
169 }
170
171 pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
177 let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
178
179 check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
180
181 self.catalogs.insert(name, catalog);
182 log::info!("Registered catalog <{name}>");
183 }
184
185 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
192 let client_id = client.client_id();
193
194 if let Some(default_client) = &self.default_client {
195 check_predicate_false(
196 default_client.client_id() == client.client_id(),
197 "client_id already registered as default client",
198 )
199 .expect(FAILED);
200 }
201
202 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
203
204 if let Some(routing) = routing {
205 self.routing_map.insert(routing, client_id);
206 log::info!("Set client {client_id} routing for {routing}");
207 }
208
209 if client.venue.is_none() && self.default_client.is_none() {
210 self.default_client = Some(client);
211 log::info!("Registered client {client_id} for default routing");
212 } else {
213 self.clients.insert(client_id, client);
214 log::info!("Registered client {client_id}");
215 }
216 }
217
218 pub fn deregister_client(&mut self, client_id: &ClientId) {
224 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
225
226 self.clients.shift_remove(client_id);
227 log::info!("Deregistered client {client_id}");
228 }
229
230 pub fn register_default_client(&mut self, client: DataClientAdapter) {
242 check_predicate_true(
243 self.default_client.is_none(),
244 "default client already registered",
245 )
246 .expect(FAILED);
247
248 let client_id = client.client_id();
249
250 self.default_client = Some(client);
251 log::info!("Registered default client {client_id}");
252 }
253
254 pub fn start(&mut self) {
256 for client in self.get_clients_mut() {
257 if let Err(e) = client.start() {
258 log::error!("{e}");
259 }
260 }
261 }
262
263 pub fn stop(&mut self) {
265 for client in self.get_clients_mut() {
266 if let Err(e) = client.stop() {
267 log::error!("{e}");
268 }
269 }
270 }
271
272 pub fn reset(&mut self) {
274 for client in self.get_clients_mut() {
275 if let Err(e) = client.reset() {
276 log::error!("{e}");
277 }
278 }
279 }
280
281 pub fn dispose(&mut self) {
283 for client in self.get_clients_mut() {
284 if let Err(e) = client.dispose() {
285 log::error!("{e}");
286 }
287 }
288
289 self.clock.borrow_mut().cancel_timers();
290 }
291
292 #[must_use]
294 pub fn check_connected(&self) -> bool {
295 self.get_clients()
296 .iter()
297 .all(|client| client.is_connected())
298 }
299
300 #[must_use]
302 pub fn check_disconnected(&self) -> bool {
303 self.get_clients()
304 .iter()
305 .all(|client| !client.is_connected())
306 }
307
308 #[must_use]
310 pub fn registered_clients(&self) -> Vec<ClientId> {
311 self.get_clients()
312 .into_iter()
313 .map(|client| client.client_id())
314 .collect()
315 }
316
317 fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
320 where
321 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
322 T: Clone,
323 {
324 self.get_clients()
325 .into_iter()
326 .flat_map(get_subs)
327 .cloned()
328 .collect()
329 }
330
331 #[must_use]
332 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
333 let (default_opt, clients_map) = (&self.default_client, &self.clients);
334 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
335
336 if let Some(default) = default_opt {
337 clients.push(default);
338 }
339
340 clients
341 }
342
343 #[must_use]
344 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
345 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
346 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
347
348 if let Some(default) = default_opt {
349 clients.push(default);
350 }
351
352 clients
353 }
354
355 pub fn get_client(
356 &mut self,
357 client_id: Option<&ClientId>,
358 venue: Option<&Venue>,
359 ) -> Option<&mut DataClientAdapter> {
360 if let Some(client_id) = client_id {
361 if let Some(client) = self.clients.get_mut(client_id) {
363 return Some(client);
364 }
365
366 if let Some(default) = self.default_client.as_mut()
368 && default.client_id() == *client_id
369 {
370 return Some(default);
371 }
372
373 return None;
375 }
376
377 if let Some(v) = venue {
378 if let Some(client_id) = self.routing_map.get(v) {
380 return self.clients.get_mut(client_id);
381 }
382 }
383
384 self.get_default_client()
386 }
387
388 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
389 self.default_client.as_mut()
390 }
391
392 #[must_use]
394 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
395 self.collect_subscriptions(|client| &client.subscriptions_custom)
396 }
397
398 #[must_use]
400 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
401 self.collect_subscriptions(|client| &client.subscriptions_instrument)
402 }
403
404 #[must_use]
406 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
407 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
408 }
409
410 #[must_use]
412 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
413 self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
414 }
415
416 #[must_use]
418 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
419 self.collect_subscriptions(|client| &client.subscriptions_quotes)
420 }
421
422 #[must_use]
424 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
425 self.collect_subscriptions(|client| &client.subscriptions_trades)
426 }
427
428 #[must_use]
430 pub fn subscribed_bars(&self) -> Vec<BarType> {
431 self.collect_subscriptions(|client| &client.subscriptions_bars)
432 }
433
434 #[must_use]
436 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
437 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
438 }
439
440 #[must_use]
442 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
443 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
444 }
445
446 #[must_use]
448 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
449 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
450 }
451
452 #[must_use]
454 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
455 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
456 }
457
458 #[cfg(feature = "defi")]
459 #[must_use]
461 pub fn subscribed_blocks(&self) -> Vec<Blockchain> {
462 self.collect_subscriptions(|client| &client.subscriptions_blocks)
463 }
464
465 #[cfg(feature = "defi")]
466 #[must_use]
468 pub fn subscribed_pools(&self) -> Vec<Address> {
469 self.collect_subscriptions(|client| &client.subscriptions_pools)
470 }
471
472 #[cfg(feature = "defi")]
473 #[must_use]
475 pub fn subscribed_pool_swaps(&self) -> Vec<Address> {
476 self.collect_subscriptions(|client| &client.subscriptions_pool_swaps)
477 }
478
479 #[cfg(feature = "defi")]
480 #[must_use]
482 pub fn subscribed_pool_liquidity_updates(&self) -> Vec<Address> {
483 self.collect_subscriptions(|client| &client.subscriptions_pool_liquidity_updates)
484 }
485
486 pub fn execute(&mut self, cmd: &DataCommand) {
492 if let Err(e) = match cmd {
493 DataCommand::Subscribe(c) => self.execute_subscribe(c),
494 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
495 DataCommand::Request(c) => self.execute_request(c),
496 #[cfg(feature = "defi")]
497 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
498 #[cfg(feature = "defi")]
499 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
500 _ => {
501 log::warn!("Unhandled DataCommand variant: {cmd:?}");
502 Ok(())
503 }
504 } {
505 log::error!("{e}");
506 }
507 }
508
509 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
516 match &cmd {
518 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
519 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
520 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
521 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
522 _ => {} }
524
525 if let Some(client_id) = cmd.client_id()
527 && self.external_clients.contains(client_id)
528 {
529 return Ok(());
530 }
531
532 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
534 client.execute_subscribe(cmd);
535 } else {
536 log::error!(
537 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
538 cmd.client_id(),
539 cmd.venue(),
540 );
541 }
542
543 Ok(())
544 }
545
546 #[cfg(feature = "defi")]
547 pub fn execute_defi_subscribe(&mut self, cmd: &DefiSubscribeCommand) -> anyhow::Result<()> {
554 if let Some(client_id) = cmd.client_id()
556 && self.external_clients.contains(client_id)
557 {
558 return Ok(());
559 }
560
561 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
563 client.execute_defi_subscribe(cmd);
564 } else {
565 log::error!(
566 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
567 cmd.client_id(),
568 cmd.venue(),
569 );
570 }
571
572 Ok(())
573 }
574
575 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
581 match &cmd {
582 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
583 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
584 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
585 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
586 _ => {} }
588
589 if let Some(client_id) = cmd.client_id()
591 && self.external_clients.contains(client_id)
592 {
593 return Ok(());
594 }
595
596 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
598 client.execute_unsubscribe(cmd);
599 } else {
600 log::error!(
601 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
602 cmd.client_id(),
603 cmd.venue(),
604 );
605 }
606
607 Ok(())
608 }
609
610 #[cfg(feature = "defi")]
611 pub fn execute_defi_unsubscribe(&mut self, cmd: &DefiUnsubscribeCommand) -> anyhow::Result<()> {
617 if let Some(client_id) = cmd.client_id()
619 && self.external_clients.contains(client_id)
620 {
621 return Ok(());
622 }
623
624 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
626 client.execute_defi_unsubscribe(cmd);
627 } else {
628 log::error!(
629 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
630 cmd.client_id(),
631 cmd.venue(),
632 );
633 }
634
635 Ok(())
636 }
637
638 pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
645 if let Some(cid) = req.client_id()
647 && self.external_clients.contains(cid)
648 {
649 return Ok(());
650 }
651 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
652 match req {
653 RequestCommand::Data(req) => client.request_data(req),
654 RequestCommand::Instrument(req) => client.request_instrument(req),
655 RequestCommand::Instruments(req) => client.request_instruments(req),
656 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
657 RequestCommand::Quotes(req) => client.request_quotes(req),
658 RequestCommand::Trades(req) => client.request_trades(req),
659 RequestCommand::Bars(req) => client.request_bars(req),
660 }
661 } else {
662 anyhow::bail!(
663 "Cannot handle request: no client found for {:?} {:?}",
664 req.client_id(),
665 req.venue()
666 );
667 }
668 }
669
670 pub fn process(&mut self, data: &dyn Any) {
674 if let Some(data) = data.downcast_ref::<Data>() {
676 self.process_data(data.clone()); return;
678 }
679
680 #[cfg(feature = "defi")]
681 if let Some(data) = data.downcast_ref::<DefiData>() {
682 self.process_defi_data(data.clone()); return;
684 }
685
686 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
687 self.handle_instrument(instrument.clone());
688 } else {
689 log::error!("Cannot process data {data:?}, type is unrecognized");
690 }
691 }
692
693 pub fn process_data(&mut self, data: Data) {
695 match data {
696 Data::Delta(delta) => self.handle_delta(delta),
697 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
698 Data::Depth10(depth) => self.handle_depth10(*depth),
699 Data::Quote(quote) => self.handle_quote(quote),
700 Data::Trade(trade) => self.handle_trade(trade),
701 Data::Bar(bar) => self.handle_bar(bar),
702 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
703 Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
704 Data::InstrumentClose(close) => self.handle_instrument_close(close),
705 }
706 }
707
708 #[cfg(feature = "defi")]
710 pub fn process_defi_data(&mut self, data: DefiData) {
711 match data {
712 DefiData::Block(block) => {
713 let topic = switchboard::get_defi_blocks_topic(block.chain());
714 msgbus::publish(topic, &block as &dyn Any);
715 }
716 DefiData::Pool(pool) => {
717 let topic = switchboard::get_defi_pool_topic(pool.address);
718 msgbus::publish(topic, &pool as &dyn Any);
719 }
720 DefiData::PoolSwap(swap) => {
721 let topic = switchboard::get_defi_pool_swaps_topic(swap.pool.address);
722 msgbus::publish(topic, &swap as &dyn Any);
723 }
724 DefiData::PoolLiquidityUpdate(update) => {
725 let topic = switchboard::get_defi_liquidity_topic(update.pool.address);
726 msgbus::publish(topic, &update as &dyn Any);
727 }
728 }
729 }
730
731 pub fn response(&self, resp: DataResponse) {
733 log::debug!("{RECV}{RES} {resp:?}");
734
735 match &resp {
736 DataResponse::Instrument(resp) => {
737 self.handle_instrument_response(resp.data.clone());
738 }
739 DataResponse::Instruments(resp) => {
740 self.handle_instruments(&resp.data);
741 }
742 DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
743 DataResponse::Trades(resp) => self.handle_trades(&resp.data),
744 DataResponse::Bars(resp) => self.handle_bars(&resp.data),
745 _ => todo!(),
746 }
747
748 msgbus::send_response(resp.correlation_id(), &resp);
749 }
750
751 fn handle_instrument(&mut self, instrument: InstrumentAny) {
754 if let Err(e) = self
755 .cache
756 .as_ref()
757 .borrow_mut()
758 .add_instrument(instrument.clone())
759 {
760 log_error_on_cache_insert(&e);
761 }
762
763 let topic = switchboard::get_instrument_topic(instrument.id());
764 msgbus::publish(topic, &instrument as &dyn Any);
765 }
766
767 fn handle_delta(&mut self, delta: OrderBookDelta) {
768 let deltas = if self.config.buffer_deltas {
769 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
770 buffered_deltas.deltas.push(delta);
771 } else {
772 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
773 self.buffered_deltas_map
774 .insert(delta.instrument_id, buffered_deltas);
775 }
776
777 if !RecordFlag::F_LAST.matches(delta.flags) {
778 return; }
780
781 self.buffered_deltas_map
783 .remove(&delta.instrument_id)
784 .unwrap()
785 } else {
786 OrderBookDeltas::new(delta.instrument_id, vec![delta])
787 };
788
789 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
790 msgbus::publish(topic, &deltas as &dyn Any);
791 }
792
793 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
794 let deltas = if self.config.buffer_deltas {
795 let mut is_last_delta = false;
796 for delta in &deltas.deltas {
797 if RecordFlag::F_LAST.matches(delta.flags) {
798 is_last_delta = true;
799 break;
800 }
801 }
802
803 let instrument_id = deltas.instrument_id;
804
805 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
806 buffered_deltas.deltas.extend(deltas.deltas);
807 } else {
808 self.buffered_deltas_map.insert(instrument_id, deltas);
809 }
810
811 if !is_last_delta {
812 return;
813 }
814
815 self.buffered_deltas_map.remove(&instrument_id).unwrap()
817 } else {
818 deltas
819 };
820
821 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
822 msgbus::publish(topic, &deltas as &dyn Any);
823 }
824
825 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
826 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
827 msgbus::publish(topic, &depth as &dyn Any);
828 }
829
830 fn handle_quote(&mut self, quote: QuoteTick) {
831 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
832 log_error_on_cache_insert(&e);
833 }
834
835 let topic = switchboard::get_quotes_topic(quote.instrument_id);
838 msgbus::publish(topic, "e as &dyn Any);
839 }
840
841 fn handle_trade(&mut self, trade: TradeTick) {
842 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
843 log_error_on_cache_insert(&e);
844 }
845
846 let topic = switchboard::get_trades_topic(trade.instrument_id);
849 msgbus::publish(topic, &trade as &dyn Any);
850 }
851
852 fn handle_bar(&mut self, bar: Bar) {
853 if self.config.validate_data_sequence
855 && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
856 {
857 if bar.ts_event < last_bar.ts_event {
858 log::warn!(
859 "Bar {bar} was prior to last bar `ts_event` {}",
860 last_bar.ts_event
861 );
862 return; }
864 if bar.ts_init < last_bar.ts_init {
865 log::warn!(
866 "Bar {bar} was prior to last bar `ts_init` {}",
867 last_bar.ts_init
868 );
869 return; }
871 }
873
874 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
875 log_error_on_cache_insert(&e);
876 }
877
878 let topic = switchboard::get_bars_topic(bar.bar_type);
879 msgbus::publish(topic, &bar as &dyn Any);
880 }
881
882 fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
883 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
884 log_error_on_cache_insert(&e);
885 }
886
887 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
888 msgbus::publish(topic, &mark_price as &dyn Any);
889 }
890
891 fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
892 if let Err(e) = self
893 .cache
894 .as_ref()
895 .borrow_mut()
896 .add_index_price(index_price)
897 {
898 log_error_on_cache_insert(&e);
899 }
900
901 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
902 msgbus::publish(topic, &index_price as &dyn Any);
903 }
904
905 fn handle_instrument_close(&mut self, close: InstrumentClose) {
906 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
907 msgbus::publish(topic, &close as &dyn Any);
908 }
909
910 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
913 if cmd.instrument_id.is_synthetic() {
914 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
915 }
916
917 self.setup_order_book(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
918
919 Ok(())
920 }
921
922 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
923 if cmd.instrument_id.is_synthetic() {
924 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
925 }
926
927 self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
928
929 Ok(())
930 }
931
932 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
933 if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
934 return Ok(());
935 }
936
937 if cmd.instrument_id.is_synthetic() {
938 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
939 }
940
941 let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
943 Entry::Vacant(e) => {
944 let mut set = AHashSet::new();
945 set.insert(cmd.instrument_id);
946 e.insert(set);
947 true
948 }
949 Entry::Occupied(mut e) => {
950 e.get_mut().insert(cmd.instrument_id);
951 false
952 }
953 };
954
955 if first_for_interval {
956 let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
958 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id);
959
960 let snap_info = BookSnapshotInfo {
961 instrument_id: cmd.instrument_id,
962 venue: cmd.instrument_id.venue,
963 is_composite: cmd.instrument_id.symbol.is_composite(),
964 root: Ustr::from(cmd.instrument_id.symbol.root()),
965 topic,
966 interval_ms: cmd.interval_ms,
967 };
968
969 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
971 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
972
973 let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
974 self.book_snapshotters
975 .insert(cmd.instrument_id, snapshotter.clone());
976 let timer_name = snapshotter.timer_name;
977
978 let callback =
979 TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
980
981 self.clock
982 .borrow_mut()
983 .set_timer_ns(
984 &timer_name,
985 interval_ns,
986 Some(start_time_ns.into()),
987 None,
988 Some(callback),
989 None,
990 None,
991 )
992 .expect(FAILED);
993 }
994
995 self.setup_order_book(&cmd.instrument_id, cmd.book_type, false, true)?;
996
997 Ok(())
998 }
999
1000 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1001 match cmd.bar_type.aggregation_source() {
1002 AggregationSource::Internal => {
1003 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
1004 self.start_bar_aggregator(cmd.bar_type)?;
1005 }
1006 }
1007 AggregationSource::External => {
1008 if cmd.bar_type.instrument_id().is_synthetic() {
1009 anyhow::bail!(
1010 "Cannot subscribe for externally aggregated synthetic instrument bar data"
1011 );
1012 }
1013 }
1014 }
1015
1016 Ok(())
1017 }
1018
1019 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1020 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1021 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1022 return Ok(());
1023 }
1024
1025 let topics = vec![
1026 switchboard::get_book_deltas_topic(cmd.instrument_id),
1027 switchboard::get_book_depth10_topic(cmd.instrument_id),
1028 switchboard::get_book_snapshots_topic(cmd.instrument_id),
1029 ];
1030
1031 self.maintain_book_updater(&cmd.instrument_id, &topics);
1032 self.maintain_book_snapshotter(&cmd.instrument_id);
1033
1034 Ok(())
1035 }
1036
1037 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
1038 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1039 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1040 return Ok(());
1041 }
1042
1043 let topics = vec![
1044 switchboard::get_book_deltas_topic(cmd.instrument_id),
1045 switchboard::get_book_depth10_topic(cmd.instrument_id),
1046 switchboard::get_book_snapshots_topic(cmd.instrument_id),
1047 ];
1048
1049 self.maintain_book_updater(&cmd.instrument_id, &topics);
1050 self.maintain_book_snapshotter(&cmd.instrument_id);
1051
1052 Ok(())
1053 }
1054
1055 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1056 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1057 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1058 return Ok(());
1059 }
1060
1061 let mut to_remove = Vec::new();
1063 for (interval, set) in &mut self.book_intervals {
1064 if set.remove(&cmd.instrument_id) && set.is_empty() {
1065 to_remove.push(*interval);
1066 }
1067 }
1068
1069 for interval in to_remove {
1070 self.book_intervals.remove(&interval);
1071 }
1072
1073 let topics = vec![
1074 switchboard::get_book_deltas_topic(cmd.instrument_id),
1075 switchboard::get_book_depth10_topic(cmd.instrument_id),
1076 switchboard::get_book_snapshots_topic(cmd.instrument_id),
1077 ];
1078
1079 self.maintain_book_updater(&cmd.instrument_id, &topics);
1080 self.maintain_book_snapshotter(&cmd.instrument_id);
1081
1082 Ok(())
1083 }
1084
1085 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1087 let bar_type = cmd.bar_type;
1089 if self.bar_aggregators.contains_key(&bar_type.standard()) {
1090 if let Err(err) = self.stop_bar_aggregator(bar_type) {
1091 log::error!("Error stopping bar aggregator for {bar_type}: {err}");
1092 }
1093 self.bar_aggregators.remove(&bar_type.standard());
1094 log::debug!("Removed bar aggregator for {bar_type}");
1095 }
1096 Ok(())
1097 }
1098
1099 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1100 if let Some(updater) = self.book_updaters.get(instrument_id) {
1101 let handler = ShareableMessageHandler(updater.clone());
1102
1103 for topic in topics {
1105 if msgbus::subscriptions_count(topic.as_str()) == 1
1106 && msgbus::is_subscribed(topic.as_str(), handler.clone())
1107 {
1108 log::debug!("Unsubscribing BookUpdater from {topic}");
1109 msgbus::unsubscribe_topic(*topic, handler.clone());
1110 }
1111 }
1112
1113 let still_subscribed = topics
1115 .iter()
1116 .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1117 if !still_subscribed {
1118 self.book_updaters.remove(instrument_id);
1119 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1120 }
1121 }
1122 }
1123
1124 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1125 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1126 let topic = switchboard::get_book_snapshots_topic(*instrument_id);
1127
1128 if msgbus::subscriptions_count(topic.as_str()) == 0 {
1130 let timer_name = snapshotter.timer_name;
1131 self.book_snapshotters.remove(instrument_id);
1132 let mut clock = self.clock.borrow_mut();
1133 if clock.timer_names().contains(&timer_name.as_str()) {
1134 clock.cancel_timer(&timer_name);
1135 }
1136 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1137 }
1138 }
1139 }
1140
1141 fn handle_instrument_response(&self, instrument: InstrumentAny) {
1144 let mut cache = self.cache.as_ref().borrow_mut();
1145 if let Err(e) = cache.add_instrument(instrument) {
1146 log_error_on_cache_insert(&e);
1147 }
1148 }
1149
1150 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1151 let mut cache = self.cache.as_ref().borrow_mut();
1153 for instrument in instruments {
1154 if let Err(e) = cache.add_instrument(instrument.clone()) {
1155 log_error_on_cache_insert(&e);
1156 }
1157 }
1158 }
1159
1160 fn handle_quotes(&self, quotes: &[QuoteTick]) {
1161 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1162 log_error_on_cache_insert(&e);
1163 }
1164 }
1165
1166 fn handle_trades(&self, trades: &[TradeTick]) {
1167 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1168 log_error_on_cache_insert(&e);
1169 }
1170 }
1171
1172 fn handle_bars(&self, bars: &[Bar]) {
1173 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1174 log_error_on_cache_insert(&e);
1175 }
1176 }
1177
1178 #[allow(clippy::too_many_arguments)]
1181 fn setup_order_book(
1182 &mut self,
1183 instrument_id: &InstrumentId,
1184 book_type: BookType,
1185 only_deltas: bool,
1186 managed: bool,
1187 ) -> anyhow::Result<()> {
1188 let mut cache = self.cache.borrow_mut();
1189 if managed && !cache.has_order_book(instrument_id) {
1190 let book = OrderBook::new(*instrument_id, book_type);
1191 log::debug!("Created {book}");
1192 cache.add_order_book(book)?;
1193 }
1194
1195 let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1197 self.book_updaters.insert(*instrument_id, updater.clone());
1198
1199 let handler = ShareableMessageHandler(updater);
1200
1201 let topic = switchboard::get_book_deltas_topic(*instrument_id);
1202 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1203 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1204 }
1205
1206 let topic = switchboard::get_book_depth10_topic(*instrument_id);
1207 if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1208 msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1209 }
1210
1211 Ok(())
1212 }
1213
1214 fn create_bar_aggregator(
1215 &mut self,
1216 instrument: &InstrumentAny,
1217 bar_type: BarType,
1218 ) -> Box<dyn BarAggregator> {
1219 let cache = self.cache.clone();
1220
1221 let handler = move |bar: Bar| {
1222 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1223 log_error_on_cache_insert(&e);
1224 }
1225
1226 let topic = switchboard::get_bars_topic(bar.bar_type);
1227 msgbus::publish(topic, &bar as &dyn Any);
1228 };
1229
1230 let clock = self.clock.clone();
1231 let config = self.config.clone();
1232
1233 let price_precision = instrument.price_precision();
1234 let size_precision = instrument.size_precision();
1235
1236 if bar_type.spec().is_time_aggregated() {
1237 Box::new(TimeBarAggregator::new(
1238 bar_type,
1239 price_precision,
1240 size_precision,
1241 clock,
1242 handler,
1243 false, config.time_bars_build_with_no_updates,
1245 config.time_bars_timestamp_on_close,
1246 config.time_bars_interval_type,
1247 None, 20, false, ))
1251 } else {
1252 match bar_type.spec().aggregation {
1253 BarAggregation::Tick => Box::new(TickBarAggregator::new(
1254 bar_type,
1255 price_precision,
1256 size_precision,
1257 handler,
1258 false,
1259 )) as Box<dyn BarAggregator>,
1260 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1261 bar_type,
1262 price_precision,
1263 size_precision,
1264 handler,
1265 false,
1266 )) as Box<dyn BarAggregator>,
1267 BarAggregation::Value => Box::new(ValueBarAggregator::new(
1268 bar_type,
1269 price_precision,
1270 size_precision,
1271 handler,
1272 false,
1273 )) as Box<dyn BarAggregator>,
1274 _ => panic!(
1275 "Cannot create aggregator: {} aggregation not currently supported",
1276 bar_type.spec().aggregation
1277 ),
1278 }
1279 }
1280 }
1281
1282 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1283 let instrument = {
1285 let cache = self.cache.borrow();
1286 cache
1287 .instrument(&bar_type.instrument_id())
1288 .ok_or_else(|| {
1289 anyhow::anyhow!(
1290 "Cannot start bar aggregation: no instrument found for {}",
1291 bar_type.instrument_id(),
1292 )
1293 })?
1294 .clone()
1295 };
1296
1297 let bar_key = bar_type.standard();
1299
1300 let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1302 rc.clone()
1303 } else {
1304 let agg = self.create_bar_aggregator(&instrument, bar_type);
1305 let rc = Rc::new(RefCell::new(agg));
1306 self.bar_aggregators.insert(bar_key, rc.clone());
1307 rc
1308 };
1309
1310 let mut handlers = Vec::new();
1312
1313 if bar_type.is_composite() {
1314 let topic = switchboard::get_bars_topic(bar_type.composite());
1315 let handler =
1316 ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1317
1318 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1319 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1320 }
1321
1322 handlers.push((topic, handler));
1323 } else if bar_type.spec().price_type == PriceType::Last {
1324 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1325 let handler =
1326 ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1327
1328 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1329 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1330 }
1331
1332 handlers.push((topic, handler));
1333 } else {
1334 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1335 let handler =
1336 ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1337
1338 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1339 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1340 }
1341
1342 handlers.push((topic, handler));
1343 }
1344
1345 self.bar_aggregator_handlers.insert(bar_key, handlers);
1346 aggregator.borrow_mut().set_is_running(true);
1347
1348 Ok(())
1349 }
1350
1351 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1352 let aggregator = self
1353 .bar_aggregators
1354 .remove(&bar_type.standard())
1355 .ok_or_else(|| {
1356 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1357 })?;
1358
1359 aggregator.borrow_mut().stop();
1360
1361 let bar_key = bar_type.standard();
1363 if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1364 for (topic, handler) in subs {
1365 if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1366 msgbus::unsubscribe_topic(topic, handler);
1367 }
1368 }
1369 }
1370
1371 Ok(())
1372 }
1373}
1374
1375#[inline(always)]
1376fn log_error_on_cache_insert<T: Display>(e: &T) {
1377 log::error!("Error on cache insert: {e}");
1378}