nautilus_data/engine/
book.rs1use std::{
17 cell::{Ref, RefCell},
18 num::NonZeroUsize,
19 rc::Rc,
20};
21
22use nautilus_common::{
23 cache::Cache,
24 msgbus::{self, Handler, MStr, Topic},
25 timer::TimeEvent,
26};
27use nautilus_model::{
28 data::{OrderBookDeltas, OrderBookDepth10},
29 identifiers::{InstrumentId, Venue},
30 instruments::Instrument,
31};
32use ustr::Ustr;
33
34#[derive(Clone, Debug)]
36pub struct BookSnapshotInfo {
37 pub instrument_id: InstrumentId,
38 pub venue: Venue,
39 pub is_composite: bool,
40 pub root: Ustr,
41 pub topic: MStr<Topic>,
42 pub interval_ms: NonZeroUsize,
43}
44
45#[derive(Debug)]
51pub struct BookUpdater {
52 pub id: Ustr,
53 pub instrument_id: InstrumentId,
54 pub cache: Rc<RefCell<Cache>>,
55}
56
57impl BookUpdater {
58 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
60 Self {
61 id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
62 instrument_id: *instrument_id,
63 cache,
64 }
65 }
66}
67
68impl Handler<OrderBookDeltas> for BookUpdater {
69 fn id(&self) -> Ustr {
70 self.id
71 }
72
73 fn handle(&self, deltas: &OrderBookDeltas) {
74 if let Some(book) = self
75 .cache
76 .borrow_mut()
77 .order_book_mut(&deltas.instrument_id)
78 && let Err(e) = book.apply_deltas(deltas)
79 {
80 log::error!("Failed to apply deltas: {e}");
81 }
82 }
83}
84
85impl Handler<OrderBookDepth10> for BookUpdater {
86 fn id(&self) -> Ustr {
87 self.id
88 }
89
90 fn handle(&self, depth: &OrderBookDepth10) {
91 if let Some(book) = self.cache.borrow_mut().order_book_mut(&depth.instrument_id)
92 && let Err(e) = book.apply_depth(depth)
93 {
94 log::error!("Failed to apply depth: {e}");
95 }
96 }
97}
98
99#[derive(Debug)]
105pub struct BookSnapshotter {
106 pub id: Ustr,
107 pub timer_name: Ustr,
108 pub snap_info: BookSnapshotInfo,
109 pub cache: Rc<RefCell<Cache>>,
110}
111
112impl BookSnapshotter {
113 pub fn new(snap_info: BookSnapshotInfo, cache: Rc<RefCell<Cache>>) -> Self {
115 let id_str = format!(
116 "{}-{}",
117 stringify!(BookSnapshotter),
118 snap_info.instrument_id
119 );
120 let timer_name = format!(
121 "OrderBook|{}|{}",
122 snap_info.instrument_id, snap_info.interval_ms
123 );
124
125 Self {
126 id: Ustr::from(&id_str),
127 timer_name: Ustr::from(&timer_name),
128 snap_info,
129 cache,
130 }
131 }
132
133 pub fn snapshot(&self, _event: TimeEvent) {
134 log::debug!(
135 "BookSnapshotter.snapshot called for {}",
136 self.snap_info.instrument_id
137 );
138 let cache = self.cache.borrow();
139
140 if self.snap_info.is_composite {
141 let topic = self.snap_info.topic;
142 let underlying = self.snap_info.root;
143 for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
144 self.publish_order_book(&instrument.id(), topic, &cache);
145 }
146 } else {
147 self.publish_order_book(&self.snap_info.instrument_id, self.snap_info.topic, &cache);
148 }
149 }
150
151 fn publish_order_book(
152 &self,
153 instrument_id: &InstrumentId,
154 topic: MStr<Topic>,
155 cache: &Ref<Cache>,
156 ) {
157 let book = cache
158 .order_book(instrument_id)
159 .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
160
161 if book.update_count == 0 {
162 log::debug!("OrderBook not yet updated for snapshot: {instrument_id}");
163 return;
164 }
165 log::debug!(
166 "Publishing OrderBook snapshot for {instrument_id} (update_count={})",
167 book.update_count
168 );
169
170 msgbus::publish_book(topic, book);
171 }
172}