nautilus_data/engine/
book.rs1use std::{
17 any::Any,
18 cell::{Ref, RefCell},
19 num::NonZeroUsize,
20 rc::Rc,
21};
22
23use nautilus_common::{
24 cache::Cache,
25 msgbus::{self, MStr, Topic, handler::MessageHandler},
26 timer::TimeEvent,
27};
28use nautilus_model::{
29 data::{Data, OrderBookDeltas, OrderBookDepth10},
30 identifiers::{InstrumentId, Venue},
31 instruments::Instrument,
32};
33use ustr::Ustr;
34
35#[derive(Clone, Debug)]
37pub struct BookSnapshotInfo {
38 pub instrument_id: InstrumentId,
39 pub venue: Venue,
40 pub is_composite: bool,
41 pub root: Ustr,
42 pub topic: MStr<Topic>,
43 pub interval_ms: NonZeroUsize,
44}
45
46#[derive(Debug)]
52pub struct BookUpdater {
53 pub id: Ustr,
54 pub instrument_id: InstrumentId,
55 pub cache: Rc<RefCell<Cache>>,
56}
57
58impl BookUpdater {
59 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
61 Self {
62 id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
63 instrument_id: *instrument_id,
64 cache,
65 }
66 }
67}
68
69impl MessageHandler for BookUpdater {
70 fn id(&self) -> Ustr {
71 self.id
72 }
73
74 fn handle(&self, message: &dyn Any) {
75 let mut cache = self.cache.borrow_mut();
76
77 if let Some(data) = message.downcast_ref::<Data>() {
78 if let Some(book) = cache.order_book_mut(&data.instrument_id()) {
79 match data {
80 Data::Delta(delta) => {
81 if let Err(e) = book.apply_delta(delta) {
82 log::error!("Failed to apply delta: {e}");
83 }
84 }
85 Data::Deltas(deltas) => {
86 if let Err(e) = book.apply_deltas(deltas) {
87 log::error!("Failed to apply deltas: {e}");
88 }
89 }
90 Data::Depth10(depth) => {
91 if let Err(e) = book.apply_depth(depth) {
92 log::error!("Failed to apply depth: {e}");
93 }
94 }
95 _ => log::error!("Invalid data type for book update, was {data:?}"),
96 }
97 }
98 return;
99 }
100
101 if let Some(deltas) = message.downcast_ref::<OrderBookDeltas>() {
103 if let Some(book) = cache.order_book_mut(&deltas.instrument_id)
104 && let Err(e) = book.apply_deltas(deltas)
105 {
106 log::error!("Failed to apply deltas: {e}");
107 }
108 return;
109 }
110
111 if let Some(depth) = message.downcast_ref::<OrderBookDepth10>() {
112 if let Some(book) = cache.order_book_mut(&depth.instrument_id)
113 && let Err(e) = book.apply_depth(depth)
114 {
115 log::error!("Failed to apply depth: {e}");
116 }
117 return;
118 }
119
120 log::error!("BookUpdater received unhandled message type");
121 }
122
123 fn as_any(&self) -> &dyn Any {
124 self
125 }
126}
127
128#[derive(Debug)]
134pub struct BookSnapshotter {
135 pub id: Ustr,
136 pub timer_name: Ustr,
137 pub snap_info: BookSnapshotInfo,
138 pub cache: Rc<RefCell<Cache>>,
139}
140
141impl BookSnapshotter {
142 pub fn new(snap_info: BookSnapshotInfo, cache: Rc<RefCell<Cache>>) -> Self {
144 let id_str = format!(
145 "{}-{}",
146 stringify!(BookSnapshotter),
147 snap_info.instrument_id
148 );
149 let timer_name = format!(
150 "OrderBook|{}|{}",
151 snap_info.instrument_id, snap_info.interval_ms
152 );
153
154 Self {
155 id: Ustr::from(&id_str),
156 timer_name: Ustr::from(&timer_name),
157 snap_info,
158 cache,
159 }
160 }
161
162 pub fn snapshot(&self, _event: TimeEvent) {
163 log::debug!(
164 "BookSnapshotter.snapshot called for {}",
165 self.snap_info.instrument_id
166 );
167 let cache = self.cache.borrow();
168
169 if self.snap_info.is_composite {
170 let topic = self.snap_info.topic;
171 let underlying = self.snap_info.root;
172 for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
173 self.publish_order_book(&instrument.id(), topic, &cache);
174 }
175 } else {
176 self.publish_order_book(&self.snap_info.instrument_id, self.snap_info.topic, &cache);
177 }
178 }
179
180 fn publish_order_book(
181 &self,
182 instrument_id: &InstrumentId,
183 topic: MStr<Topic>,
184 cache: &Ref<Cache>,
185 ) {
186 let book = cache
187 .order_book(instrument_id)
188 .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
189
190 if book.update_count == 0 {
191 log::debug!("OrderBook not yet updated for snapshot: {instrument_id}");
192 return;
193 }
194 log::debug!(
195 "Publishing OrderBook snapshot for {instrument_id} (update_count={})",
196 book.update_count
197 );
198
199 msgbus::publish(topic, book as &dyn Any);
200 }
201}