nautilus_bitmex/websocket/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use ahash::AHashMap;
17use nautilus_core::UnixNanos;
18use nautilus_model::{data::quote::QuoteTick, identifiers::InstrumentId, types::price::Price};
19
20use super::{messages::BitmexQuoteMsg, parse::parse_quote_msg};
21use crate::common::parse::{parse_contracts_quantity, parse_instrument_id};
22
23/// Maintains quote state for each instrument to handle partial quote updates.
24///
25/// BitMEX quote messages may contain incomplete information (missing bid or ask side).
26/// When this happens, we need to reference the last known complete quote to construct
27/// a valid `QuoteTick` which requires both sides.
28pub(crate) struct QuoteCache {
29    last_quotes: AHashMap<InstrumentId, QuoteTick>,
30}
31
32impl QuoteCache {
33    /// Creates a new [`QuoteCache`] instance.
34    pub fn new() -> Self {
35        Self {
36            last_quotes: AHashMap::new(),
37        }
38    }
39
40    pub fn process(
41        &mut self,
42        msg: &BitmexQuoteMsg,
43        price_precision: u8,
44        ts_init: UnixNanos,
45    ) -> Option<QuoteTick> {
46        let instrument_id = parse_instrument_id(msg.symbol);
47
48        let quote = if let Some(last_quote) = self.last_quotes.get(&instrument_id) {
49            Some(parse_quote_msg(msg, last_quote, price_precision, ts_init))
50        } else {
51            match (msg.bid_price, msg.ask_price, msg.bid_size, msg.ask_size) {
52                (Some(bid_price), Some(ask_price), Some(bid_size), Some(ask_size)) => {
53                    Some(QuoteTick::new(
54                        instrument_id,
55                        Price::new(bid_price, price_precision),
56                        Price::new(ask_price, price_precision),
57                        parse_contracts_quantity(bid_size),
58                        parse_contracts_quantity(ask_size),
59                        UnixNanos::from(msg.timestamp),
60                        ts_init,
61                    ))
62                }
63                _ => None,
64            }
65        };
66
67        // Update cache if a quote was created
68        if let Some(quote) = &quote {
69            self.last_quotes.insert(instrument_id, *quote);
70        }
71
72        quote
73    }
74}
75
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use nautilus_model::types::quantity::Quantity;
    use rstest::rstest;

    use super::*;

    /// Symbol used by every message in these tests.
    const SYMBOL: &str = "XBTUSD";

    /// Returns a quote message with all four price/size fields populated.
    fn complete_msg() -> BitmexQuoteMsg {
        BitmexQuoteMsg {
            symbol: SYMBOL.into(),
            bid_price: Some(50000.5),
            ask_price: Some(50001.0),
            bid_size: Some(100),
            ask_size: Some(150),
            timestamp: Utc::now(),
        }
    }

    /// Returns the instrument ID that the test symbol parses to.
    fn expected_instrument_id() -> InstrumentId {
        parse_instrument_id(SYMBOL.into())
    }

    #[rstest]
    fn test_quote_cache_new() {
        assert!(QuoteCache::new().last_quotes.is_empty());
    }

    #[rstest]
    fn test_process_complete_quote() {
        let mut cache = QuoteCache::new();
        let msg = complete_msg();

        let quote = cache
            .process(&msg, 1, UnixNanos::default())
            .expect("a complete message should always yield a quote");

        assert_eq!(quote.instrument_id, expected_instrument_id());
        assert_eq!(quote.bid_price, Price::new(50000.5, 1));
        assert_eq!(quote.ask_price, Price::new(50001.0, 1));
        assert_eq!(quote.bid_size, Quantity::from(100));
        assert_eq!(quote.ask_size, Quantity::from(150));
    }

    #[rstest]
    fn test_process_partial_quote_without_cache() {
        let mut cache = QuoteCache::new();

        // Same as the complete message, but with the ask size missing.
        let msg = BitmexQuoteMsg {
            ask_size: None,
            ..complete_msg()
        };

        // An incomplete first message cannot produce a quote.
        assert!(cache.process(&msg, 1, UnixNanos::default()).is_none());
    }

    #[rstest]
    fn test_process_partial_quote_with_cache() {
        let mut cache = QuoteCache::new();
        let ts_init = UnixNanos::default();

        // Seed the cache with a complete quote.
        let seeded = cache.process(&complete_msg(), 1, ts_init).unwrap();

        // Follow up with a bid-only update.
        let bid_only = BitmexQuoteMsg {
            bid_price: Some(50002.0),
            ask_price: None,
            bid_size: Some(200),
            ask_size: None,
            ..complete_msg()
        };

        let merged = cache
            .process(&bid_only, 1, ts_init)
            .expect("a partial message should be completed from the cached quote");

        // The bid side reflects the new message.
        assert_eq!(merged.bid_price, Price::new(50002.0, 1));
        assert_eq!(merged.bid_size, Quantity::from(200));

        // The ask side is carried over from the cached quote.
        assert_eq!(merged.ask_price, seeded.ask_price);
        assert_eq!(merged.ask_size, seeded.ask_size);
    }

    #[rstest]
    fn test_cache_updates_after_processing() {
        let mut cache = QuoteCache::new();

        let quote = cache
            .process(&complete_msg(), 1, UnixNanos::default())
            .unwrap();

        // The produced quote must now be the cached entry for the instrument.
        let key = expected_instrument_id();
        assert_eq!(cache.last_quotes.get(&key), Some(&quote));
    }
}