nautilus_data/engine/
book.rs1use std::{
17 any::Any,
18 cell::{Ref, RefCell},
19 num::NonZeroU64,
20 rc::Rc,
21};
22
23use nautilus_common::{
24 cache::Cache,
25 messages::data::DataResponse,
26 msgbus::{MessageBus, handler::MessageHandler},
27 timer::TimeEvent,
28};
29use nautilus_model::{
30 data::Data,
31 identifiers::{InstrumentId, Venue},
32};
33use ustr::Ustr;
34
35#[derive(Clone, Debug)]
37pub struct BookSnapshotInfo {
38 pub instrument_id: InstrumentId,
39 pub venue: Venue,
40 pub is_composite: bool,
41 pub root: Ustr,
42 pub topic: Ustr,
43 pub interval_ms: NonZeroU64,
44}
45
46pub struct BookUpdater {
47 pub id: Ustr,
48 pub instrument_id: InstrumentId,
49 pub cache: Rc<RefCell<Cache>>,
50}
51
52impl BookUpdater {
53 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
55 Self {
56 id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
57 instrument_id: *instrument_id,
58 cache,
59 }
60 }
61}
62
63impl MessageHandler for BookUpdater {
64 fn id(&self) -> Ustr {
65 self.id
66 }
67
68 fn handle(&self, message: &dyn Any) {}
69 fn handle_response(&self, _resp: DataResponse) {}
70 fn handle_data(&self, data: Data) {
71 if let Some(book) = self
72 .cache
73 .borrow_mut()
74 .order_book_mut(&data.instrument_id())
75 {
76 match data {
77 Data::Delta(delta) => book.apply_delta(&delta),
78 Data::Deltas(deltas) => book.apply_deltas(&deltas),
79 Data::Depth10(depth) => book.apply_depth(&depth),
80 _ => log::error!("Invalid data type for book update, was {data:?}"),
81 }
82 }
83 }
84 fn as_any(&self) -> &dyn Any {
85 self
86 }
87}
88
89pub struct BookSnapshotter {
90 pub id: Ustr,
91 pub timer_name: Ustr,
92 pub snap_info: BookSnapshotInfo,
93 pub cache: Rc<RefCell<Cache>>,
94 pub msgbus: Rc<RefCell<MessageBus>>,
95}
96
97impl BookSnapshotter {
98 pub fn new(
100 snap_info: BookSnapshotInfo,
101 cache: Rc<RefCell<Cache>>,
102 msgbus: Rc<RefCell<MessageBus>>,
103 ) -> Self {
104 let id_str = format!(
105 "{}-{}",
106 stringify!(BookSnapshotter),
107 snap_info.instrument_id
108 );
109 let timer_name = format!(
110 "OrderBook|{}|{}",
111 snap_info.instrument_id, snap_info.interval_ms
112 );
113
114 Self {
115 id: Ustr::from(&id_str),
116 timer_name: Ustr::from(&timer_name),
117 snap_info,
118 cache,
119 msgbus,
120 }
121 }
122
123 pub fn snapshot(&self, event: TimeEvent) {
124 let cache = self.cache.borrow();
125 let mut msgbus = self.msgbus.borrow_mut();
126
127 if self.snap_info.is_composite {
128 let topic = self.snap_info.topic;
129 let underlying = self.snap_info.root;
130 for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
131 self.publish_order_book(&instrument.id(), &topic, &cache, &mut msgbus);
132 }
133 } else {
134 self.publish_order_book(
135 &self.snap_info.instrument_id,
136 &self.snap_info.topic,
137 &cache,
138 &mut msgbus,
139 );
140 }
141 }
142
143 fn publish_order_book(
144 &self,
145 instrument_id: &InstrumentId,
146 topic: &Ustr,
147 cache: &Ref<Cache>,
148 msgbus: &mut MessageBus,
149 ) {
150 let book = cache
151 .order_book(instrument_id)
152 .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
153
154 if book.count == 0 {
155 log::debug!("OrderBook for {instrument_id} not yet updated for snapshot");
156 return;
157 }
158
159 msgbus.publish(topic, book as &dyn Any);
160 }
161}