1pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39 any::Any,
40 cell::{Ref, RefCell},
41 collections::hash_map::Entry,
42 fmt::Display,
43 num::NonZeroUsize,
44 rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
51use indexmap::IndexMap;
52use nautilus_common::{
53 cache::Cache,
54 clock::Clock,
55 logging::{RECV, RES},
56 messages::data::{
57 DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
58 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
59 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
60 UnsubscribeCommand,
61 },
62 msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
63 timer::{TimeEvent, TimeEventCallback},
64};
65use nautilus_core::{
66 correctness::{
67 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
68 },
69 datetime::millis_to_nanos,
70};
71#[cfg(feature = "defi")]
72use nautilus_model::defi::DefiData;
73use nautilus_model::{
74 data::{
75 Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
76 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
77 },
78 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
79 identifiers::{ClientId, InstrumentId, Venue},
80 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
81 orderbook::OrderBook,
82};
83use nautilus_persistence::backend::catalog::ParquetDataCatalog;
84use ustr::Ustr;
85
86#[cfg(feature = "defi")]
87#[allow(unused_imports)] use crate::defi::engine as _;
89#[cfg(feature = "defi")]
90use crate::engine::pool::PoolUpdater;
91use crate::{
92 aggregation::{
93 BarAggregator, RenkoBarAggregator, TickBarAggregator, TimeBarAggregator,
94 ValueBarAggregator, VolumeBarAggregator,
95 },
96 client::DataClientAdapter,
97};
98
99#[derive(Debug)]
101pub struct DataEngine {
102 pub(crate) clock: Rc<RefCell<dyn Clock>>,
103 pub(crate) cache: Rc<RefCell<Cache>>,
104 pub(crate) external_clients: AHashSet<ClientId>,
105 clients: IndexMap<ClientId, DataClientAdapter>,
106 default_client: Option<DataClientAdapter>,
107 catalogs: AHashMap<Ustr, ParquetDataCatalog>,
108 routing_map: IndexMap<Venue, ClientId>,
109 book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
110 book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
111 book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
112 bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
113 bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
114 _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
115 _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
116 buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
117 pub(crate) msgbus_priority: u8,
118 pub(crate) config: DataEngineConfig,
119 #[cfg(feature = "defi")]
120 pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
121 #[cfg(feature = "defi")]
122 pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
123 #[cfg(feature = "defi")]
124 pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
125 #[cfg(feature = "defi")]
126 pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
127}
128
129impl DataEngine {
130 #[must_use]
132 pub fn new(
133 clock: Rc<RefCell<dyn Clock>>,
134 cache: Rc<RefCell<Cache>>,
135 config: Option<DataEngineConfig>,
136 ) -> Self {
137 let config = config.unwrap_or_default();
138
139 let external_clients: AHashSet<ClientId> = config
140 .external_clients
141 .clone()
142 .unwrap_or_default()
143 .into_iter()
144 .collect();
145
146 Self {
147 clock,
148 cache,
149 external_clients,
150 clients: IndexMap::new(),
151 default_client: None,
152 catalogs: AHashMap::new(),
153 routing_map: IndexMap::new(),
154 book_intervals: AHashMap::new(),
155 book_updaters: AHashMap::new(),
156 book_snapshotters: AHashMap::new(),
157 bar_aggregators: AHashMap::new(),
158 bar_aggregator_handlers: AHashMap::new(),
159 _synthetic_quote_feeds: AHashMap::new(),
160 _synthetic_trade_feeds: AHashMap::new(),
161 buffered_deltas_map: AHashMap::new(),
162 msgbus_priority: 10, config,
164 #[cfg(feature = "defi")]
165 pool_updaters: AHashMap::new(),
166 #[cfg(feature = "defi")]
167 pool_updaters_pending: AHashSet::new(),
168 #[cfg(feature = "defi")]
169 pool_snapshot_pending: AHashSet::new(),
170 #[cfg(feature = "defi")]
171 pool_event_buffers: AHashMap::new(),
172 }
173 }
174
175 #[must_use]
177 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
178 self.clock.borrow()
179 }
180
181 #[must_use]
183 pub fn get_cache(&self) -> Ref<'_, Cache> {
184 self.cache.borrow()
185 }
186
187 #[must_use]
189 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
190 Rc::clone(&self.cache)
191 }
192
193 pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
199 let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
200
201 check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
202
203 self.catalogs.insert(name, catalog);
204 log::info!("Registered catalog <{name}>");
205 }
206
207 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
214 let client_id = client.client_id();
215
216 if let Some(default_client) = &self.default_client {
217 check_predicate_false(
218 default_client.client_id() == client.client_id(),
219 "client_id already registered as default client",
220 )
221 .expect(FAILED);
222 }
223
224 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
225
226 if let Some(routing) = routing {
227 self.routing_map.insert(routing, client_id);
228 log::info!("Set client {client_id} routing for {routing}");
229 }
230
231 if client.venue.is_none() && self.default_client.is_none() {
232 self.default_client = Some(client);
233 log::info!("Registered client {client_id} for default routing");
234 } else {
235 self.clients.insert(client_id, client);
236 log::info!("Registered client {client_id}");
237 }
238 }
239
240 pub fn deregister_client(&mut self, client_id: &ClientId) {
246 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
247
248 self.clients.shift_remove(client_id);
249 log::info!("Deregistered client {client_id}");
250 }
251
252 pub fn register_default_client(&mut self, client: DataClientAdapter) {
264 check_predicate_true(
265 self.default_client.is_none(),
266 "default client already registered",
267 )
268 .expect(FAILED);
269
270 let client_id = client.client_id();
271
272 self.default_client = Some(client);
273 log::info!("Registered default client {client_id}");
274 }
275
276 pub fn start(&mut self) {
278 for client in self.get_clients_mut() {
279 if let Err(e) = client.start() {
280 log::error!("{e}");
281 }
282 }
283 }
284
285 pub fn stop(&mut self) {
287 for client in self.get_clients_mut() {
288 if let Err(e) = client.stop() {
289 log::error!("{e}");
290 }
291 }
292 }
293
294 pub fn reset(&mut self) {
296 for client in self.get_clients_mut() {
297 if let Err(e) = client.reset() {
298 log::error!("{e}");
299 }
300 }
301 }
302
303 pub fn dispose(&mut self) {
305 for client in self.get_clients_mut() {
306 if let Err(e) = client.dispose() {
307 log::error!("{e}");
308 }
309 }
310
311 self.clock.borrow_mut().cancel_timers();
312 }
313
314 #[must_use]
316 pub fn check_connected(&self) -> bool {
317 self.get_clients()
318 .iter()
319 .all(|client| client.is_connected())
320 }
321
322 #[must_use]
324 pub fn check_disconnected(&self) -> bool {
325 self.get_clients()
326 .iter()
327 .all(|client| !client.is_connected())
328 }
329
330 #[must_use]
332 pub fn registered_clients(&self) -> Vec<ClientId> {
333 self.get_clients()
334 .into_iter()
335 .map(|client| client.client_id())
336 .collect()
337 }
338
339 pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
342 where
343 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
344 T: Clone,
345 {
346 self.get_clients()
347 .into_iter()
348 .flat_map(get_subs)
349 .cloned()
350 .collect()
351 }
352
353 #[must_use]
354 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
355 let (default_opt, clients_map) = (&self.default_client, &self.clients);
356 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
357
358 if let Some(default) = default_opt {
359 clients.push(default);
360 }
361
362 clients
363 }
364
365 #[must_use]
366 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
367 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
368 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
369
370 if let Some(default) = default_opt {
371 clients.push(default);
372 }
373
374 clients
375 }
376
377 pub fn get_client(
378 &mut self,
379 client_id: Option<&ClientId>,
380 venue: Option<&Venue>,
381 ) -> Option<&mut DataClientAdapter> {
382 if let Some(client_id) = client_id {
383 if let Some(client) = self.clients.get_mut(client_id) {
385 return Some(client);
386 }
387
388 if let Some(default) = self.default_client.as_mut()
390 && default.client_id() == *client_id
391 {
392 return Some(default);
393 }
394
395 return None;
397 }
398
399 if let Some(v) = venue {
400 if let Some(client_id) = self.routing_map.get(v) {
402 return self.clients.get_mut(client_id);
403 }
404 }
405
406 self.get_default_client()
408 }
409
410 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
411 self.default_client.as_mut()
412 }
413
414 #[must_use]
416 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
417 self.collect_subscriptions(|client| &client.subscriptions_custom)
418 }
419
420 #[must_use]
422 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
423 self.collect_subscriptions(|client| &client.subscriptions_instrument)
424 }
425
426 #[must_use]
428 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
429 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
430 }
431
432 #[must_use]
434 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
435 self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
436 }
437
438 #[must_use]
440 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
441 self.collect_subscriptions(|client| &client.subscriptions_quotes)
442 }
443
444 #[must_use]
446 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
447 self.collect_subscriptions(|client| &client.subscriptions_trades)
448 }
449
450 #[must_use]
452 pub fn subscribed_bars(&self) -> Vec<BarType> {
453 self.collect_subscriptions(|client| &client.subscriptions_bars)
454 }
455
456 #[must_use]
458 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
459 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
460 }
461
462 #[must_use]
464 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
465 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
466 }
467
468 #[must_use]
470 pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
471 self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
472 }
473
474 #[must_use]
476 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
477 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
478 }
479
480 #[must_use]
482 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
483 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
484 }
485
486 pub fn execute(&mut self, cmd: &DataCommand) {
492 if let Err(e) = match cmd {
493 DataCommand::Subscribe(c) => self.execute_subscribe(c),
494 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
495 DataCommand::Request(c) => self.execute_request(c),
496 #[cfg(feature = "defi")]
497 DataCommand::DefiRequest(c) => self.execute_defi_request(c),
498 #[cfg(feature = "defi")]
499 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
500 #[cfg(feature = "defi")]
501 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
502 _ => {
503 log::warn!("Unhandled DataCommand variant: {cmd:?}");
504 Ok(())
505 }
506 } {
507 log::error!("{e}");
508 }
509 }
510
511 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
518 match &cmd {
520 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
521 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
522 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
523 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
524 _ => {} }
526
527 if let Some(client_id) = cmd.client_id()
528 && self.external_clients.contains(client_id)
529 {
530 if self.config.debug {
531 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
532 }
533 return Ok(());
534 }
535
536 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
537 client.execute_subscribe(cmd);
538 } else {
539 log::error!(
540 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
541 cmd.client_id(),
542 cmd.venue(),
543 );
544 }
545
546 Ok(())
547 }
548
549 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
555 match &cmd {
556 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
557 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
558 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
559 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
560 _ => {} }
562
563 if let Some(client_id) = cmd.client_id()
564 && self.external_clients.contains(client_id)
565 {
566 if self.config.debug {
567 log::debug!(
568 "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
569 );
570 }
571 return Ok(());
572 }
573
574 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
575 client.execute_unsubscribe(cmd);
576 } else {
577 log::error!(
578 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
579 cmd.client_id(),
580 cmd.venue(),
581 );
582 }
583
584 Ok(())
585 }
586
587 pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
594 if let Some(cid) = req.client_id()
596 && self.external_clients.contains(cid)
597 {
598 if self.config.debug {
599 log::debug!("Skipping data request for external client {cid}: {req:?}");
600 }
601 return Ok(());
602 }
603 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
604 match req {
605 RequestCommand::Data(req) => client.request_data(req),
606 RequestCommand::Instrument(req) => client.request_instrument(req),
607 RequestCommand::Instruments(req) => client.request_instruments(req),
608 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
609 RequestCommand::BookDepth(req) => client.request_book_depth(req),
610 RequestCommand::Quotes(req) => client.request_quotes(req),
611 RequestCommand::Trades(req) => client.request_trades(req),
612 RequestCommand::Bars(req) => client.request_bars(req),
613 }
614 } else {
615 anyhow::bail!(
616 "Cannot handle request: no client found for {:?} {:?}",
617 req.client_id(),
618 req.venue()
619 );
620 }
621 }
622
623 pub fn process(&mut self, data: &dyn Any) {
627 if let Some(data) = data.downcast_ref::<Data>() {
629 self.process_data(data.clone()); return;
631 }
632
633 #[cfg(feature = "defi")]
634 if let Some(data) = data.downcast_ref::<DefiData>() {
635 self.process_defi_data(data.clone()); return;
637 }
638
639 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
640 self.handle_instrument(instrument.clone());
641 } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
642 self.handle_funding_rate(*funding_rate);
643 } else {
644 log::error!("Cannot process data {data:?}, type is unrecognized");
645 }
646 }
647
648 pub fn process_data(&mut self, data: Data) {
650 match data {
651 Data::Delta(delta) => self.handle_delta(delta),
652 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
653 Data::Depth10(depth) => self.handle_depth10(*depth),
654 Data::Quote(quote) => self.handle_quote(quote),
655 Data::Trade(trade) => self.handle_trade(trade),
656 Data::Bar(bar) => self.handle_bar(bar),
657 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
658 Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
659 Data::InstrumentClose(close) => self.handle_instrument_close(close),
660 }
661 }
662
663 pub fn response(&self, resp: DataResponse) {
665 log::debug!("{RECV}{RES} {resp:?}");
666
667 match &resp {
668 DataResponse::Instrument(resp) => {
669 self.handle_instrument_response(resp.data.clone());
670 }
671 DataResponse::Instruments(resp) => {
672 self.handle_instruments(&resp.data);
673 }
674 DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
675 DataResponse::Trades(resp) => self.handle_trades(&resp.data),
676 DataResponse::Bars(resp) => self.handle_bars(&resp.data),
677 _ => todo!(),
678 }
679
680 msgbus::send_response(resp.correlation_id(), &resp);
681 }
682
683 fn handle_instrument(&mut self, instrument: InstrumentAny) {
686 if let Err(e) = self
687 .cache
688 .as_ref()
689 .borrow_mut()
690 .add_instrument(instrument.clone())
691 {
692 log_error_on_cache_insert(&e);
693 }
694
695 let topic = switchboard::get_instrument_topic(instrument.id());
696 msgbus::publish(topic, &instrument as &dyn Any);
697 }
698
699 fn handle_delta(&mut self, delta: OrderBookDelta) {
700 let deltas = if self.config.buffer_deltas {
701 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
702 buffered_deltas.deltas.push(delta);
703 buffered_deltas.flags = delta.flags;
704 buffered_deltas.sequence = delta.sequence;
705 buffered_deltas.ts_event = delta.ts_event;
706 buffered_deltas.ts_init = delta.ts_init;
707 } else {
708 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
709 self.buffered_deltas_map
710 .insert(delta.instrument_id, buffered_deltas);
711 }
712
713 if !RecordFlag::F_LAST.matches(delta.flags) {
714 return; }
716
717 self.buffered_deltas_map
719 .remove(&delta.instrument_id)
720 .unwrap()
721 } else {
722 OrderBookDeltas::new(delta.instrument_id, vec![delta])
723 };
724
725 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
726 msgbus::publish(topic, &deltas as &dyn Any);
727 }
728
729 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
730 let deltas = if self.config.buffer_deltas {
731 let mut is_last_delta = false;
732 for delta in &deltas.deltas {
733 if RecordFlag::F_LAST.matches(delta.flags) {
734 is_last_delta = true;
735 break;
736 }
737 }
738
739 let instrument_id = deltas.instrument_id;
740
741 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
742 buffered_deltas.deltas.extend(deltas.deltas);
743
744 if let Some(last_delta) = buffered_deltas.deltas.last() {
745 buffered_deltas.flags = last_delta.flags;
746 buffered_deltas.sequence = last_delta.sequence;
747 buffered_deltas.ts_event = last_delta.ts_event;
748 buffered_deltas.ts_init = last_delta.ts_init;
749 }
750 } else {
751 self.buffered_deltas_map.insert(instrument_id, deltas);
752 }
753
754 if !is_last_delta {
755 return;
756 }
757
758 self.buffered_deltas_map.remove(&instrument_id).unwrap()
760 } else {
761 deltas
762 };
763
764 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
765 msgbus::publish(topic, &deltas as &dyn Any);
766 }
767
768 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
769 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
770 msgbus::publish(topic, &depth as &dyn Any);
771 }
772
773 fn handle_quote(&mut self, quote: QuoteTick) {
774 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
775 log_error_on_cache_insert(&e);
776 }
777
778 let topic = switchboard::get_quotes_topic(quote.instrument_id);
781 msgbus::publish(topic, "e as &dyn Any);
782 }
783
784 fn handle_trade(&mut self, trade: TradeTick) {
785 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
786 log_error_on_cache_insert(&e);
787 }
788
789 let topic = switchboard::get_trades_topic(trade.instrument_id);
792 msgbus::publish(topic, &trade as &dyn Any);
793 }
794
795 fn handle_bar(&mut self, bar: Bar) {
796 if self.config.validate_data_sequence
798 && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
799 {
800 if bar.ts_event < last_bar.ts_event {
801 log::warn!(
802 "Bar {bar} was prior to last bar `ts_event` {}",
803 last_bar.ts_event
804 );
805 return; }
807 if bar.ts_init < last_bar.ts_init {
808 log::warn!(
809 "Bar {bar} was prior to last bar `ts_init` {}",
810 last_bar.ts_init
811 );
812 return; }
814 }
816
817 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
818 log_error_on_cache_insert(&e);
819 }
820
821 let topic = switchboard::get_bars_topic(bar.bar_type);
822 msgbus::publish(topic, &bar as &dyn Any);
823 }
824
825 fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
826 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
827 log_error_on_cache_insert(&e);
828 }
829
830 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
831 msgbus::publish(topic, &mark_price as &dyn Any);
832 }
833
834 fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
835 if let Err(e) = self
836 .cache
837 .as_ref()
838 .borrow_mut()
839 .add_index_price(index_price)
840 {
841 log_error_on_cache_insert(&e);
842 }
843
844 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
845 msgbus::publish(topic, &index_price as &dyn Any);
846 }
847
848 pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
850 if let Err(e) = self
851 .cache
852 .as_ref()
853 .borrow_mut()
854 .add_funding_rate(funding_rate)
855 {
856 log_error_on_cache_insert(&e);
857 }
858
859 let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
860 msgbus::publish(topic, &funding_rate as &dyn Any);
861 }
862
863 fn handle_instrument_close(&mut self, close: InstrumentClose) {
864 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
865 msgbus::publish(topic, &close as &dyn Any);
866 }
867
868 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
871 if cmd.instrument_id.is_synthetic() {
872 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
873 }
874
875 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
876
877 Ok(())
878 }
879
880 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
881 if cmd.instrument_id.is_synthetic() {
882 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
883 }
884
885 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
886
887 Ok(())
888 }
889
890 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
891 if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
892 return Ok(());
893 }
894
895 if cmd.instrument_id.is_synthetic() {
896 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
897 }
898
899 let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
901 Entry::Vacant(e) => {
902 let mut set = AHashSet::new();
903 set.insert(cmd.instrument_id);
904 e.insert(set);
905 true
906 }
907 Entry::Occupied(mut e) => {
908 e.get_mut().insert(cmd.instrument_id);
909 false
910 }
911 };
912
913 if first_for_interval {
914 let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
916 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
917
918 let snap_info = BookSnapshotInfo {
919 instrument_id: cmd.instrument_id,
920 venue: cmd.instrument_id.venue,
921 is_composite: cmd.instrument_id.symbol.is_composite(),
922 root: Ustr::from(cmd.instrument_id.symbol.root()),
923 topic,
924 interval_ms: cmd.interval_ms,
925 };
926
927 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
929 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
930
931 let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
932 self.book_snapshotters
933 .insert(cmd.instrument_id, snapshotter.clone());
934 let timer_name = snapshotter.timer_name;
935
936 let callback_fn: Rc<dyn Fn(TimeEvent)> =
937 Rc::new(move |event| snapshotter.snapshot(event));
938 let callback = TimeEventCallback::from(callback_fn);
939
940 self.clock
941 .borrow_mut()
942 .set_timer_ns(
943 &timer_name,
944 interval_ns,
945 Some(start_time_ns.into()),
946 None,
947 Some(callback),
948 None,
949 None,
950 )
951 .expect(FAILED);
952 }
953
954 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
955
956 Ok(())
957 }
958
959 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
960 match cmd.bar_type.aggregation_source() {
961 AggregationSource::Internal => {
962 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
963 self.start_bar_aggregator(cmd.bar_type)?;
964 }
965 }
966 AggregationSource::External => {
967 if cmd.bar_type.instrument_id().is_synthetic() {
968 anyhow::bail!(
969 "Cannot subscribe for externally aggregated synthetic instrument bar data"
970 );
971 }
972 }
973 }
974
975 Ok(())
976 }
977
978 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
979 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
980 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
981 return Ok(());
982 }
983
984 let topics = vec![
985 switchboard::get_book_deltas_topic(cmd.instrument_id),
986 switchboard::get_book_depth10_topic(cmd.instrument_id),
987 ];
989
990 self.maintain_book_updater(&cmd.instrument_id, &topics);
991 self.maintain_book_snapshotter(&cmd.instrument_id);
992
993 Ok(())
994 }
995
996 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
997 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
998 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
999 return Ok(());
1000 }
1001
1002 let topics = vec![
1003 switchboard::get_book_deltas_topic(cmd.instrument_id),
1004 switchboard::get_book_depth10_topic(cmd.instrument_id),
1005 ];
1007
1008 self.maintain_book_updater(&cmd.instrument_id, &topics);
1009 self.maintain_book_snapshotter(&cmd.instrument_id);
1010
1011 Ok(())
1012 }
1013
1014 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1015 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1016 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1017 return Ok(());
1018 }
1019
1020 let mut to_remove = Vec::new();
1022 for (interval, set) in &mut self.book_intervals {
1023 if set.remove(&cmd.instrument_id) && set.is_empty() {
1024 to_remove.push(*interval);
1025 }
1026 }
1027
1028 for interval in to_remove {
1029 self.book_intervals.remove(&interval);
1030 }
1031
1032 let topics = vec![
1033 switchboard::get_book_deltas_topic(cmd.instrument_id),
1034 switchboard::get_book_depth10_topic(cmd.instrument_id),
1035 ];
1037
1038 self.maintain_book_updater(&cmd.instrument_id, &topics);
1039 self.maintain_book_snapshotter(&cmd.instrument_id);
1040
1041 Ok(())
1042 }
1043
1044 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1046 let bar_type = cmd.bar_type;
1048 if self.bar_aggregators.contains_key(&bar_type.standard()) {
1049 if let Err(err) = self.stop_bar_aggregator(bar_type) {
1050 log::error!("Error stopping bar aggregator for {bar_type}: {err}");
1051 }
1052 self.bar_aggregators.remove(&bar_type.standard());
1053 log::debug!("Removed bar aggregator for {bar_type}");
1054 }
1055 Ok(())
1056 }
1057
1058 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1059 if let Some(updater) = self.book_updaters.get(instrument_id) {
1060 let handler = ShareableMessageHandler(updater.clone());
1061
1062 for topic in topics {
1064 if msgbus::subscriptions_count(topic.as_str()) == 1
1065 && msgbus::is_subscribed(topic.as_str(), handler.clone())
1066 {
1067 log::debug!("Unsubscribing BookUpdater from {topic}");
1068 msgbus::unsubscribe_topic(*topic, handler.clone());
1069 }
1070 }
1071
1072 let still_subscribed = topics
1074 .iter()
1075 .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1076 if !still_subscribed {
1077 self.book_updaters.remove(instrument_id);
1078 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1079 }
1080 }
1081 }
1082
1083 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1084 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1085 let topic = switchboard::get_book_snapshots_topic(
1086 *instrument_id,
1087 snapshotter.snap_info.interval_ms,
1088 );
1089
1090 if msgbus::subscriptions_count(topic.as_str()) == 0 {
1092 let timer_name = snapshotter.timer_name;
1093 self.book_snapshotters.remove(instrument_id);
1094 let mut clock = self.clock.borrow_mut();
1095 if clock.timer_exists(&timer_name) {
1096 clock.cancel_timer(&timer_name);
1097 }
1098 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1099 }
1100 }
1101 }
1102
1103 fn handle_instrument_response(&self, instrument: InstrumentAny) {
1106 let mut cache = self.cache.as_ref().borrow_mut();
1107 if let Err(e) = cache.add_instrument(instrument) {
1108 log_error_on_cache_insert(&e);
1109 }
1110 }
1111
1112 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1113 let mut cache = self.cache.as_ref().borrow_mut();
1115 for instrument in instruments {
1116 if let Err(e) = cache.add_instrument(instrument.clone()) {
1117 log_error_on_cache_insert(&e);
1118 }
1119 }
1120 }
1121
1122 fn handle_quotes(&self, quotes: &[QuoteTick]) {
1123 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1124 log_error_on_cache_insert(&e);
1125 }
1126 }
1127
1128 fn handle_trades(&self, trades: &[TradeTick]) {
1129 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1130 log_error_on_cache_insert(&e);
1131 }
1132 }
1133
1134 fn handle_bars(&self, bars: &[Bar]) {
1135 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1136 log_error_on_cache_insert(&e);
1137 }
1138 }
1139
1140 #[allow(clippy::too_many_arguments)]
1143 fn setup_book_updater(
1144 &mut self,
1145 instrument_id: &InstrumentId,
1146 book_type: BookType,
1147 only_deltas: bool,
1148 managed: bool,
1149 ) -> anyhow::Result<()> {
1150 let mut cache = self.cache.borrow_mut();
1151 if managed && !cache.has_order_book(instrument_id) {
1152 let book = OrderBook::new(*instrument_id, book_type);
1153 log::debug!("Created {book}");
1154 cache.add_order_book(book)?;
1155 }
1156
1157 let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1159 self.book_updaters.insert(*instrument_id, updater.clone());
1160
1161 let handler = ShareableMessageHandler(updater);
1162
1163 let topic = switchboard::get_book_deltas_topic(*instrument_id);
1164 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1165 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1166 }
1167
1168 let topic = switchboard::get_book_depth10_topic(*instrument_id);
1169 if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1170 msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1171 }
1172
1173 Ok(())
1174 }
1175
1176 fn create_bar_aggregator(
1177 &mut self,
1178 instrument: &InstrumentAny,
1179 bar_type: BarType,
1180 ) -> Box<dyn BarAggregator> {
1181 let cache = self.cache.clone();
1182
1183 let handler = move |bar: Bar| {
1184 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1185 log_error_on_cache_insert(&e);
1186 }
1187
1188 let topic = switchboard::get_bars_topic(bar.bar_type);
1189 msgbus::publish(topic, &bar as &dyn Any);
1190 };
1191
1192 let clock = self.clock.clone();
1193 let config = self.config.clone();
1194
1195 let price_precision = instrument.price_precision();
1196 let size_precision = instrument.size_precision();
1197
1198 if bar_type.spec().is_time_aggregated() {
1199 let time_bars_origin_offset = config
1201 .time_bars_origins
1202 .get(&bar_type.spec().aggregation)
1203 .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1204
1205 Box::new(TimeBarAggregator::new(
1206 bar_type,
1207 price_precision,
1208 size_precision,
1209 clock,
1210 handler,
1211 config.time_bars_build_with_no_updates,
1212 config.time_bars_timestamp_on_close,
1213 config.time_bars_interval_type,
1214 time_bars_origin_offset,
1215 20, false, ))
1218 } else {
1219 match bar_type.spec().aggregation {
1220 BarAggregation::Tick => Box::new(TickBarAggregator::new(
1221 bar_type,
1222 price_precision,
1223 size_precision,
1224 handler,
1225 )) as Box<dyn BarAggregator>,
1226 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1227 bar_type,
1228 price_precision,
1229 size_precision,
1230 handler,
1231 )) as Box<dyn BarAggregator>,
1232 BarAggregation::Value => Box::new(ValueBarAggregator::new(
1233 bar_type,
1234 price_precision,
1235 size_precision,
1236 handler,
1237 )) as Box<dyn BarAggregator>,
1238 BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1239 bar_type,
1240 price_precision,
1241 size_precision,
1242 instrument.price_increment(),
1243 handler,
1244 )) as Box<dyn BarAggregator>,
1245 _ => panic!(
1246 "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, VOLUME, VALUE, RENKO",
1247 bar_type.spec().aggregation
1248 ),
1249 }
1250 }
1251 }
1252
1253 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1254 let instrument = {
1256 let cache = self.cache.borrow();
1257 cache
1258 .instrument(&bar_type.instrument_id())
1259 .ok_or_else(|| {
1260 anyhow::anyhow!(
1261 "Cannot start bar aggregation: no instrument found for {}",
1262 bar_type.instrument_id(),
1263 )
1264 })?
1265 .clone()
1266 };
1267
1268 let bar_key = bar_type.standard();
1270
1271 let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1273 rc.clone()
1274 } else {
1275 let agg = self.create_bar_aggregator(&instrument, bar_type);
1276 let rc = Rc::new(RefCell::new(agg));
1277 self.bar_aggregators.insert(bar_key, rc.clone());
1278 rc
1279 };
1280
1281 let mut handlers = Vec::new();
1283
1284 if bar_type.is_composite() {
1285 let topic = switchboard::get_bars_topic(bar_type.composite());
1286 let handler =
1287 ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1288
1289 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1290 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1291 }
1292
1293 handlers.push((topic, handler));
1294 } else if bar_type.spec().price_type == PriceType::Last {
1295 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1296 let handler =
1297 ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1298
1299 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1300 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1301 }
1302
1303 handlers.push((topic, handler));
1304 } else {
1305 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1306 let handler =
1307 ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1308
1309 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1310 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1311 }
1312
1313 handlers.push((topic, handler));
1314 }
1315
1316 self.bar_aggregator_handlers.insert(bar_key, handlers);
1317 aggregator.borrow_mut().set_is_running(true);
1318
1319 Ok(())
1320 }
1321
1322 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1323 let aggregator = self
1324 .bar_aggregators
1325 .remove(&bar_type.standard())
1326 .ok_or_else(|| {
1327 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1328 })?;
1329
1330 aggregator.borrow_mut().stop();
1331
1332 let bar_key = bar_type.standard();
1334 if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1335 for (topic, handler) in subs {
1336 if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1337 msgbus::unsubscribe_topic(topic, handler);
1338 }
1339 }
1340 }
1341
1342 Ok(())
1343 }
1344}
1345
1346#[inline(always)]
1347fn log_error_on_cache_insert<T: Display>(e: &T) {
1348 log::error!("Error on cache insert: {e}");
1349}