Skip to main content

nautilus_data/engine/
book.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    cell::{Ref, RefCell},
18    num::NonZeroUsize,
19    rc::Rc,
20};
21
22use nautilus_common::{
23    cache::Cache,
24    msgbus::{self, Handler, MStr, Topic},
25    timer::TimeEvent,
26};
27use nautilus_model::{
28    data::{OrderBookDeltas, OrderBookDepth10},
29    identifiers::{InstrumentId, Venue},
30    instruments::Instrument,
31};
32use ustr::Ustr;
33
/// Contains information for creating snapshots of specific order books.
///
/// A [`BookSnapshotter`] reads these fields to decide which order book(s) to
/// publish and on which message bus topic.
#[derive(Clone, Debug)]
pub struct BookSnapshotInfo {
    /// Instrument whose book is published when `is_composite` is `false`;
    /// also embedded in the snapshotter's ID and timer name.
    pub instrument_id: InstrumentId,
    /// Venue used for the cache instrument lookup when `is_composite` is `true`.
    pub venue: Venue,
    /// When `true`, a snapshot publishes the book of every cached instrument
    /// matching `venue` and `root` rather than the single `instrument_id`.
    pub is_composite: bool,
    /// Underlying passed as the filter to the cache instrument lookup in composite mode.
    pub root: Ustr,
    /// Message bus topic the order book snapshots are published on.
    pub topic: MStr<Topic>,
    /// Snapshot interval, embedded in the snapshotter's timer name
    /// (presumably milliseconds, going by the field name — confirm with the timer setup).
    pub interval_ms: NonZeroUsize,
}
44
/// Handles order book updates and delta processing for a specific instrument.
///
/// The `BookUpdater` processes incoming order book deltas and maintains
/// the current state of an order book. It can handle both incremental
/// updates and full snapshots for the instrument it's assigned to.
///
/// Note that the handler implementations look the book up in the cache by the
/// instrument ID carried on the incoming message, not by `instrument_id` below.
#[derive(Debug)]
pub struct BookUpdater {
    /// Handler identifier, formatted as `BookUpdater-{instrument_id}`.
    pub id: Ustr,
    /// Instrument this updater was created for.
    pub instrument_id: InstrumentId,
    /// Shared cache holding the order books that incoming data is applied to.
    pub cache: Rc<RefCell<Cache>>,
}
56
57impl BookUpdater {
58    /// Creates a new [`BookUpdater`] instance.
59    pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
60        Self {
61            id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
62            instrument_id: *instrument_id,
63            cache,
64        }
65    }
66}
67
68impl Handler<OrderBookDeltas> for BookUpdater {
69    fn id(&self) -> Ustr {
70        self.id
71    }
72
73    fn handle(&self, deltas: &OrderBookDeltas) {
74        if let Some(book) = self
75            .cache
76            .borrow_mut()
77            .order_book_mut(&deltas.instrument_id)
78            && let Err(e) = book.apply_deltas(deltas)
79        {
80            log::error!("Failed to apply deltas: {e}");
81        }
82    }
83}
84
85impl Handler<OrderBookDepth10> for BookUpdater {
86    fn id(&self) -> Ustr {
87        self.id
88    }
89
90    fn handle(&self, depth: &OrderBookDepth10) {
91        if let Some(book) = self.cache.borrow_mut().order_book_mut(&depth.instrument_id)
92            && let Err(e) = book.apply_depth(depth)
93        {
94            log::error!("Failed to apply depth: {e}");
95        }
96    }
97}
98
/// Creates periodic snapshots of order books at configured intervals.
///
/// The `BookSnapshotter` generates order book snapshots on timer events,
/// publishing them as market data. This is useful for providing periodic
/// full order book state updates in addition to incremental delta updates.
#[derive(Debug)]
pub struct BookSnapshotter {
    /// Identifier, formatted as `BookSnapshotter-{instrument_id}`.
    pub id: Ustr,
    /// Timer name, formatted as `OrderBook|{instrument_id}|{interval_ms}`.
    pub timer_name: Ustr,
    /// Describes which book(s) to snapshot and the topic to publish them on.
    pub snap_info: BookSnapshotInfo,
    /// Shared cache the instruments and order books are read from.
    pub cache: Rc<RefCell<Cache>>,
}
111
112impl BookSnapshotter {
113    /// Creates a new [`BookSnapshotter`] instance.
114    pub fn new(snap_info: BookSnapshotInfo, cache: Rc<RefCell<Cache>>) -> Self {
115        let id_str = format!(
116            "{}-{}",
117            stringify!(BookSnapshotter),
118            snap_info.instrument_id
119        );
120        let timer_name = format!(
121            "OrderBook|{}|{}",
122            snap_info.instrument_id, snap_info.interval_ms
123        );
124
125        Self {
126            id: Ustr::from(&id_str),
127            timer_name: Ustr::from(&timer_name),
128            snap_info,
129            cache,
130        }
131    }
132
133    pub fn snapshot(&self, _event: TimeEvent) {
134        log::debug!(
135            "BookSnapshotter.snapshot called for {}",
136            self.snap_info.instrument_id
137        );
138        let cache = self.cache.borrow();
139
140        if self.snap_info.is_composite {
141            let topic = self.snap_info.topic;
142            let underlying = self.snap_info.root;
143            for instrument in cache.instruments(&self.snap_info.venue, Some(&underlying)) {
144                self.publish_order_book(&instrument.id(), topic, &cache);
145            }
146        } else {
147            self.publish_order_book(&self.snap_info.instrument_id, self.snap_info.topic, &cache);
148        }
149    }
150
151    fn publish_order_book(
152        &self,
153        instrument_id: &InstrumentId,
154        topic: MStr<Topic>,
155        cache: &Ref<Cache>,
156    ) {
157        let book = cache
158            .order_book(instrument_id)
159            .unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
160
161        if book.update_count == 0 {
162            log::debug!("OrderBook not yet updated for snapshot: {instrument_id}");
163            return;
164        }
165        log::debug!(
166            "Publishing OrderBook snapshot for {instrument_id} (update_count={})",
167            book.update_count
168        );
169
170        msgbus::publish_book(topic, book);
171    }
172}