nautilus_data/engine/
book.rs1use std::{
17 any::Any,
18 cell::{Ref, RefCell},
19 num::NonZeroUsize,
20 rc::Rc,
21};
22
23use nautilus_common::{
24 cache::Cache,
25 msgbus::{self, MStr, Topic, handler::MessageHandler},
26 timer::TimeEvent,
27};
28use nautilus_model::{
29 data::Data,
30 identifiers::{InstrumentId, Venue},
31 instruments::Instrument,
32};
33use ustr::Ustr;
34
35#[derive(Clone, Debug)]
37pub struct BookSnapshotInfo {
38 pub instrument_id: InstrumentId,
39 pub venue: Venue,
40 pub is_composite: bool,
41 pub root: Ustr,
42 pub topic: MStr<Topic>,
43 pub interval_ms: NonZeroUsize,
44}
45
46#[derive(Debug)]
52pub struct BookUpdater {
53 pub id: Ustr,
54 pub instrument_id: InstrumentId,
55 pub cache: Rc<RefCell<Cache>>,
56}
57
58impl BookUpdater {
59 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
61 Self {
62 id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
63 instrument_id: *instrument_id,
64 cache,
65 }
66 }
67}
68
69impl MessageHandler for BookUpdater {
70 fn id(&self) -> Ustr {
71 self.id
72 }
73
74 fn handle(&self, message: &dyn Any) {
75 if let Some(data) = message.downcast_ref::<Data>()
77 && let Some(book) = self
78 .cache
79 .borrow_mut()
80 .order_book_mut(&data.instrument_id())
81 {
82 match data {
83 Data::Delta(delta) => {
84 if let Err(e) = book.apply_delta(delta) {
85 log::error!("Failed to apply delta: {e}");
86 }
87 }
88 Data::Deltas(deltas) => {
89 if let Err(e) = book.apply_deltas(deltas) {
90 log::error!("Failed to apply deltas: {e}");
91 }
92 }
93 Data::Depth10(depth) => {
94 if let Err(e) = book.apply_depth(depth) {
95 log::error!("Failed to apply depth: {e}");
96 }
97 }
98 _ => log::error!("Invalid data type for book update, was {data:?}"),
99 }
100 }
101 }
102
103 fn as_any(&self) -> &dyn Any {
104 self
105 }
106}
107
108#[derive(Debug)]
114pub struct BookSnapshotter {
115 pub id: Ustr,
116 pub timer_name: Ustr,
117 pub snap_info: BookSnapshotInfo,
118 pub cache: Rc<RefCell<Cache>>,
119}
120
121impl BookSnapshotter {
122 pub fn new(snap_info: BookSnapshotInfo, cache: Rc<RefCell<Cache>>) -> Self {
124 let id_str = format!(
125 "{}-{}",
126 stringify!(BookSnapshotter),
127 snap_info.instrument_id
128 );
129 let timer_name = format!(
130 "OrderBook|{}|{}",
131 snap_info.instrument_id, snap_info.interval_ms
132 );
133
134 Self {
135 id: Ustr::from(&id_str),
136 timer_name: Ustr::from(&timer_name),
137 snap_info,
138 cache,
139 }
140 }
141
142 pub fn snapshot(&self, _event: TimeEvent) {
143 let cache = self.cache.borrow();
144
145 if self.snap_info.is_composite {
146 let topic = self.snap_info.topic;
147 let underlying = self.snap_info.root;
148 for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
149 self.publish_order_book(&instrument.id(), topic, &cache);
150 }
151 } else {
152 self.publish_order_book(&self.snap_info.instrument_id, self.snap_info.topic, &cache);
153 }
154 }
155
156 fn publish_order_book(
157 &self,
158 instrument_id: &InstrumentId,
159 topic: MStr<Topic>,
160 cache: &Ref<Cache>,
161 ) {
162 let book = cache
163 .order_book(instrument_id)
164 .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
165
166 if book.update_count == 0 {
167 log::debug!("OrderBook for {instrument_id} not yet updated for snapshot");
168 return;
169 }
170
171 msgbus::publish(topic, book as &dyn Any);
172 }
173}