nautilus_data/engine/
book.rs
1use std::{
17 any::Any,
18 cell::{Ref, RefCell},
19 num::NonZeroUsize,
20 rc::Rc,
21};
22
23use nautilus_common::{
24 cache::Cache,
25 msgbus::{self, handler::MessageHandler},
26 timer::TimeEvent,
27};
28use nautilus_model::{
29 data::Data,
30 identifiers::{InstrumentId, Venue},
31 instruments::Instrument,
32};
33use ustr::Ustr;
34
35#[derive(Clone, Debug)]
37pub struct BookSnapshotInfo {
38 pub instrument_id: InstrumentId,
39 pub venue: Venue,
40 pub is_composite: bool,
41 pub root: Ustr,
42 pub topic: Ustr,
43 pub interval_ms: NonZeroUsize,
44}
45
46pub struct BookUpdater {
47 pub id: Ustr,
48 pub instrument_id: InstrumentId,
49 pub cache: Rc<RefCell<Cache>>,
50}
51
52impl BookUpdater {
53 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
55 Self {
56 id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
57 instrument_id: *instrument_id,
58 cache,
59 }
60 }
61}
62
63impl MessageHandler for BookUpdater {
64 fn id(&self) -> Ustr {
65 self.id
66 }
67
68 fn handle(&self, message: &dyn Any) {
69 if let Some(data) = message.downcast_ref::<Data>() {
71 if let Some(book) = self
72 .cache
73 .borrow_mut()
74 .order_book_mut(&data.instrument_id())
75 {
76 match data {
77 Data::Delta(delta) => book.apply_delta(delta),
78 Data::Deltas(deltas) => book.apply_deltas(deltas),
79 Data::Depth10(depth) => book.apply_depth(depth),
80 _ => log::error!("Invalid data type for book update, was {data:?}"),
81 }
82 }
83 }
84 }
85
86 fn as_any(&self) -> &dyn Any {
87 self
88 }
89}
90
91pub struct BookSnapshotter {
92 pub id: Ustr,
93 pub timer_name: Ustr,
94 pub snap_info: BookSnapshotInfo,
95 pub cache: Rc<RefCell<Cache>>,
96}
97
98impl BookSnapshotter {
99 pub fn new(snap_info: BookSnapshotInfo, cache: Rc<RefCell<Cache>>) -> Self {
101 let id_str = format!(
102 "{}-{}",
103 stringify!(BookSnapshotter),
104 snap_info.instrument_id
105 );
106 let timer_name = format!(
107 "OrderBook|{}|{}",
108 snap_info.instrument_id, snap_info.interval_ms
109 );
110
111 Self {
112 id: Ustr::from(&id_str),
113 timer_name: Ustr::from(&timer_name),
114 snap_info,
115 cache,
116 }
117 }
118
119 pub fn snapshot(&self, event: TimeEvent) {
120 let cache = self.cache.borrow();
121
122 if self.snap_info.is_composite {
123 let topic = self.snap_info.topic;
124 let underlying = self.snap_info.root;
125 for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
126 self.publish_order_book(&instrument.id(), &topic, &cache);
127 }
128 } else {
129 self.publish_order_book(&self.snap_info.instrument_id, &self.snap_info.topic, &cache);
130 }
131 }
132
133 fn publish_order_book(&self, instrument_id: &InstrumentId, topic: &Ustr, cache: &Ref<Cache>) {
134 let book = cache
135 .order_book(instrument_id)
136 .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
137
138 if book.update_count == 0 {
139 log::debug!("OrderBook for {instrument_id} not yet updated for snapshot");
140 return;
141 }
142
143 msgbus::publish(topic, book as &dyn Any);
144 }
145}