nautilus_data/engine/
book.rs1use std::{
17 any::Any,
18 cell::{Ref, RefCell},
19 num::NonZeroUsize,
20 rc::Rc,
21};
22
23use nautilus_common::{
24 cache::Cache,
25 msgbus::{self, MStr, Topic, handler::MessageHandler},
26 timer::TimeEvent,
27};
28use nautilus_model::{
29 data::Data,
30 identifiers::{InstrumentId, Venue},
31 instruments::Instrument,
32};
33use ustr::Ustr;
34
35#[derive(Clone, Debug)]
37pub struct BookSnapshotInfo {
38 pub instrument_id: InstrumentId,
39 pub venue: Venue,
40 pub is_composite: bool,
41 pub root: Ustr,
42 pub topic: MStr<Topic>,
43 pub interval_ms: NonZeroUsize,
44}
45
46#[derive(Debug)]
52pub struct BookUpdater {
53 pub id: Ustr,
54 pub instrument_id: InstrumentId,
55 pub cache: Rc<RefCell<Cache>>,
56}
57
58impl BookUpdater {
59 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
61 Self {
62 id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
63 instrument_id: *instrument_id,
64 cache,
65 }
66 }
67}
68
69impl MessageHandler for BookUpdater {
70 fn id(&self) -> Ustr {
71 self.id
72 }
73
74 fn handle(&self, message: &dyn Any) {
75 if let Some(data) = message.downcast_ref::<Data>()
77 && let Some(book) = self
78 .cache
79 .borrow_mut()
80 .order_book_mut(&data.instrument_id())
81 {
82 match data {
83 Data::Delta(delta) => {
84 if let Err(e) = book.apply_delta(delta) {
85 log::error!("Failed to apply delta: {e}");
86 }
87 }
88 Data::Deltas(deltas) => {
89 if let Err(e) = book.apply_deltas(deltas) {
90 log::error!("Failed to apply deltas: {e}");
91 }
92 }
93 Data::Depth10(depth) => book.apply_depth(depth),
94 _ => log::error!("Invalid data type for book update, was {data:?}"),
95 }
96 }
97 }
98
99 fn as_any(&self) -> &dyn Any {
100 self
101 }
102}
103
104#[derive(Debug)]
110pub struct BookSnapshotter {
111 pub id: Ustr,
112 pub timer_name: Ustr,
113 pub snap_info: BookSnapshotInfo,
114 pub cache: Rc<RefCell<Cache>>,
115}
116
117impl BookSnapshotter {
118 pub fn new(snap_info: BookSnapshotInfo, cache: Rc<RefCell<Cache>>) -> Self {
120 let id_str = format!(
121 "{}-{}",
122 stringify!(BookSnapshotter),
123 snap_info.instrument_id
124 );
125 let timer_name = format!(
126 "OrderBook|{}|{}",
127 snap_info.instrument_id, snap_info.interval_ms
128 );
129
130 Self {
131 id: Ustr::from(&id_str),
132 timer_name: Ustr::from(&timer_name),
133 snap_info,
134 cache,
135 }
136 }
137
138 pub fn snapshot(&self, _event: TimeEvent) {
139 let cache = self.cache.borrow();
140
141 if self.snap_info.is_composite {
142 let topic = self.snap_info.topic;
143 let underlying = self.snap_info.root;
144 for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
145 self.publish_order_book(&instrument.id(), topic, &cache);
146 }
147 } else {
148 self.publish_order_book(&self.snap_info.instrument_id, self.snap_info.topic, &cache);
149 }
150 }
151
152 fn publish_order_book(
153 &self,
154 instrument_id: &InstrumentId,
155 topic: MStr<Topic>,
156 cache: &Ref<Cache>,
157 ) {
158 let book = cache
159 .order_book(instrument_id)
160 .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
161
162 if book.update_count == 0 {
163 log::debug!("OrderBook for {instrument_id} not yet updated for snapshot");
164 return;
165 }
166
167 msgbus::publish(topic, book as &dyn Any);
168 }
169}