nautilus_data/engine/
book.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell},
19    num::NonZeroU64,
20    rc::Rc,
21};
22
23use nautilus_common::{
24    cache::Cache,
25    messages::data::DataResponse,
26    msgbus::{MessageBus, handler::MessageHandler},
27    timer::TimeEvent,
28};
29use nautilus_model::{
30    data::Data,
31    identifiers::{InstrumentId, Venue},
32};
33use ustr::Ustr;
34
35/// Contains information for creating snapshots of specific order books.
36#[derive(Clone, Debug)]
37pub struct BookSnapshotInfo {
38    pub instrument_id: InstrumentId,
39    pub venue: Venue,
40    pub is_composite: bool,
41    pub root: Ustr,
42    pub topic: Ustr,
43    pub interval_ms: NonZeroU64,
44}
45
46pub struct BookUpdater {
47    pub id: Ustr,
48    pub instrument_id: InstrumentId,
49    pub cache: Rc<RefCell<Cache>>,
50}
51
52impl BookUpdater {
53    /// Creates a new [`BookUpdater`] instance.
54    pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
55        Self {
56            id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
57            instrument_id: *instrument_id,
58            cache,
59        }
60    }
61}
62
63impl MessageHandler for BookUpdater {
64    fn id(&self) -> Ustr {
65        self.id
66    }
67
68    fn handle(&self, message: &dyn Any) {}
69    fn handle_response(&self, _resp: DataResponse) {}
70    fn handle_data(&self, data: Data) {
71        if let Some(book) = self
72            .cache
73            .borrow_mut()
74            .order_book_mut(&data.instrument_id())
75        {
76            match data {
77                Data::Delta(delta) => book.apply_delta(&delta),
78                Data::Deltas(deltas) => book.apply_deltas(&deltas),
79                Data::Depth10(depth) => book.apply_depth(&depth),
80                _ => log::error!("Invalid data type for book update, was {data:?}"),
81            }
82        }
83    }
84    fn as_any(&self) -> &dyn Any {
85        self
86    }
87}
88
89pub struct BookSnapshotter {
90    pub id: Ustr,
91    pub timer_name: Ustr,
92    pub snap_info: BookSnapshotInfo,
93    pub cache: Rc<RefCell<Cache>>,
94    pub msgbus: Rc<RefCell<MessageBus>>,
95}
96
97impl BookSnapshotter {
98    /// Creates a new [`BookSnapshotter`] instance.
99    pub fn new(
100        snap_info: BookSnapshotInfo,
101        cache: Rc<RefCell<Cache>>,
102        msgbus: Rc<RefCell<MessageBus>>,
103    ) -> Self {
104        let id_str = format!(
105            "{}-{}",
106            stringify!(BookSnapshotter),
107            snap_info.instrument_id
108        );
109        let timer_name = format!(
110            "OrderBook|{}|{}",
111            snap_info.instrument_id, snap_info.interval_ms
112        );
113
114        Self {
115            id: Ustr::from(&id_str),
116            timer_name: Ustr::from(&timer_name),
117            snap_info,
118            cache,
119            msgbus,
120        }
121    }
122
123    pub fn snapshot(&self, event: TimeEvent) {
124        let cache = self.cache.borrow();
125        let mut msgbus = self.msgbus.borrow_mut();
126
127        if self.snap_info.is_composite {
128            let topic = self.snap_info.topic;
129            let underlying = self.snap_info.root;
130            for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
131                self.publish_order_book(&instrument.id(), &topic, &cache, &mut msgbus);
132            }
133        } else {
134            self.publish_order_book(
135                &self.snap_info.instrument_id,
136                &self.snap_info.topic,
137                &cache,
138                &mut msgbus,
139            );
140        }
141    }
142
143    fn publish_order_book(
144        &self,
145        instrument_id: &InstrumentId,
146        topic: &Ustr,
147        cache: &Ref<Cache>,
148        msgbus: &mut MessageBus,
149    ) {
150        let book = cache
151            .order_book(instrument_id)
152            .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
153
154        if book.count == 0 {
155            log::debug!("OrderBook for {instrument_id} not yet updated for snapshot");
156            return;
157        }
158
159        msgbus.publish(topic, book as &dyn Any);
160    }
161}