nautilus_data/engine/
book.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    any::Any,
18    cell::{Ref, RefCell},
19    num::NonZeroUsize,
20    rc::Rc,
21};
22
23use nautilus_common::{
24    cache::Cache,
25    msgbus::{self, MStr, Topic, handler::MessageHandler},
26    timer::TimeEvent,
27};
28use nautilus_model::{
29    data::Data,
30    identifiers::{InstrumentId, Venue},
31    instruments::Instrument,
32};
33use ustr::Ustr;
34
/// Contains information for creating snapshots of specific order books.
///
/// A [`BookSnapshotter`] is constructed from one of these and reads it on every
/// call to `snapshot` to decide which book(s) to publish and on which topic.
#[derive(Clone, Debug)]
pub struct BookSnapshotInfo {
    /// Instrument whose book is published when `is_composite` is `false`; also
    /// embedded in the snapshotter's handler ID and timer name.
    pub instrument_id: InstrumentId,
    /// Venue passed to the cache instrument lookup when `is_composite` is `true`.
    pub venue: Venue,
    /// When `true`, a snapshot publishes the book of every instrument the cache
    /// returns for `venue` and `root` instead of the single `instrument_id` book.
    pub is_composite: bool,
    /// Filter passed alongside `venue` to the cache instrument lookup (the
    /// snapshotter refers to it as the underlying).
    pub root: Ustr,
    /// Message bus topic the order books are published on.
    pub topic: MStr<Topic>,
    /// Interval embedded in the snapshot timer name.
    // NOTE(review): presumably the snapshot period in milliseconds; the timer itself
    // is not created in this file, so confirm against the caller.
    pub interval_ms: NonZeroUsize,
}
45
/// Handles order book updates and delta processing for a specific instrument.
///
/// The `BookUpdater` is a message handler which applies incoming
/// `Data::Delta`, `Data::Deltas` and `Data::Depth10` messages to the
/// matching order book held in the shared cache. Any other `Data` variant
/// is reported as an error, and failures to apply an update are logged
/// rather than propagated.
#[derive(Debug)]
pub struct BookUpdater {
    /// Handler identifier, built as `BookUpdater-<instrument_id>` by [`BookUpdater::new`].
    pub id: Ustr,
    /// Instrument this updater was created for.
    // NOTE: `handle` looks up the book using the instrument ID carried by the
    // incoming data, not this field.
    pub instrument_id: InstrumentId,
    /// Shared cache owning the order books that get mutated.
    pub cache: Rc<RefCell<Cache>>,
}
57
58impl BookUpdater {
59    /// Creates a new [`BookUpdater`] instance.
60    pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
61        Self {
62            id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
63            instrument_id: *instrument_id,
64            cache,
65        }
66    }
67}
68
69impl MessageHandler for BookUpdater {
70    fn id(&self) -> Ustr {
71        self.id
72    }
73
74    fn handle(&self, message: &dyn Any) {
75        // TODO: Temporary handler implementation (this will be removed soon)
76        if let Some(data) = message.downcast_ref::<Data>()
77            && let Some(book) = self
78                .cache
79                .borrow_mut()
80                .order_book_mut(&data.instrument_id())
81        {
82            match data {
83                Data::Delta(delta) => {
84                    if let Err(e) = book.apply_delta(delta) {
85                        log::error!("Failed to apply delta: {e}");
86                    }
87                }
88                Data::Deltas(deltas) => {
89                    if let Err(e) = book.apply_deltas(deltas) {
90                        log::error!("Failed to apply deltas: {e}");
91                    }
92                }
93                Data::Depth10(depth) => {
94                    if let Err(e) = book.apply_depth(depth) {
95                        log::error!("Failed to apply depth: {e}");
96                    }
97                }
98                _ => log::error!("Invalid data type for book update, was {data:?}"),
99            }
100        }
101    }
102
103    fn as_any(&self) -> &dyn Any {
104        self
105    }
106}
107
/// Creates periodic snapshots of order books at configured intervals.
///
/// The `BookSnapshotter` generates order book snapshots on timer events,
/// publishing them as market data. This is useful for providing periodic
/// full order book state updates in addition to incremental delta updates.
#[derive(Debug)]
pub struct BookSnapshotter {
    /// Identifier built as `BookSnapshotter-<instrument_id>` by [`BookSnapshotter::new`].
    pub id: Ustr,
    /// Timer name built as `OrderBook|<instrument_id>|<interval_ms>` by [`BookSnapshotter::new`].
    pub timer_name: Ustr,
    /// Describes which book(s) to snapshot and the topic to publish them on.
    pub snap_info: BookSnapshotInfo,
    /// Shared cache the order books (and, for composite snapshots, the instruments) are read from.
    pub cache: Rc<RefCell<Cache>>,
}
120
121impl BookSnapshotter {
122    /// Creates a new [`BookSnapshotter`] instance.
123    pub fn new(snap_info: BookSnapshotInfo, cache: Rc<RefCell<Cache>>) -> Self {
124        let id_str = format!(
125            "{}-{}",
126            stringify!(BookSnapshotter),
127            snap_info.instrument_id
128        );
129        let timer_name = format!(
130            "OrderBook|{}|{}",
131            snap_info.instrument_id, snap_info.interval_ms
132        );
133
134        Self {
135            id: Ustr::from(&id_str),
136            timer_name: Ustr::from(&timer_name),
137            snap_info,
138            cache,
139        }
140    }
141
142    pub fn snapshot(&self, _event: TimeEvent) {
143        let cache = self.cache.borrow();
144
145        if self.snap_info.is_composite {
146            let topic = self.snap_info.topic;
147            let underlying = self.snap_info.root;
148            for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
149                self.publish_order_book(&instrument.id(), topic, &cache);
150            }
151        } else {
152            self.publish_order_book(&self.snap_info.instrument_id, self.snap_info.topic, &cache);
153        }
154    }
155
156    fn publish_order_book(
157        &self,
158        instrument_id: &InstrumentId,
159        topic: MStr<Topic>,
160        cache: &Ref<Cache>,
161    ) {
162        let book = cache
163            .order_book(instrument_id)
164            .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
165
166        if book.update_count == 0 {
167            log::debug!("OrderBook for {instrument_id} not yet updated for snapshot");
168            return;
169        }
170
171        msgbus::publish(topic, book as &dyn Any);
172    }
173}