nautilus_bitmex/websocket/
cache.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Quote cache for reconstructing BitMEX WebSocket partial updates.
17
18use ahash::AHashMap;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21    data::quote::QuoteTick,
22    identifiers::InstrumentId,
23    instruments::{Instrument, InstrumentAny},
24    types::price::Price,
25};
26
27use super::{messages::BitmexQuoteMsg, parse::parse_quote_msg};
28use crate::common::parse::parse_contracts_quantity;
29
30/// Maintains quote state for each instrument to handle partial quote updates.
31///
32/// BitMEX quote messages may contain incomplete information (missing bid or ask side).
33/// When this happens, we need to reference the last known complete quote to construct
34/// a valid `QuoteTick` which requires both sides.
35pub(crate) struct QuoteCache {
36    last_quotes: AHashMap<InstrumentId, QuoteTick>,
37}
38
39impl QuoteCache {
40    /// Creates a new [`QuoteCache`] instance.
41    pub fn new() -> Self {
42        Self {
43            last_quotes: AHashMap::new(),
44        }
45    }
46
47    /// Clears all cached quotes, typically used after reconnection.
48    pub fn clear(&mut self) {
49        self.last_quotes.clear();
50    }
51
52    /// Processes an incoming quote message, emitting a complete quote when possible.
53    pub fn process(
54        &mut self,
55        msg: &BitmexQuoteMsg,
56        instrument: &InstrumentAny,
57        ts_init: UnixNanos,
58    ) -> Option<QuoteTick> {
59        let instrument_id = instrument.id();
60        let price_precision = instrument.price_precision();
61
62        let quote = if let Some(last_quote) = self.last_quotes.get(&instrument_id) {
63            Some(parse_quote_msg(
64                msg,
65                last_quote,
66                instrument,
67                instrument_id,
68                price_precision,
69                ts_init,
70            ))
71        } else {
72            match (msg.bid_price, msg.ask_price, msg.bid_size, msg.ask_size) {
73                (Some(bid_price), Some(ask_price), Some(bid_size), Some(ask_size)) => {
74                    Some(QuoteTick::new(
75                        instrument_id,
76                        Price::new(bid_price, price_precision),
77                        Price::new(ask_price, price_precision),
78                        parse_contracts_quantity(bid_size, instrument),
79                        parse_contracts_quantity(ask_size, instrument),
80                        UnixNanos::from(msg.timestamp),
81                        ts_init,
82                    ))
83                }
84                _ => None,
85            }
86        };
87
88        // Update cache if a quote was created
89        if let Some(quote) = &quote {
90            self.last_quotes.insert(instrument_id, *quote);
91        }
92
93        quote
94    }
95}
96
97////////////////////////////////////////////////////////////////////////////////
98// Tests
99////////////////////////////////////////////////////////////////////////////////
100
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use nautilus_model::{
        identifiers::Symbol,
        instruments::currency_pair::CurrencyPair,
        types::{Currency, Quantity},
    };
    use rstest::rstest;

    use super::*;
    use crate::common::parse::parse_instrument_id;

    /// Builds a minimal `XBTUSD.BITMEX` currency pair with the given precisions.
    fn make_test_instrument(price_precision: u8, size_precision: u8) -> InstrumentAny {
        let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
        InstrumentAny::CurrencyPair(CurrencyPair::new(
            instrument_id,
            Symbol::new("XBTUSD"),
            Currency::BTC(),
            Currency::USD(),
            price_precision,
            size_precision,
            Price::new(1.0, price_precision),
            Quantity::new(1.0, size_precision),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        ))
    }

    #[rstest]
    fn test_quote_cache_new() {
        let cache = QuoteCache::new();
        assert!(cache.last_quotes.is_empty());
    }

    #[rstest]
    fn test_process_complete_quote() {
        let mut cache = QuoteCache::new();

        let msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50000.5),
            ask_price: Some(50001.0),
            bid_size: Some(100),
            ask_size: Some(150),
            timestamp: Utc::now(),
        };

        let ts_init = UnixNanos::default();
        let instrument = make_test_instrument(1, 0);
        let quote = cache.process(&msg, &instrument, ts_init);

        assert!(quote.is_some());
        let quote = quote.unwrap();
        assert_eq!(quote.instrument_id, parse_instrument_id("XBTUSD".into()));
        assert_eq!(quote.bid_price, Price::new(50000.5, 1));
        assert_eq!(quote.ask_price, Price::new(50001.0, 1));
        assert_eq!(quote.bid_size, Quantity::from(100));
        assert_eq!(quote.ask_size, Quantity::from(150));
    }

    #[rstest]
    fn test_process_partial_quote_without_cache() {
        let mut cache = QuoteCache::new();

        // Partial quote with missing ask_size
        let msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50000.5),
            ask_price: Some(50001.0),
            bid_size: Some(100),
            ask_size: None,
            timestamp: Utc::now(),
        };

        let ts_init = UnixNanos::default();
        let instrument = make_test_instrument(1, 0);
        let quote = cache.process(&msg, &instrument, ts_init);

        // Should return None for incomplete first quote
        assert!(quote.is_none());

        // A rejected message must not leave anything behind in the cache
        assert!(cache.last_quotes.is_empty());
    }

    #[rstest]
    fn test_process_partial_quote_with_cache() {
        let mut cache = QuoteCache::new();

        // First, process a complete quote
        let complete_msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50000.5),
            ask_price: Some(50001.0),
            bid_size: Some(100),
            ask_size: Some(150),
            timestamp: Utc::now(),
        };

        let ts_init = UnixNanos::default();
        let instrument = make_test_instrument(1, 0);
        let first_quote = cache.process(&complete_msg, &instrument, ts_init).unwrap();

        // Now process a partial quote with only bid update
        let partial_msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50002.0),
            ask_price: None,
            bid_size: Some(200),
            ask_size: None,
            timestamp: Utc::now(),
        };

        let quote = cache.process(&partial_msg, &instrument, ts_init);

        assert!(quote.is_some());
        let quote = quote.unwrap();

        // Bid should be updated
        assert_eq!(quote.bid_price, Price::new(50002.0, 1));
        assert_eq!(quote.bid_size, Quantity::from(200));

        // Ask should be from the cached quote
        assert_eq!(quote.ask_price, first_quote.ask_price);
        assert_eq!(quote.ask_size, first_quote.ask_size);

        // The merged quote should replace the previously cached one
        let instrument_id = parse_instrument_id("XBTUSD".into());
        assert_eq!(cache.last_quotes[&instrument_id], quote);
    }

    #[rstest]
    fn test_cache_updates_after_processing() {
        let mut cache = QuoteCache::new();

        let msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50000.5),
            ask_price: Some(50001.0),
            bid_size: Some(100),
            ask_size: Some(150),
            timestamp: Utc::now(),
        };

        let ts_init = UnixNanos::default();
        let instrument = make_test_instrument(1, 0);
        let quote = cache.process(&msg, &instrument, ts_init).unwrap();

        let instrument_id = parse_instrument_id("XBTUSD".into());
        assert!(cache.last_quotes.contains_key(&instrument_id));
        assert_eq!(cache.last_quotes[&instrument_id], quote);
    }

    #[rstest]
    fn test_clear_discards_cached_quotes() {
        let mut cache = QuoteCache::new();

        // Seed the cache with a complete quote
        let complete_msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50000.5),
            ask_price: Some(50001.0),
            bid_size: Some(100),
            ask_size: Some(150),
            timestamp: Utc::now(),
        };

        let ts_init = UnixNanos::default();
        let instrument = make_test_instrument(1, 0);
        cache.process(&complete_msg, &instrument, ts_init).unwrap();
        assert!(!cache.last_quotes.is_empty());

        cache.clear();
        assert!(cache.last_quotes.is_empty());

        // With the baseline gone, a partial quote can no longer be completed
        let partial_msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50002.0),
            ask_price: None,
            bid_size: Some(200),
            ask_size: None,
            timestamp: Utc::now(),
        };

        let quote = cache.process(&partial_msg, &instrument, ts_init);
        assert!(quote.is_none());
        assert!(cache.last_quotes.is_empty());
    }
}