nautilus_data/engine/
book.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell},
19    num::NonZeroUsize,
20    rc::Rc,
21};
22
23use nautilus_common::{
24    cache::Cache,
25    msgbus::{self, handler::MessageHandler},
26    timer::TimeEvent,
27};
28use nautilus_model::{
29    data::Data,
30    identifiers::{InstrumentId, Venue},
31    instruments::Instrument,
32};
33use ustr::Ustr;
34
/// Contains information for creating snapshots of specific order books.
///
/// Read by [`BookSnapshotter`] to decide which order book(s) to publish, and on
/// which message bus topic, each time a snapshot is taken.
#[derive(Clone, Debug)]
pub struct BookSnapshotInfo {
    /// Instrument whose order book is published when `is_composite` is `false`.
    /// Also used to build the snapshotter's handler ID and timer name.
    pub instrument_id: InstrumentId,
    /// Venue passed to the cache instrument lookup when `is_composite` is `true`.
    pub venue: Venue,
    /// When `true`, a snapshot is published for every instrument the cache returns
    /// for `venue` and `root`, instead of for `instrument_id` alone.
    pub is_composite: bool,
    /// Underlying passed to the cache instrument lookup when `is_composite` is `true`.
    pub root: Ustr,
    /// Message bus topic the order book snapshots are published on.
    pub topic: Ustr,
    /// Snapshot interval, embedded in the snapshotter's timer name.
    /// NOTE(review): presumably milliseconds, going by the field name — confirm.
    pub interval_ms: NonZeroUsize,
}
45
/// Message handler that applies incoming order book data to the matching
/// order book held in the shared cache.
pub struct BookUpdater {
    /// Handler identifier, built at construction as `BookUpdater-<instrument_id>`.
    pub id: Ustr,
    /// Instrument this updater was created for. Note that `handle` looks the book
    /// up by the instrument ID carried in each message, not by this field.
    pub instrument_id: InstrumentId,
    /// Shared cache holding the order books that are mutated by this handler.
    pub cache: Rc<RefCell<Cache>>,
}
51
52impl BookUpdater {
53    /// Creates a new [`BookUpdater`] instance.
54    pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
55        Self {
56            id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
57            instrument_id: *instrument_id,
58            cache,
59        }
60    }
61}
62
63impl MessageHandler for BookUpdater {
64    fn id(&self) -> Ustr {
65        self.id
66    }
67
68    fn handle(&self, message: &dyn Any) {
69        // TODO: Temporary handler implementation (this will be removed soon)
70        if let Some(data) = message.downcast_ref::<Data>() {
71            if let Some(book) = self
72                .cache
73                .borrow_mut()
74                .order_book_mut(&data.instrument_id())
75            {
76                match data {
77                    Data::Delta(delta) => book.apply_delta(delta),
78                    Data::Deltas(deltas) => book.apply_deltas(deltas),
79                    Data::Depth10(depth) => book.apply_depth(depth),
80                    _ => log::error!("Invalid data type for book update, was {data:?}"),
81                }
82            }
83        }
84    }
85
86    fn as_any(&self) -> &dyn Any {
87        self
88    }
89}
90
/// Publishes order book snapshots from the shared cache onto the message bus,
/// as described by a [`BookSnapshotInfo`].
pub struct BookSnapshotter {
    /// Identifier, built at construction as `BookSnapshotter-<instrument_id>`.
    pub id: Ustr,
    /// Timer name, built at construction as `OrderBook|<instrument_id>|<interval_ms>`.
    pub timer_name: Ustr,
    /// Describes which order book(s) to snapshot and the topic to publish on.
    pub snap_info: BookSnapshotInfo,
    /// Shared cache the order books are read from.
    pub cache: Rc<RefCell<Cache>>,
}
97
98impl BookSnapshotter {
99    /// Creates a new [`BookSnapshotter`] instance.
100    pub fn new(snap_info: BookSnapshotInfo, cache: Rc<RefCell<Cache>>) -> Self {
101        let id_str = format!(
102            "{}-{}",
103            stringify!(BookSnapshotter),
104            snap_info.instrument_id
105        );
106        let timer_name = format!(
107            "OrderBook|{}|{}",
108            snap_info.instrument_id, snap_info.interval_ms
109        );
110
111        Self {
112            id: Ustr::from(&id_str),
113            timer_name: Ustr::from(&timer_name),
114            snap_info,
115            cache,
116        }
117    }
118
119    pub fn snapshot(&self, event: TimeEvent) {
120        let cache = self.cache.borrow();
121
122        if self.snap_info.is_composite {
123            let topic = self.snap_info.topic;
124            let underlying = self.snap_info.root;
125            for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
126                self.publish_order_book(&instrument.id(), &topic, &cache);
127            }
128        } else {
129            self.publish_order_book(&self.snap_info.instrument_id, &self.snap_info.topic, &cache);
130        }
131    }
132
133    fn publish_order_book(&self, instrument_id: &InstrumentId, topic: &Ustr, cache: &Ref<Cache>) {
134        let book = cache
135            .order_book(instrument_id)
136            .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
137
138        if book.update_count == 0 {
139            log::debug!("OrderBook for {instrument_id} not yet updated for snapshot");
140            return;
141        }
142
143        msgbus::publish(topic, book as &dyn Any);
144    }
145}