1pub mod book;
32pub mod config;
33mod handlers;
34
35#[cfg(feature = "defi")]
36pub mod pool;
37
38use std::{
39 any::Any,
40 cell::{Ref, RefCell},
41 collections::hash_map::Entry,
42 fmt::Display,
43 num::NonZeroUsize,
44 rc::Rc,
45};
46
47use ahash::{AHashMap, AHashSet};
48use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
49use config::DataEngineConfig;
50use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
51use indexmap::IndexMap;
52use nautilus_common::{
53 cache::Cache,
54 clock::Clock,
55 logging::{RECV, RES},
56 messages::data::{
57 DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
58 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
59 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
60 UnsubscribeCommand,
61 },
62 msgbus::{self, MStr, Topic, handler::ShareableMessageHandler, switchboard},
63 timer::{TimeEvent, TimeEventCallback},
64};
65use nautilus_core::{
66 correctness::{
67 FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
68 },
69 datetime::millis_to_nanos,
70};
71#[cfg(feature = "defi")]
72use nautilus_model::defi::DefiData;
73use nautilus_model::{
74 data::{
75 Bar, BarType, Data, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentClose,
76 MarkPriceUpdate, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
77 },
78 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
79 identifiers::{ClientId, InstrumentId, Venue},
80 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
81 orderbook::OrderBook,
82};
83use nautilus_persistence::backend::catalog::ParquetDataCatalog;
84use ustr::Ustr;
85
86#[cfg(feature = "defi")]
87#[allow(unused_imports)] use crate::defi::engine as _;
89#[cfg(feature = "defi")]
90use crate::engine::pool::PoolUpdater;
91use crate::{
92 aggregation::{
93 BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
94 TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
95 ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
96 VolumeRunsBarAggregator,
97 },
98 client::DataClientAdapter,
99};
100
101#[derive(Debug)]
103pub struct DataEngine {
104 pub(crate) clock: Rc<RefCell<dyn Clock>>,
105 pub(crate) cache: Rc<RefCell<Cache>>,
106 pub(crate) external_clients: AHashSet<ClientId>,
107 clients: IndexMap<ClientId, DataClientAdapter>,
108 default_client: Option<DataClientAdapter>,
109 catalogs: AHashMap<Ustr, ParquetDataCatalog>,
110 routing_map: IndexMap<Venue, ClientId>,
111 book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
112 book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
113 book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
114 bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
115 bar_aggregator_handlers: AHashMap<BarType, Vec<(MStr<Topic>, ShareableMessageHandler)>>,
116 _synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
117 _synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
118 buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
119 pub(crate) msgbus_priority: u8,
120 pub(crate) config: DataEngineConfig,
121 #[cfg(feature = "defi")]
122 pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
123 #[cfg(feature = "defi")]
124 pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
125 #[cfg(feature = "defi")]
126 pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
127 #[cfg(feature = "defi")]
128 pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
129}
130
131impl DataEngine {
132 #[must_use]
134 pub fn new(
135 clock: Rc<RefCell<dyn Clock>>,
136 cache: Rc<RefCell<Cache>>,
137 config: Option<DataEngineConfig>,
138 ) -> Self {
139 let config = config.unwrap_or_default();
140
141 let external_clients: AHashSet<ClientId> = config
142 .external_clients
143 .clone()
144 .unwrap_or_default()
145 .into_iter()
146 .collect();
147
148 Self {
149 clock,
150 cache,
151 external_clients,
152 clients: IndexMap::new(),
153 default_client: None,
154 catalogs: AHashMap::new(),
155 routing_map: IndexMap::new(),
156 book_intervals: AHashMap::new(),
157 book_updaters: AHashMap::new(),
158 book_snapshotters: AHashMap::new(),
159 bar_aggregators: AHashMap::new(),
160 bar_aggregator_handlers: AHashMap::new(),
161 _synthetic_quote_feeds: AHashMap::new(),
162 _synthetic_trade_feeds: AHashMap::new(),
163 buffered_deltas_map: AHashMap::new(),
164 msgbus_priority: 10, config,
166 #[cfg(feature = "defi")]
167 pool_updaters: AHashMap::new(),
168 #[cfg(feature = "defi")]
169 pool_updaters_pending: AHashSet::new(),
170 #[cfg(feature = "defi")]
171 pool_snapshot_pending: AHashSet::new(),
172 #[cfg(feature = "defi")]
173 pool_event_buffers: AHashMap::new(),
174 }
175 }
176
177 #[must_use]
179 pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
180 self.clock.borrow()
181 }
182
183 #[must_use]
185 pub fn get_cache(&self) -> Ref<'_, Cache> {
186 self.cache.borrow()
187 }
188
189 #[must_use]
191 pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
192 Rc::clone(&self.cache)
193 }
194
195 pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<String>) {
201 let name = Ustr::from(&name.unwrap_or("catalog_0".to_string()));
202
203 check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
204
205 self.catalogs.insert(name, catalog);
206 log::info!("Registered catalog <{name}>");
207 }
208
209 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
216 let client_id = client.client_id();
217
218 if let Some(default_client) = &self.default_client {
219 check_predicate_false(
220 default_client.client_id() == client.client_id(),
221 "client_id already registered as default client",
222 )
223 .expect(FAILED);
224 }
225
226 check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
227
228 if let Some(routing) = routing {
229 self.routing_map.insert(routing, client_id);
230 log::info!("Set client {client_id} routing for {routing}");
231 }
232
233 if client.venue.is_none() && self.default_client.is_none() {
234 self.default_client = Some(client);
235 log::info!("Registered client {client_id} for default routing");
236 } else {
237 self.clients.insert(client_id, client);
238 log::info!("Registered client {client_id}");
239 }
240 }
241
242 pub fn deregister_client(&mut self, client_id: &ClientId) {
248 check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
249
250 self.clients.shift_remove(client_id);
251 log::info!("Deregistered client {client_id}");
252 }
253
254 pub fn register_default_client(&mut self, client: DataClientAdapter) {
266 check_predicate_true(
267 self.default_client.is_none(),
268 "default client already registered",
269 )
270 .expect(FAILED);
271
272 let client_id = client.client_id();
273
274 self.default_client = Some(client);
275 log::info!("Registered default client {client_id}");
276 }
277
278 pub fn start(&mut self) {
280 for client in self.get_clients_mut() {
281 if let Err(e) = client.start() {
282 log::error!("{e}");
283 }
284 }
285 }
286
287 pub fn stop(&mut self) {
289 for client in self.get_clients_mut() {
290 if let Err(e) = client.stop() {
291 log::error!("{e}");
292 }
293 }
294 }
295
296 pub fn reset(&mut self) {
298 for client in self.get_clients_mut() {
299 if let Err(e) = client.reset() {
300 log::error!("{e}");
301 }
302 }
303 }
304
305 pub fn dispose(&mut self) {
307 for client in self.get_clients_mut() {
308 if let Err(e) = client.dispose() {
309 log::error!("{e}");
310 }
311 }
312
313 self.clock.borrow_mut().cancel_timers();
314 }
315
316 #[must_use]
318 pub fn check_connected(&self) -> bool {
319 self.get_clients()
320 .iter()
321 .all(|client| client.is_connected())
322 }
323
324 #[must_use]
326 pub fn check_disconnected(&self) -> bool {
327 self.get_clients()
328 .iter()
329 .all(|client| !client.is_connected())
330 }
331
332 #[must_use]
334 pub fn registered_clients(&self) -> Vec<ClientId> {
335 self.get_clients()
336 .into_iter()
337 .map(|client| client.client_id())
338 .collect()
339 }
340
341 pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
344 where
345 F: Fn(&DataClientAdapter) -> &AHashSet<T>,
346 T: Clone,
347 {
348 self.get_clients()
349 .into_iter()
350 .flat_map(get_subs)
351 .cloned()
352 .collect()
353 }
354
355 #[must_use]
356 pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
357 let (default_opt, clients_map) = (&self.default_client, &self.clients);
358 let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
359
360 if let Some(default) = default_opt {
361 clients.push(default);
362 }
363
364 clients
365 }
366
367 #[must_use]
368 pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
369 let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
370 let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
371
372 if let Some(default) = default_opt {
373 clients.push(default);
374 }
375
376 clients
377 }
378
379 pub fn get_client(
380 &mut self,
381 client_id: Option<&ClientId>,
382 venue: Option<&Venue>,
383 ) -> Option<&mut DataClientAdapter> {
384 if let Some(client_id) = client_id {
385 if let Some(client) = self.clients.get_mut(client_id) {
387 return Some(client);
388 }
389
390 if let Some(default) = self.default_client.as_mut()
392 && default.client_id() == *client_id
393 {
394 return Some(default);
395 }
396
397 return None;
399 }
400
401 if let Some(v) = venue {
402 if let Some(client_id) = self.routing_map.get(v) {
404 return self.clients.get_mut(client_id);
405 }
406 }
407
408 self.get_default_client()
410 }
411
412 const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
413 self.default_client.as_mut()
414 }
415
416 #[must_use]
418 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
419 self.collect_subscriptions(|client| &client.subscriptions_custom)
420 }
421
422 #[must_use]
424 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
425 self.collect_subscriptions(|client| &client.subscriptions_instrument)
426 }
427
428 #[must_use]
430 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
431 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
432 }
433
434 #[must_use]
436 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
437 self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
438 }
439
440 #[must_use]
442 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
443 self.collect_subscriptions(|client| &client.subscriptions_quotes)
444 }
445
446 #[must_use]
448 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
449 self.collect_subscriptions(|client| &client.subscriptions_trades)
450 }
451
452 #[must_use]
454 pub fn subscribed_bars(&self) -> Vec<BarType> {
455 self.collect_subscriptions(|client| &client.subscriptions_bars)
456 }
457
458 #[must_use]
460 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
461 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
462 }
463
464 #[must_use]
466 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
467 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
468 }
469
470 #[must_use]
472 pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
473 self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
474 }
475
476 #[must_use]
478 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
479 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
480 }
481
482 #[must_use]
484 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
485 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
486 }
487
488 pub fn execute(&mut self, cmd: &DataCommand) {
494 if let Err(e) = match cmd {
495 DataCommand::Subscribe(c) => self.execute_subscribe(c),
496 DataCommand::Unsubscribe(c) => self.execute_unsubscribe(c),
497 DataCommand::Request(c) => self.execute_request(c),
498 #[cfg(feature = "defi")]
499 DataCommand::DefiRequest(c) => self.execute_defi_request(c),
500 #[cfg(feature = "defi")]
501 DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(c),
502 #[cfg(feature = "defi")]
503 DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(c),
504 _ => {
505 log::warn!("Unhandled DataCommand variant: {cmd:?}");
506 Ok(())
507 }
508 } {
509 log::error!("{e}");
510 }
511 }
512
513 pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
520 match &cmd {
522 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
523 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
524 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
525 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
526 _ => {} }
528
529 if let Some(client_id) = cmd.client_id()
530 && self.external_clients.contains(client_id)
531 {
532 if self.config.debug {
533 log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
534 }
535 return Ok(());
536 }
537
538 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
539 client.execute_subscribe(cmd);
540 } else {
541 log::error!(
542 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
543 cmd.client_id(),
544 cmd.venue(),
545 );
546 }
547
548 Ok(())
549 }
550
551 pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
557 match &cmd {
558 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
559 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
560 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
561 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
562 _ => {} }
564
565 if let Some(client_id) = cmd.client_id()
566 && self.external_clients.contains(client_id)
567 {
568 if self.config.debug {
569 log::debug!(
570 "Skipping unsubscribe command for external client {client_id}: {cmd:?}",
571 );
572 }
573 return Ok(());
574 }
575
576 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
577 client.execute_unsubscribe(cmd);
578 } else {
579 log::error!(
580 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
581 cmd.client_id(),
582 cmd.venue(),
583 );
584 }
585
586 Ok(())
587 }
588
589 pub fn execute_request(&mut self, req: &RequestCommand) -> anyhow::Result<()> {
596 if let Some(cid) = req.client_id()
598 && self.external_clients.contains(cid)
599 {
600 if self.config.debug {
601 log::debug!("Skipping data request for external client {cid}: {req:?}");
602 }
603 return Ok(());
604 }
605 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
606 match req {
607 RequestCommand::Data(req) => client.request_data(req),
608 RequestCommand::Instrument(req) => client.request_instrument(req),
609 RequestCommand::Instruments(req) => client.request_instruments(req),
610 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
611 RequestCommand::BookDepth(req) => client.request_book_depth(req),
612 RequestCommand::Quotes(req) => client.request_quotes(req),
613 RequestCommand::Trades(req) => client.request_trades(req),
614 RequestCommand::Bars(req) => client.request_bars(req),
615 }
616 } else {
617 anyhow::bail!(
618 "Cannot handle request: no client found for {:?} {:?}",
619 req.client_id(),
620 req.venue()
621 );
622 }
623 }
624
625 pub fn process(&mut self, data: &dyn Any) {
629 if let Some(data) = data.downcast_ref::<Data>() {
631 self.process_data(data.clone()); return;
633 }
634
635 #[cfg(feature = "defi")]
636 if let Some(data) = data.downcast_ref::<DefiData>() {
637 self.process_defi_data(data.clone()); return;
639 }
640
641 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
642 self.handle_instrument(instrument.clone());
643 } else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
644 self.handle_funding_rate(*funding_rate);
645 } else {
646 log::error!("Cannot process data {data:?}, type is unrecognized");
647 }
648 }
649
650 pub fn process_data(&mut self, data: Data) {
652 match data {
653 Data::Delta(delta) => self.handle_delta(delta),
654 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
655 Data::Depth10(depth) => self.handle_depth10(*depth),
656 Data::Quote(quote) => self.handle_quote(quote),
657 Data::Trade(trade) => self.handle_trade(trade),
658 Data::Bar(bar) => self.handle_bar(bar),
659 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
660 Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
661 Data::InstrumentClose(close) => self.handle_instrument_close(close),
662 }
663 }
664
665 pub fn response(&self, resp: DataResponse) {
667 log::debug!("{RECV}{RES} {resp:?}");
668
669 match &resp {
670 DataResponse::Instrument(resp) => {
671 self.handle_instrument_response(resp.data.clone());
672 }
673 DataResponse::Instruments(resp) => {
674 self.handle_instruments(&resp.data);
675 }
676 DataResponse::Quotes(resp) => self.handle_quotes(&resp.data),
677 DataResponse::Trades(resp) => self.handle_trades(&resp.data),
678 DataResponse::Bars(resp) => self.handle_bars(&resp.data),
679 _ => todo!(),
680 }
681
682 msgbus::send_response(resp.correlation_id(), &resp);
683 }
684
685 fn handle_instrument(&mut self, instrument: InstrumentAny) {
688 if let Err(e) = self
689 .cache
690 .as_ref()
691 .borrow_mut()
692 .add_instrument(instrument.clone())
693 {
694 log_error_on_cache_insert(&e);
695 }
696
697 let topic = switchboard::get_instrument_topic(instrument.id());
698 msgbus::publish(topic, &instrument as &dyn Any);
699 }
700
701 fn handle_delta(&mut self, delta: OrderBookDelta) {
702 let deltas = if self.config.buffer_deltas {
703 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
704 buffered_deltas.deltas.push(delta);
705 buffered_deltas.flags = delta.flags;
706 buffered_deltas.sequence = delta.sequence;
707 buffered_deltas.ts_event = delta.ts_event;
708 buffered_deltas.ts_init = delta.ts_init;
709 } else {
710 let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
711 self.buffered_deltas_map
712 .insert(delta.instrument_id, buffered_deltas);
713 }
714
715 if !RecordFlag::F_LAST.matches(delta.flags) {
716 return; }
718
719 self.buffered_deltas_map
721 .remove(&delta.instrument_id)
722 .unwrap()
723 } else {
724 OrderBookDeltas::new(delta.instrument_id, vec![delta])
725 };
726
727 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
728 msgbus::publish(topic, &deltas as &dyn Any);
729 }
730
731 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
732 let deltas = if self.config.buffer_deltas {
733 let mut is_last_delta = false;
734 for delta in &deltas.deltas {
735 if RecordFlag::F_LAST.matches(delta.flags) {
736 is_last_delta = true;
737 break;
738 }
739 }
740
741 let instrument_id = deltas.instrument_id;
742
743 if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
744 buffered_deltas.deltas.extend(deltas.deltas);
745
746 if let Some(last_delta) = buffered_deltas.deltas.last() {
747 buffered_deltas.flags = last_delta.flags;
748 buffered_deltas.sequence = last_delta.sequence;
749 buffered_deltas.ts_event = last_delta.ts_event;
750 buffered_deltas.ts_init = last_delta.ts_init;
751 }
752 } else {
753 self.buffered_deltas_map.insert(instrument_id, deltas);
754 }
755
756 if !is_last_delta {
757 return;
758 }
759
760 self.buffered_deltas_map.remove(&instrument_id).unwrap()
762 } else {
763 deltas
764 };
765
766 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
767 msgbus::publish(topic, &deltas as &dyn Any);
768 }
769
770 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
771 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
772 msgbus::publish(topic, &depth as &dyn Any);
773 }
774
775 fn handle_quote(&mut self, quote: QuoteTick) {
776 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
777 log_error_on_cache_insert(&e);
778 }
779
780 let topic = switchboard::get_quotes_topic(quote.instrument_id);
783 msgbus::publish(topic, "e as &dyn Any);
784 }
785
786 fn handle_trade(&mut self, trade: TradeTick) {
787 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
788 log_error_on_cache_insert(&e);
789 }
790
791 let topic = switchboard::get_trades_topic(trade.instrument_id);
794 msgbus::publish(topic, &trade as &dyn Any);
795 }
796
797 fn handle_bar(&mut self, bar: Bar) {
798 if self.config.validate_data_sequence
800 && let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
801 {
802 if bar.ts_event < last_bar.ts_event {
803 log::warn!(
804 "Bar {bar} was prior to last bar `ts_event` {}",
805 last_bar.ts_event
806 );
807 return; }
809 if bar.ts_init < last_bar.ts_init {
810 log::warn!(
811 "Bar {bar} was prior to last bar `ts_init` {}",
812 last_bar.ts_init
813 );
814 return; }
816 }
818
819 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
820 log_error_on_cache_insert(&e);
821 }
822
823 let topic = switchboard::get_bars_topic(bar.bar_type);
824 msgbus::publish(topic, &bar as &dyn Any);
825 }
826
827 fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
828 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
829 log_error_on_cache_insert(&e);
830 }
831
832 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
833 msgbus::publish(topic, &mark_price as &dyn Any);
834 }
835
836 fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
837 if let Err(e) = self
838 .cache
839 .as_ref()
840 .borrow_mut()
841 .add_index_price(index_price)
842 {
843 log_error_on_cache_insert(&e);
844 }
845
846 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
847 msgbus::publish(topic, &index_price as &dyn Any);
848 }
849
850 pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
852 if let Err(e) = self
853 .cache
854 .as_ref()
855 .borrow_mut()
856 .add_funding_rate(funding_rate)
857 {
858 log_error_on_cache_insert(&e);
859 }
860
861 let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
862 msgbus::publish(topic, &funding_rate as &dyn Any);
863 }
864
865 fn handle_instrument_close(&mut self, close: InstrumentClose) {
866 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
867 msgbus::publish(topic, &close as &dyn Any);
868 }
869
870 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
873 if cmd.instrument_id.is_synthetic() {
874 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
875 }
876
877 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
878
879 Ok(())
880 }
881
882 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
883 if cmd.instrument_id.is_synthetic() {
884 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
885 }
886
887 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
888
889 Ok(())
890 }
891
892 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
893 if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
894 return Ok(());
895 }
896
897 if cmd.instrument_id.is_synthetic() {
898 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
899 }
900
901 let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
903 Entry::Vacant(e) => {
904 let mut set = AHashSet::new();
905 set.insert(cmd.instrument_id);
906 e.insert(set);
907 true
908 }
909 Entry::Occupied(mut e) => {
910 e.get_mut().insert(cmd.instrument_id);
911 false
912 }
913 };
914
915 if first_for_interval {
916 let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
918 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
919
920 let snap_info = BookSnapshotInfo {
921 instrument_id: cmd.instrument_id,
922 venue: cmd.instrument_id.venue,
923 is_composite: cmd.instrument_id.symbol.is_composite(),
924 root: Ustr::from(cmd.instrument_id.symbol.root()),
925 topic,
926 interval_ms: cmd.interval_ms,
927 };
928
929 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
931 let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
932
933 let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
934 self.book_snapshotters
935 .insert(cmd.instrument_id, snapshotter.clone());
936 let timer_name = snapshotter.timer_name;
937
938 let callback_fn: Rc<dyn Fn(TimeEvent)> =
939 Rc::new(move |event| snapshotter.snapshot(event));
940 let callback = TimeEventCallback::from(callback_fn);
941
942 self.clock
943 .borrow_mut()
944 .set_timer_ns(
945 &timer_name,
946 interval_ns,
947 Some(start_time_ns.into()),
948 None,
949 Some(callback),
950 None,
951 None,
952 )
953 .expect(FAILED);
954 }
955
956 self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
957
958 Ok(())
959 }
960
961 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
962 match cmd.bar_type.aggregation_source() {
963 AggregationSource::Internal => {
964 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
965 self.start_bar_aggregator(cmd.bar_type)?;
966 }
967 }
968 AggregationSource::External => {
969 if cmd.bar_type.instrument_id().is_synthetic() {
970 anyhow::bail!(
971 "Cannot subscribe for externally aggregated synthetic instrument bar data"
972 );
973 }
974 }
975 }
976
977 Ok(())
978 }
979
980 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
981 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
982 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
983 return Ok(());
984 }
985
986 let topics = vec![
987 switchboard::get_book_deltas_topic(cmd.instrument_id),
988 switchboard::get_book_depth10_topic(cmd.instrument_id),
989 ];
991
992 self.maintain_book_updater(&cmd.instrument_id, &topics);
993 self.maintain_book_snapshotter(&cmd.instrument_id);
994
995 Ok(())
996 }
997
998 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
999 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1000 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
1001 return Ok(());
1002 }
1003
1004 let topics = vec![
1005 switchboard::get_book_deltas_topic(cmd.instrument_id),
1006 switchboard::get_book_depth10_topic(cmd.instrument_id),
1007 ];
1009
1010 self.maintain_book_updater(&cmd.instrument_id, &topics);
1011 self.maintain_book_snapshotter(&cmd.instrument_id);
1012
1013 Ok(())
1014 }
1015
1016 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
1017 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
1018 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
1019 return Ok(());
1020 }
1021
1022 let mut to_remove = Vec::new();
1024 for (interval, set) in &mut self.book_intervals {
1025 if set.remove(&cmd.instrument_id) && set.is_empty() {
1026 to_remove.push(*interval);
1027 }
1028 }
1029
1030 for interval in to_remove {
1031 self.book_intervals.remove(&interval);
1032 }
1033
1034 let topics = vec![
1035 switchboard::get_book_deltas_topic(cmd.instrument_id),
1036 switchboard::get_book_depth10_topic(cmd.instrument_id),
1037 ];
1039
1040 self.maintain_book_updater(&cmd.instrument_id, &topics);
1041 self.maintain_book_snapshotter(&cmd.instrument_id);
1042
1043 Ok(())
1044 }
1045
1046 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1048 let bar_type = cmd.bar_type;
1050 if self.bar_aggregators.contains_key(&bar_type.standard()) {
1051 if let Err(e) = self.stop_bar_aggregator(bar_type) {
1052 log::error!("Error stopping bar aggregator for {bar_type}: {e}");
1053 }
1054 self.bar_aggregators.remove(&bar_type.standard());
1055 log::debug!("Removed bar aggregator for {bar_type}");
1056 }
1057 Ok(())
1058 }
1059
1060 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[MStr<Topic>]) {
1061 if let Some(updater) = self.book_updaters.get(instrument_id) {
1062 let handler = ShareableMessageHandler(updater.clone());
1063
1064 for topic in topics {
1066 if msgbus::subscriptions_count(topic.as_str()) == 1
1067 && msgbus::is_subscribed(topic.as_str(), handler.clone())
1068 {
1069 log::debug!("Unsubscribing BookUpdater from {topic}");
1070 msgbus::unsubscribe_topic(*topic, handler.clone());
1071 }
1072 }
1073
1074 let still_subscribed = topics
1076 .iter()
1077 .any(|topic| msgbus::is_subscribed(topic.as_str(), handler.clone()));
1078 if !still_subscribed {
1079 self.book_updaters.remove(instrument_id);
1080 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
1081 }
1082 }
1083 }
1084
1085 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
1086 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
1087 let topic = switchboard::get_book_snapshots_topic(
1088 *instrument_id,
1089 snapshotter.snap_info.interval_ms,
1090 );
1091
1092 if msgbus::subscriptions_count(topic.as_str()) == 0 {
1094 let timer_name = snapshotter.timer_name;
1095 self.book_snapshotters.remove(instrument_id);
1096 let mut clock = self.clock.borrow_mut();
1097 if clock.timer_exists(&timer_name) {
1098 clock.cancel_timer(&timer_name);
1099 }
1100 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
1101 }
1102 }
1103 }
1104
1105 fn handle_instrument_response(&self, instrument: InstrumentAny) {
1108 let mut cache = self.cache.as_ref().borrow_mut();
1109 if let Err(e) = cache.add_instrument(instrument) {
1110 log_error_on_cache_insert(&e);
1111 }
1112 }
1113
1114 fn handle_instruments(&self, instruments: &[InstrumentAny]) {
1115 let mut cache = self.cache.as_ref().borrow_mut();
1117 for instrument in instruments {
1118 if let Err(e) = cache.add_instrument(instrument.clone()) {
1119 log_error_on_cache_insert(&e);
1120 }
1121 }
1122 }
1123
1124 fn handle_quotes(&self, quotes: &[QuoteTick]) {
1125 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
1126 log_error_on_cache_insert(&e);
1127 }
1128 }
1129
1130 fn handle_trades(&self, trades: &[TradeTick]) {
1131 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
1132 log_error_on_cache_insert(&e);
1133 }
1134 }
1135
1136 fn handle_bars(&self, bars: &[Bar]) {
1137 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
1138 log_error_on_cache_insert(&e);
1139 }
1140 }
1141
1142 #[allow(clippy::too_many_arguments)]
1145 fn setup_book_updater(
1146 &mut self,
1147 instrument_id: &InstrumentId,
1148 book_type: BookType,
1149 only_deltas: bool,
1150 managed: bool,
1151 ) -> anyhow::Result<()> {
1152 let mut cache = self.cache.borrow_mut();
1153 if managed && !cache.has_order_book(instrument_id) {
1154 let book = OrderBook::new(*instrument_id, book_type);
1155 log::debug!("Created {book}");
1156 cache.add_order_book(book)?;
1157 }
1158
1159 let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
1161 self.book_updaters.insert(*instrument_id, updater.clone());
1162
1163 let handler = ShareableMessageHandler(updater);
1164
1165 let topic = switchboard::get_book_deltas_topic(*instrument_id);
1166 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1167 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1168 }
1169
1170 let topic = switchboard::get_book_depth10_topic(*instrument_id);
1171 if !only_deltas && !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1172 msgbus::subscribe(topic.into(), handler, Some(self.msgbus_priority));
1173 }
1174
1175 Ok(())
1176 }
1177
1178 fn create_bar_aggregator(
1179 &mut self,
1180 instrument: &InstrumentAny,
1181 bar_type: BarType,
1182 ) -> Box<dyn BarAggregator> {
1183 let cache = self.cache.clone();
1184
1185 let handler = move |bar: Bar| {
1186 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1187 log_error_on_cache_insert(&e);
1188 }
1189
1190 let topic = switchboard::get_bars_topic(bar.bar_type);
1191 msgbus::publish(topic, &bar as &dyn Any);
1192 };
1193
1194 let clock = self.clock.clone();
1195 let config = self.config.clone();
1196
1197 let price_precision = instrument.price_precision();
1198 let size_precision = instrument.size_precision();
1199
1200 if bar_type.spec().is_time_aggregated() {
1201 let time_bars_origin_offset = config
1203 .time_bars_origins
1204 .get(&bar_type.spec().aggregation)
1205 .map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
1206
1207 Box::new(TimeBarAggregator::new(
1208 bar_type,
1209 price_precision,
1210 size_precision,
1211 clock,
1212 handler,
1213 config.time_bars_build_with_no_updates,
1214 config.time_bars_timestamp_on_close,
1215 config.time_bars_interval_type,
1216 time_bars_origin_offset,
1217 20, false, ))
1220 } else {
1221 match bar_type.spec().aggregation {
1222 BarAggregation::Tick => Box::new(TickBarAggregator::new(
1223 bar_type,
1224 price_precision,
1225 size_precision,
1226 handler,
1227 )) as Box<dyn BarAggregator>,
1228 BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
1229 bar_type,
1230 price_precision,
1231 size_precision,
1232 handler,
1233 )) as Box<dyn BarAggregator>,
1234 BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
1235 bar_type,
1236 price_precision,
1237 size_precision,
1238 handler,
1239 )) as Box<dyn BarAggregator>,
1240 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
1241 bar_type,
1242 price_precision,
1243 size_precision,
1244 handler,
1245 )) as Box<dyn BarAggregator>,
1246 BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
1247 bar_type,
1248 price_precision,
1249 size_precision,
1250 handler,
1251 )) as Box<dyn BarAggregator>,
1252 BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
1253 bar_type,
1254 price_precision,
1255 size_precision,
1256 handler,
1257 )) as Box<dyn BarAggregator>,
1258 BarAggregation::Value => Box::new(ValueBarAggregator::new(
1259 bar_type,
1260 price_precision,
1261 size_precision,
1262 handler,
1263 )) as Box<dyn BarAggregator>,
1264 BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
1265 bar_type,
1266 price_precision,
1267 size_precision,
1268 handler,
1269 )) as Box<dyn BarAggregator>,
1270 BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
1271 bar_type,
1272 price_precision,
1273 size_precision,
1274 handler,
1275 )) as Box<dyn BarAggregator>,
1276 BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
1277 bar_type,
1278 price_precision,
1279 size_precision,
1280 instrument.price_increment(),
1281 handler,
1282 )) as Box<dyn BarAggregator>,
1283 _ => panic!(
1284 "BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
1285 bar_type.spec().aggregation
1286 ),
1287 }
1288 }
1289 }
1290
1291 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1292 let instrument = {
1294 let cache = self.cache.borrow();
1295 cache
1296 .instrument(&bar_type.instrument_id())
1297 .ok_or_else(|| {
1298 anyhow::anyhow!(
1299 "Cannot start bar aggregation: no instrument found for {}",
1300 bar_type.instrument_id(),
1301 )
1302 })?
1303 .clone()
1304 };
1305
1306 let bar_key = bar_type.standard();
1308
1309 let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
1311 rc.clone()
1312 } else {
1313 let agg = self.create_bar_aggregator(&instrument, bar_type);
1314 let rc = Rc::new(RefCell::new(agg));
1315 self.bar_aggregators.insert(bar_key, rc.clone());
1316 rc
1317 };
1318
1319 let mut handlers = Vec::new();
1321
1322 if bar_type.is_composite() {
1323 let topic = switchboard::get_bars_topic(bar_type.composite());
1324 let handler =
1325 ShareableMessageHandler(Rc::new(BarBarHandler::new(aggregator.clone(), bar_key)));
1326
1327 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1328 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1329 }
1330
1331 handlers.push((topic, handler));
1332 } else if bar_type.spec().price_type == PriceType::Last {
1333 let topic = switchboard::get_trades_topic(bar_type.instrument_id());
1334 let handler =
1335 ShareableMessageHandler(Rc::new(BarTradeHandler::new(aggregator.clone(), bar_key)));
1336
1337 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1338 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1339 }
1340
1341 handlers.push((topic, handler));
1342 } else {
1343 let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
1344 let handler =
1345 ShareableMessageHandler(Rc::new(BarQuoteHandler::new(aggregator.clone(), bar_key)));
1346
1347 if !msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1348 msgbus::subscribe(topic.into(), handler.clone(), Some(self.msgbus_priority));
1349 }
1350
1351 handlers.push((topic, handler));
1352 }
1353
1354 self.bar_aggregator_handlers.insert(bar_key, handlers);
1355
1356 self.setup_bar_aggregator(bar_type, false)?;
1358
1359 aggregator.borrow_mut().set_is_running(true);
1360
1361 Ok(())
1362 }
1363
1364 fn setup_bar_aggregator(&mut self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
1368 let bar_key = bar_type.standard();
1369 let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
1370 anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
1371 })?;
1372
1373 let handler: Box<dyn FnMut(Bar)> = if historical {
1375 let cache = self.cache.clone();
1377 Box::new(move |bar: Bar| {
1378 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1379 log_error_on_cache_insert(&e);
1380 }
1381 })
1383 } else {
1384 let cache = self.cache.clone();
1386 Box::new(move |bar: Bar| {
1387 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
1388 log_error_on_cache_insert(&e);
1389 }
1390 let topic = switchboard::get_bars_topic(bar.bar_type);
1391 msgbus::publish(topic, &bar as &dyn Any);
1392 })
1393 };
1394
1395 aggregator
1396 .borrow_mut()
1397 .set_historical_mode(historical, handler);
1398
1399 if bar_type.spec().is_time_aggregated() {
1401 use nautilus_common::clock::TestClock;
1402
1403 if historical {
1404 let test_clock = Rc::new(RefCell::new(TestClock::new()));
1406 aggregator.borrow_mut().set_clock(test_clock);
1407 let aggregator_weak = Rc::downgrade(aggregator);
1410 aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
1411 } else {
1412 aggregator.borrow_mut().set_clock(self.clock.clone());
1413 aggregator
1414 .borrow_mut()
1415 .start_timer(Some(aggregator.clone()));
1416 }
1417 }
1418
1419 Ok(())
1420 }
1421
1422 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1423 let aggregator = self
1424 .bar_aggregators
1425 .remove(&bar_type.standard())
1426 .ok_or_else(|| {
1427 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1428 })?;
1429
1430 aggregator.borrow_mut().stop();
1431
1432 let bar_key = bar_type.standard();
1434 if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
1435 for (topic, handler) in subs {
1436 if msgbus::is_subscribed(topic.as_str(), handler.clone()) {
1437 msgbus::unsubscribe_topic(topic, handler);
1438 }
1439 }
1440 }
1441
1442 Ok(())
1443 }
1444}
1445
1446#[inline(always)]
1447fn log_error_on_cache_insert<T: Display>(e: &T) {
1448 log::error!("Error on cache insert: {e}");
1449}