1#![allow(dead_code)]
33#![allow(unused_variables)]
34#![allow(unused_assignments)]
35
36pub mod book;
37pub mod config;
38
39#[cfg(test)]
40mod tests;
41
42use std::{
43 any::Any,
44 cell::{Ref, RefCell},
45 collections::{HashMap, HashSet, VecDeque},
46 num::NonZeroUsize,
47 rc::Rc,
48 sync::Arc,
49};
50
51use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
52use config::DataEngineConfig;
53use indexmap::IndexMap;
54use nautilus_common::{
55 cache::Cache,
56 clock::Clock,
57 logging::{RECV, RES},
58 messages::data::{
59 DataCommand, DataResponse, RequestCommand, SubscribeBars, SubscribeBookDeltas,
60 SubscribeBookDepth10, SubscribeBookSnapshots, SubscribeCommand, UnsubscribeBars,
61 UnsubscribeBookDeltas, UnsubscribeBookDepth10, UnsubscribeBookSnapshots,
62 UnsubscribeCommand,
63 },
64 msgbus::{
65 self, get_message_bus,
66 handler::{MessageHandler, ShareableMessageHandler},
67 switchboard::{self},
68 },
69 timer::TimeEventCallback,
70};
71use nautilus_core::{
72 correctness::{FAILED, check_key_in_index_map, check_key_not_in_index_map},
73 datetime::{NANOSECONDS_IN_MILLISECOND, NANOSECONDS_IN_SECOND, millis_to_nanos},
74};
75use nautilus_model::{
76 data::{
77 Bar, BarType, Data, DataType, OrderBookDelta, OrderBookDeltas, OrderBookDepth10, QuoteTick,
78 TradeTick,
79 close::InstrumentClose,
80 prices::{IndexPriceUpdate, MarkPriceUpdate},
81 },
82 enums::{AggregationSource, BarAggregation, BookType, PriceType, RecordFlag},
83 identifiers::{ClientId, InstrumentId, Venue},
84 instruments::{Instrument, InstrumentAny, SyntheticInstrument},
85 orderbook::OrderBook,
86};
87use ustr::Ustr;
88
89use crate::{
90 aggregation::{
91 BarAggregator, TickBarAggregator, TimeBarAggregator, ValueBarAggregator,
92 VolumeBarAggregator,
93 },
94 client::DataClientAdapter,
95};
96
97pub struct DataEngine {
99 clock: Rc<RefCell<dyn Clock>>,
100 cache: Rc<RefCell<Cache>>,
101 clients: IndexMap<ClientId, DataClientAdapter>,
102 default_client: Option<DataClientAdapter>,
103 external_clients: HashSet<ClientId>,
104 routing_map: IndexMap<Venue, ClientId>,
105 book_intervals: HashMap<NonZeroUsize, HashSet<InstrumentId>>,
106 book_updaters: HashMap<InstrumentId, Rc<BookUpdater>>,
107 book_snapshotters: HashMap<InstrumentId, Rc<BookSnapshotter>>,
108 bar_aggregators: HashMap<BarType, Box<dyn BarAggregator>>,
109 synthetic_quote_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
110 synthetic_trade_feeds: HashMap<InstrumentId, Vec<SyntheticInstrument>>,
111 buffered_deltas_map: HashMap<InstrumentId, Vec<OrderBookDelta>>, msgbus_priority: u8,
113 command_queue: VecDeque<DataCommand>,
114 config: DataEngineConfig,
115}
116
117impl DataEngine {
118 #[must_use]
120 pub fn new(
121 clock: Rc<RefCell<dyn Clock>>,
122 cache: Rc<RefCell<Cache>>,
123 config: Option<DataEngineConfig>,
124 ) -> Self {
125 Self {
126 clock,
127 cache,
128 clients: IndexMap::new(),
129 default_client: None,
130 external_clients: HashSet::new(),
131 routing_map: IndexMap::new(),
132 book_intervals: HashMap::new(),
133 book_updaters: HashMap::new(),
134 book_snapshotters: HashMap::new(),
135 bar_aggregators: HashMap::new(),
136 synthetic_quote_feeds: HashMap::new(),
137 synthetic_trade_feeds: HashMap::new(),
138 buffered_deltas_map: HashMap::new(),
139 msgbus_priority: 10, command_queue: VecDeque::new(),
141 config: config.unwrap_or_default(),
142 }
143 }
144
145 #[must_use]
147 pub fn get_cache(&self) -> Ref<'_, Cache> {
148 self.cache.borrow()
149 }
150
151 pub fn register_default_client(&mut self, client: DataClientAdapter) {
162 log::info!("Registered default client {}", client.client_id());
163 self.default_client = Some(client);
164 }
165
166 pub fn start(self) {
167 self.clients.values().for_each(|client| client.start());
168 }
169
170 pub fn stop(self) {
171 self.clients.values().for_each(|client| client.stop());
172 }
173
174 pub fn reset(self) {
175 self.clients.values().for_each(|client| client.reset());
176 }
177
178 pub fn dispose(self) {
179 self.clients.values().for_each(|client| client.dispose());
180 self.clock.borrow_mut().cancel_timers();
181 }
182
183 pub fn connect(&self) {
184 todo!() }
186
187 pub fn disconnect(&self) {
188 todo!() }
190
191 #[must_use]
192 pub fn check_connected(&self) -> bool {
193 self.clients.values().all(|client| client.is_connected())
194 }
195
196 #[must_use]
197 pub fn check_disconnected(&self) -> bool {
198 self.clients.values().all(|client| !client.is_connected())
199 }
200
201 #[must_use]
202 pub fn registered_clients(&self) -> Vec<ClientId> {
203 self.clients.keys().copied().collect()
204 }
205
206 fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
209 where
210 F: Fn(&DataClientAdapter) -> &HashSet<T>,
211 T: Clone,
212 {
213 let mut subs = Vec::new();
214 for client in self.clients.values() {
215 subs.extend(get_subs(client).iter().cloned());
216 }
217 subs
218 }
219
220 fn get_client(
221 &mut self,
222 client_id: Option<&ClientId>,
223 venue: Option<&Venue>,
224 ) -> Option<&mut DataClientAdapter> {
225 if let Some(client_id) = client_id {
226 if self.clients.contains_key(client_id) {
228 return self.clients.get_mut(client_id);
229 }
230 }
231
232 if let Some(venue) = venue {
233 if let Some(mapped_client_id) = self.routing_map.get(venue) {
235 return self.clients.get_mut(mapped_client_id);
236 }
237 }
238
239 None
240 }
241
242 #[must_use]
243 pub fn subscribed_custom_data(&self) -> Vec<DataType> {
244 self.collect_subscriptions(|client| &client.subscriptions_generic)
245 }
246
247 #[must_use]
248 pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
249 self.collect_subscriptions(|client| &client.subscriptions_instrument)
250 }
251
252 #[must_use]
253 pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
254 self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
255 }
256
257 #[must_use]
258 pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
259 self.collect_subscriptions(|client| &client.subscriptions_book_snapshots)
260 }
261
262 #[must_use]
263 pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
264 self.collect_subscriptions(|client| &client.subscriptions_quotes)
265 }
266
267 #[must_use]
268 pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
269 self.collect_subscriptions(|client| &client.subscriptions_trades)
270 }
271
272 #[must_use]
273 pub fn subscribed_bars(&self) -> Vec<BarType> {
274 self.collect_subscriptions(|client| &client.subscriptions_bars)
275 }
276
277 #[must_use]
278 pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
279 self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
280 }
281
282 #[must_use]
283 pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
284 self.collect_subscriptions(|client| &client.subscriptions_index_prices)
285 }
286
287 #[must_use]
288 pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
289 self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
290 }
291
292 #[must_use]
293 pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
294 self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
295 }
296
297 pub fn on_start(self) {
298 todo!()
299 }
300
301 pub fn on_stop(self) {
302 todo!()
303 }
304
305 pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
312 check_key_not_in_index_map(&client.client_id, &self.clients, "client_id", "clients")
313 .expect(FAILED);
314
315 if let Some(routing) = routing {
316 self.routing_map.insert(routing, client.client_id());
317 log::info!("Set client {} routing for {routing}", client.client_id());
318 }
319
320 log::info!("Registered client {}", client.client_id());
321 self.clients.insert(client.client_id, client);
322 }
323
324 pub fn deregister_client(&mut self, client_id: &ClientId) {
331 check_key_in_index_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
332
333 self.clients.shift_remove(client_id);
334 log::info!("Deregistered client {client_id}");
335 }
336
337 pub fn run(&mut self) {
338 let commands: Vec<_> = self.command_queue.drain(..).collect();
339 for cmd in commands {
340 self.execute(cmd);
341 }
342 }
343
344 pub fn enqueue(&mut self, cmd: &dyn Any) {
345 if let Some(cmd) = cmd.downcast_ref::<DataCommand>() {
346 self.command_queue.push_back(cmd.clone());
347 } else {
348 log::error!("Invalid message type received: {cmd:?}");
349 }
350 }
351
352 pub fn execute(&mut self, cmd: DataCommand) {
353 let result = match cmd {
354 DataCommand::Subscribe(cmd) => self.execute_subscribe(cmd),
355 DataCommand::Unsubscribe(cmd) => self.execute_unsubscribe(cmd),
356 DataCommand::Request(cmd) => self.execute_request(cmd),
357 };
358
359 if let Err(e) = result {
360 log::error!("{e}");
361 }
362 }
363
364 pub fn execute_subscribe(&mut self, cmd: SubscribeCommand) -> anyhow::Result<()> {
365 match &cmd {
366 SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
367 SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
368 SubscribeCommand::BookSnapshots(cmd) => self.subscribe_book_snapshots(cmd)?,
369 SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
370 _ => {} }
372
373 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
374 client.execute_subscribe_command(cmd.clone());
375 } else {
376 log::error!(
377 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
378 cmd.client_id(),
379 cmd.venue(),
380 );
381 }
382
383 Ok(())
384 }
385
386 pub fn execute_unsubscribe(&mut self, cmd: UnsubscribeCommand) -> anyhow::Result<()> {
387 match &cmd {
388 UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd)?,
389 UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd)?,
390 UnsubscribeCommand::BookSnapshots(cmd) => self.unsubscribe_book_snapshots(cmd)?,
391 UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd)?,
392 _ => {} }
394
395 if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
396 client.execute_unsubscribe_command(cmd.clone());
397 } else {
398 log::error!(
399 "Cannot handle command: no client found for client_id={:?}, venue={:?}",
400 cmd.client_id(),
401 cmd.venue(),
402 );
403 }
404
405 Ok(())
406 }
407
408 pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
410 if let Some(client) = self.get_client(req.client_id(), req.venue()) {
411 match req {
412 RequestCommand::Data(req) => client.request_data(req),
413 RequestCommand::Instrument(req) => client.request_instrument(req),
414 RequestCommand::Instruments(req) => client.request_instruments(req),
415 RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
416 RequestCommand::Quotes(req) => client.request_quotes(req),
417 RequestCommand::Trades(req) => client.request_trades(req),
418 RequestCommand::Bars(req) => client.request_bars(req),
419 }
420 } else {
421 anyhow::bail!(
422 "Cannot handle request: no client found for {:?} {:?}",
423 req.client_id(),
424 req.venue()
425 );
426 }
427 }
428
429 pub fn process(&mut self, data: &dyn Any) {
430 if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
432 self.handle_instrument(instrument.clone());
433 } else {
434 log::error!("Cannot process data {data:?}, type is unrecognized");
435 }
436 }
437
438 pub fn process_data(&mut self, data: Data) {
439 match data {
440 Data::Delta(delta) => self.handle_delta(delta),
441 Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
442 Data::Depth10(depth) => self.handle_depth10(*depth),
443 Data::Quote(quote) => self.handle_quote(quote),
444 Data::Trade(trade) => self.handle_trade(trade),
445 Data::Bar(bar) => self.handle_bar(bar),
446 Data::MarkPriceUpdate(mark_price) => self.handle_mark_price(mark_price),
447 Data::IndexPriceUpdate(index_price) => self.handle_index_price(index_price),
448 Data::InstrumentClose(close) => self.handle_instrument_close(close),
449 }
450 }
451
452 pub fn response(&self, resp: DataResponse) {
454 log::debug!("{RECV}{RES} {resp:?}");
455
456 match resp.data_type.type_name() {
457 stringify!(InstrumentAny) => {
458 let instruments = Arc::downcast::<Vec<InstrumentAny>>(resp.data.clone())
459 .expect("Invalid response data");
460 self.handle_instruments(instruments);
461 }
462 stringify!(QuoteTick) => {
463 let quotes = Arc::downcast::<Vec<QuoteTick>>(resp.data.clone())
464 .expect("Invalid response data");
465 self.handle_quotes(quotes);
466 }
467 stringify!(TradeTick) => {
468 let trades = Arc::downcast::<Vec<TradeTick>>(resp.data.clone())
469 .expect("Invalid response data");
470 self.handle_trades(trades);
471 }
472 stringify!(Bar) => {
473 let bars =
474 Arc::downcast::<Vec<Bar>>(resp.data.clone()).expect("Invalid response data");
475 self.handle_bars(bars);
476 }
477 type_name => log::error!("Cannot handle request, type {type_name} is unrecognized"),
478 }
479
480 get_message_bus().borrow().send_response(resp);
481 }
482
483 fn handle_instrument(&mut self, instrument: InstrumentAny) {
486 if let Err(e) = self
487 .cache
488 .as_ref()
489 .borrow_mut()
490 .add_instrument(instrument.clone())
491 {
492 log::error!("Error on cache insert: {e}");
493 }
494
495 let topic = switchboard::get_instrument_topic(instrument.id());
496 msgbus::publish(&topic, &instrument as &dyn Any); }
498
499 fn handle_delta(&mut self, delta: OrderBookDelta) {
500 let deltas = if self.config.buffer_deltas {
501 let buffer_deltas = self
502 .buffered_deltas_map
503 .entry(delta.instrument_id)
504 .or_default();
505 buffer_deltas.push(delta);
506
507 if !RecordFlag::F_LAST.matches(delta.flags) {
508 return; }
510
511 let deltas = self
513 .buffered_deltas_map
514 .remove(&delta.instrument_id)
515 .unwrap();
516 OrderBookDeltas::new(delta.instrument_id, deltas)
517 } else {
518 OrderBookDeltas::new(delta.instrument_id, vec![delta])
519 };
520
521 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
522 msgbus::publish(&topic, &deltas as &dyn Any);
523 }
524
525 fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
526 let deltas = if self.config.buffer_deltas {
527 let buffer_deltas = self
528 .buffered_deltas_map
529 .entry(deltas.instrument_id)
530 .or_default();
531 buffer_deltas.extend(deltas.deltas);
532
533 let mut is_last_delta = false;
534 for delta in buffer_deltas.iter_mut() {
535 if RecordFlag::F_LAST.matches(delta.flags) {
536 is_last_delta = true;
537 }
538 }
539
540 if !is_last_delta {
541 return;
542 }
543
544 let buffer_deltas = self
546 .buffered_deltas_map
547 .remove(&deltas.instrument_id)
548 .unwrap();
549 OrderBookDeltas::new(deltas.instrument_id, buffer_deltas)
550 } else {
551 deltas
552 };
553
554 let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
555 msgbus::publish(&topic, &deltas as &dyn Any);
556 }
557
558 fn handle_depth10(&mut self, depth: OrderBookDepth10) {
559 let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
560 msgbus::publish(&topic, &depth as &dyn Any);
561 }
562
563 fn handle_quote(&mut self, quote: QuoteTick) {
564 if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
565 log::error!("Error on cache insert: {e}");
566 }
567
568 let topic = switchboard::get_quotes_topic(quote.instrument_id);
571 msgbus::publish(&topic, "e as &dyn Any);
572 }
573
574 fn handle_trade(&mut self, trade: TradeTick) {
575 if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
576 log::error!("Error on cache insert: {e}");
577 }
578
579 let topic = switchboard::get_trades_topic(trade.instrument_id);
582 msgbus::publish(&topic, &trade as &dyn Any);
583 }
584
585 fn handle_bar(&mut self, bar: Bar) {
586 if self.config.validate_data_sequence {
588 if let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type) {
589 if bar.ts_event < last_bar.ts_event {
590 log::warn!(
591 "Bar {bar} was prior to last bar `ts_event` {}",
592 last_bar.ts_event
593 );
594 return; }
596 if bar.ts_init < last_bar.ts_init {
597 log::warn!(
598 "Bar {bar} was prior to last bar `ts_init` {}",
599 last_bar.ts_init
600 );
601 return; }
603 }
605 }
606
607 if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
608 log::error!("Error on cache insert: {e}");
609 }
610
611 let topic = switchboard::get_bars_topic(bar.bar_type);
612 msgbus::publish(&topic, &bar as &dyn Any);
613 }
614
615 fn handle_mark_price(&mut self, mark_price: MarkPriceUpdate) {
616 if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
617 log::error!("Error on cache insert: {e}");
618 }
619
620 let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
621 msgbus::publish(&topic, &mark_price as &dyn Any);
622 }
623
624 fn handle_index_price(&mut self, index_price: IndexPriceUpdate) {
625 if let Err(e) = self
626 .cache
627 .as_ref()
628 .borrow_mut()
629 .add_index_price(index_price)
630 {
631 log::error!("Error on cache insert: {e}");
632 }
633
634 let topic = switchboard::get_index_price_topic(index_price.instrument_id);
635 msgbus::publish(&topic, &index_price as &dyn Any);
636 }
637
638 fn handle_instrument_close(&mut self, close: InstrumentClose) {
639 let topic = switchboard::get_instrument_close_topic(close.instrument_id);
640 msgbus::publish(&topic, &close as &dyn Any);
641 }
642
643 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
646 if cmd.instrument_id.is_synthetic() {
647 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
648 }
649
650 self.setup_order_book(
651 &cmd.instrument_id,
652 cmd.book_type,
653 cmd.depth,
654 true,
655 cmd.managed,
656 )?;
657
658 Ok(())
659 }
660
661 fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
662 if cmd.instrument_id.is_synthetic() {
663 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
664 }
665
666 self.setup_order_book(
667 &cmd.instrument_id,
668 cmd.book_type,
669 cmd.depth, false,
671 cmd.managed,
672 )?;
673
674 Ok(())
675 }
676
677 fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
678 if self.subscribed_book_deltas().contains(&cmd.instrument_id) {
679 return Ok(());
680 }
681
682 if cmd.instrument_id.is_synthetic() {
683 anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
684 }
685
686 {
687 if !self.book_intervals.contains_key(&cmd.interval_ms) {
688 let interval_ns = millis_to_nanos(cmd.interval_ms.get() as f64);
689 let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id);
690
691 let snap_info = BookSnapshotInfo {
692 instrument_id: cmd.instrument_id,
693 venue: cmd.instrument_id.venue,
694 is_composite: cmd.instrument_id.symbol.is_composite(),
695 root: Ustr::from(cmd.instrument_id.symbol.root()),
696 topic,
697 interval_ms: cmd.interval_ms,
698 };
699
700 let now_ns = self.clock.borrow().timestamp_ns().as_u64();
701 let mut start_time_ns = now_ns - (now_ns % interval_ns);
702
703 if start_time_ns - NANOSECONDS_IN_MILLISECOND <= now_ns {
704 start_time_ns += NANOSECONDS_IN_SECOND; }
706
707 let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
708 self.book_snapshotters
709 .insert(cmd.instrument_id, snapshotter.clone());
710 let timer_name = snapshotter.timer_name;
711
712 let callback =
713 TimeEventCallback::Rust(Rc::new(move |event| snapshotter.snapshot(event)));
714
715 self.clock
716 .borrow_mut()
717 .set_timer_ns(
718 &timer_name,
719 interval_ns,
720 start_time_ns.into(),
721 None,
722 Some(callback),
723 None,
724 )
725 .expect(FAILED);
726 }
727 }
728
729 self.setup_order_book(&cmd.instrument_id, cmd.book_type, cmd.depth, false, true)?;
730
731 Ok(())
732 }
733
734 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
735 match cmd.bar_type.aggregation_source() {
736 AggregationSource::Internal => {
737 if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
738 self.start_bar_aggregator(cmd.bar_type)?;
739 }
740 }
741 AggregationSource::External => {
742 if cmd.bar_type.instrument_id().is_synthetic() {
743 anyhow::bail!(
744 "Cannot subscribe for externally aggregated synthetic instrument bar data"
745 );
746 }
747 }
748 }
749
750 Ok(())
751 }
752
753 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
754 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
755 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
756 return Ok(());
757 }
758
759 let topics = vec![
760 switchboard::get_book_deltas_topic(cmd.instrument_id),
761 switchboard::get_book_depth10_topic(cmd.instrument_id),
762 switchboard::get_book_snapshots_topic(cmd.instrument_id),
763 ];
764
765 self.maintain_book_updater(&cmd.instrument_id, &topics);
766 self.maintain_book_snapshotter(&cmd.instrument_id);
767
768 Ok(())
769 }
770
771 fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) -> anyhow::Result<()> {
772 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
773 log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
774 return Ok(());
775 }
776
777 let topics = vec![
778 switchboard::get_book_deltas_topic(cmd.instrument_id),
779 switchboard::get_book_depth10_topic(cmd.instrument_id),
780 switchboard::get_book_snapshots_topic(cmd.instrument_id),
781 ];
782
783 self.maintain_book_updater(&cmd.instrument_id, &topics);
784 self.maintain_book_snapshotter(&cmd.instrument_id);
785
786 Ok(())
787 }
788
789 fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) -> anyhow::Result<()> {
790 if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
791 log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
792 return Ok(());
793 }
794
795 let topics = vec![
796 switchboard::get_book_deltas_topic(cmd.instrument_id),
797 switchboard::get_book_depth10_topic(cmd.instrument_id),
798 switchboard::get_book_snapshots_topic(cmd.instrument_id),
799 ];
800
801 self.maintain_book_updater(&cmd.instrument_id, &topics);
802 self.maintain_book_snapshotter(&cmd.instrument_id);
803
804 Ok(())
805 }
806
807 const fn unsubscribe_bars(&mut self, command: &UnsubscribeBars) -> anyhow::Result<()> {
808 Ok(())
810 }
811
812 fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, topics: &[Ustr]) {
813 if let Some(updater) = self.book_updaters.get(instrument_id) {
814 let handler = ShareableMessageHandler(updater.clone());
815
816 for topic in topics {
818 if msgbus::subscriptions_count(*topic) == 1
819 && msgbus::is_subscribed(*topic, handler.clone())
820 {
821 log::debug!("Unsubscribing BookUpdater from {topic}");
822 msgbus::unsubscribe(*topic, handler.clone());
823 }
824 }
825
826 let still_subscribed = topics
828 .iter()
829 .any(|topic| msgbus::is_subscribed(*topic, handler.clone()));
830 if !still_subscribed {
831 self.book_updaters.remove(instrument_id);
832 log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
833 }
834 }
835 }
836
837 fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
838 if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
839 let topic = switchboard::get_book_snapshots_topic(*instrument_id);
840
841 if msgbus::subscriptions_count(topic) == 0 {
843 let timer_name = snapshotter.timer_name;
844 self.book_snapshotters.remove(instrument_id);
845 let mut clock = self.clock.borrow_mut();
846 if clock.timer_names().contains(&timer_name.as_str()) {
847 clock.cancel_timer(&timer_name);
848 }
849 log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
850 }
851 }
852 }
853
854 fn handle_instruments(&self, instruments: Arc<Vec<InstrumentAny>>) {
857 let mut cache = self.cache.as_ref().borrow_mut();
859 for instrument in instruments.iter() {
860 if let Err(e) = cache.add_instrument(instrument.clone()) {
861 log::error!("Error on cache insert: {e}");
862 }
863 }
864 }
865
866 fn handle_quotes(&self, quotes: Arc<Vec<QuoteTick>>) {
867 if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes("es) {
868 log::error!("Error on cache insert: {e}");
869 }
870 }
871
872 fn handle_trades(&self, trades: Arc<Vec<TradeTick>>) {
873 if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(&trades) {
874 log::error!("Error on cache insert: {e}");
875 }
876 }
877
878 fn handle_bars(&self, bars: Arc<Vec<Bar>>) {
879 if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(&bars) {
880 log::error!("Error on cache insert: {e}");
881 }
882 }
883
884 #[allow(clippy::too_many_arguments)]
887 fn setup_order_book(
888 &mut self,
889 instrument_id: &InstrumentId,
890 book_type: BookType,
891 depth: Option<NonZeroUsize>,
892 only_deltas: bool,
893 managed: bool,
894 ) -> anyhow::Result<()> {
895 let mut cache = self.cache.borrow_mut();
896 if managed && !cache.has_order_book(instrument_id) {
897 let book = OrderBook::new(*instrument_id, book_type);
898 log::debug!("Created {book}");
899 cache.add_order_book(book)?;
900 }
901
902 let updater = Rc::new(BookUpdater::new(instrument_id, self.cache.clone()));
904 self.book_updaters.insert(*instrument_id, updater.clone());
905
906 let handler = ShareableMessageHandler(updater);
907
908 let topic = switchboard::get_book_deltas_topic(*instrument_id);
909 if !msgbus::is_subscribed(topic, handler.clone()) {
910 msgbus::subscribe(topic, handler.clone(), Some(self.msgbus_priority));
911 }
912
913 let topic = switchboard::get_book_depth10_topic(*instrument_id);
914 if !only_deltas && !msgbus::is_subscribed(topic, handler.clone()) {
915 msgbus::subscribe(topic, handler, Some(self.msgbus_priority));
916 }
917
918 Ok(())
919 }
920
921 fn create_bar_aggregator(
922 &mut self,
923 instrument: &InstrumentAny,
924 bar_type: BarType,
925 ) -> Box<dyn BarAggregator> {
926 let cache = self.cache.clone();
927
928 let handler = move |bar: Bar| {
929 if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
930 log::error!("Error on cache insert: {e}");
931 }
932
933 let topic = switchboard::get_bars_topic(bar.bar_type);
934 msgbus::publish(&topic, &bar as &dyn Any);
935 };
936
937 let clock = self.clock.clone();
938 let config = self.config.clone();
939
940 let price_precision = instrument.price_precision();
941 let size_precision = instrument.size_precision();
942
943 if bar_type.spec().is_time_aggregated() {
944 Box::new(TimeBarAggregator::new(
945 bar_type,
946 price_precision,
947 size_precision,
948 clock,
949 handler,
950 false, config.time_bars_build_with_no_updates,
952 config.time_bars_timestamp_on_close,
953 config.time_bars_interval_type,
954 None, 20, false, ))
958 } else {
959 match bar_type.spec().aggregation {
960 BarAggregation::Tick => Box::new(TickBarAggregator::new(
961 bar_type,
962 price_precision,
963 size_precision,
964 handler,
965 false,
966 )) as Box<dyn BarAggregator>,
967 BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
968 bar_type,
969 price_precision,
970 size_precision,
971 handler,
972 false,
973 )) as Box<dyn BarAggregator>,
974 BarAggregation::Value => Box::new(ValueBarAggregator::new(
975 bar_type,
976 price_precision,
977 size_precision,
978 handler,
979 false,
980 )) as Box<dyn BarAggregator>,
981 _ => panic!(
982 "Cannot create aggregator: {} aggregation not currently supported",
983 bar_type.spec().aggregation
984 ),
985 }
986 }
987 }
988
989 fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
990 let instrument = {
991 let cache = self.cache.borrow();
992 cache
993 .instrument(&bar_type.instrument_id())
994 .ok_or_else(|| {
995 anyhow::anyhow!(
996 "Cannot start bar aggregation: no instrument found for {}",
997 bar_type.instrument_id(),
998 )
999 })?
1000 .clone()
1001 };
1002
1003 let aggregator = if let Some(aggregator) = self.bar_aggregators.get_mut(&bar_type) {
1004 aggregator
1005 } else {
1006 let aggregator = self.create_bar_aggregator(&instrument, bar_type);
1007 self.bar_aggregators.insert(bar_type, aggregator);
1008 self.bar_aggregators.get_mut(&bar_type).unwrap()
1009 };
1010
1011 aggregator.set_is_running(true);
1014
1015 Ok(())
1016 }
1017
1018 fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
1019 let aggregator = self
1020 .bar_aggregators
1021 .remove(&bar_type.standard())
1022 .ok_or_else(|| {
1023 anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
1024 })?;
1025
1026 if bar_type.is_composite() {
1035 let composite_bar_type = bar_type.composite();
1036 } else if bar_type.spec().price_type == PriceType::Last {
1038 todo!()
1040 } else {
1041 todo!()
1043 }
1044
1045 Ok(())
1046 }
1047}
1048
1049pub struct SubscriptionCommandHandler {
1050 pub id: Ustr,
1051 pub engine_ref: Rc<RefCell<DataEngine>>,
1052}
1053
1054impl MessageHandler for SubscriptionCommandHandler {
1055 fn id(&self) -> Ustr {
1056 self.id
1057 }
1058
1059 fn handle(&self, msg: &dyn Any) {
1060 self.engine_ref.borrow_mut().enqueue(msg);
1061 }
1062
1063 fn as_any(&self) -> &dyn Any {
1064 self
1065 }
1066}