nautilus_data/engine/
book.rs1use std::{
17 any::Any,
18 cell::{Ref, RefCell},
19 num::NonZeroUsize,
20 rc::Rc,
21};
22
23use nautilus_common::{
24 cache::Cache,
25 msgbus::{self, MStr, Topic, handler::MessageHandler},
26 timer::TimeEvent,
27};
28use nautilus_model::{
29 data::Data,
30 identifiers::{InstrumentId, Venue},
31 instruments::Instrument,
32};
33use ustr::Ustr;
34
35#[derive(Clone, Debug)]
37pub struct BookSnapshotInfo {
38 pub instrument_id: InstrumentId,
39 pub venue: Venue,
40 pub is_composite: bool,
41 pub root: Ustr,
42 pub topic: MStr<Topic>,
43 pub interval_ms: NonZeroUsize,
44}
45
46#[derive(Debug)]
52pub struct BookUpdater {
53 pub id: Ustr,
54 pub instrument_id: InstrumentId,
55 pub cache: Rc<RefCell<Cache>>,
56}
57
58impl BookUpdater {
59 pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
61 Self {
62 id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
63 instrument_id: *instrument_id,
64 cache,
65 }
66 }
67}
68
69impl MessageHandler for BookUpdater {
70 fn id(&self) -> Ustr {
71 self.id
72 }
73
74 fn handle(&self, message: &dyn Any) {
75 if let Some(data) = message.downcast_ref::<Data>()
77 && let Some(book) = self
78 .cache
79 .borrow_mut()
80 .order_book_mut(&data.instrument_id())
81 {
82 match data {
83 Data::Delta(delta) => book.apply_delta(delta),
84 Data::Deltas(deltas) => book.apply_deltas(deltas),
85 Data::Depth10(depth) => book.apply_depth(depth),
86 _ => log::error!("Invalid data type for book update, was {data:?}"),
87 }
88 }
89 }
90
91 fn as_any(&self) -> &dyn Any {
92 self
93 }
94}
95
96#[derive(Debug)]
102pub struct BookSnapshotter {
103 pub id: Ustr,
104 pub timer_name: Ustr,
105 pub snap_info: BookSnapshotInfo,
106 pub cache: Rc<RefCell<Cache>>,
107}
108
109impl BookSnapshotter {
110 pub fn new(snap_info: BookSnapshotInfo, cache: Rc<RefCell<Cache>>) -> Self {
112 let id_str = format!(
113 "{}-{}",
114 stringify!(BookSnapshotter),
115 snap_info.instrument_id
116 );
117 let timer_name = format!(
118 "OrderBook|{}|{}",
119 snap_info.instrument_id, snap_info.interval_ms
120 );
121
122 Self {
123 id: Ustr::from(&id_str),
124 timer_name: Ustr::from(&timer_name),
125 snap_info,
126 cache,
127 }
128 }
129
130 pub fn snapshot(&self, _event: TimeEvent) {
131 let cache = self.cache.borrow();
132
133 if self.snap_info.is_composite {
134 let topic = self.snap_info.topic;
135 let underlying = self.snap_info.root;
136 for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
137 self.publish_order_book(&instrument.id(), topic, &cache);
138 }
139 } else {
140 self.publish_order_book(&self.snap_info.instrument_id, self.snap_info.topic, &cache);
141 }
142 }
143
144 fn publish_order_book(
145 &self,
146 instrument_id: &InstrumentId,
147 topic: MStr<Topic>,
148 cache: &Ref<Cache>,
149 ) {
150 let book = cache
151 .order_book(instrument_id)
152 .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
153
154 if book.update_count == 0 {
155 log::debug!("OrderBook for {instrument_id} not yet updated for snapshot");
156 return;
157 }
158
159 msgbus::publish(topic, book as &dyn Any);
160 }
161}