nautilus_bitmex/websocket/
cache.rs1use ahash::AHashMap;
19use nautilus_core::UnixNanos;
20use nautilus_model::{
21 data::quote::QuoteTick,
22 identifiers::InstrumentId,
23 instruments::{Instrument, InstrumentAny},
24 types::price::Price,
25};
26
27use super::{messages::BitmexQuoteMsg, parse::parse_quote_msg};
28use crate::common::parse::parse_contracts_quantity;
29
30pub(crate) struct QuoteCache {
36 last_quotes: AHashMap<InstrumentId, QuoteTick>,
37}
38
39impl QuoteCache {
40 pub fn new() -> Self {
42 Self {
43 last_quotes: AHashMap::new(),
44 }
45 }
46
47 pub fn clear(&mut self) {
49 self.last_quotes.clear();
50 }
51
52 pub fn process(
54 &mut self,
55 msg: &BitmexQuoteMsg,
56 instrument: &InstrumentAny,
57 ts_init: UnixNanos,
58 ) -> Option<QuoteTick> {
59 let instrument_id = instrument.id();
60 let price_precision = instrument.price_precision();
61
62 let quote = if let Some(last_quote) = self.last_quotes.get(&instrument_id) {
63 Some(parse_quote_msg(
64 msg,
65 last_quote,
66 instrument,
67 instrument_id,
68 price_precision,
69 ts_init,
70 ))
71 } else {
72 match (msg.bid_price, msg.ask_price, msg.bid_size, msg.ask_size) {
73 (Some(bid_price), Some(ask_price), Some(bid_size), Some(ask_size)) => {
74 Some(QuoteTick::new(
75 instrument_id,
76 Price::new(bid_price, price_precision),
77 Price::new(ask_price, price_precision),
78 parse_contracts_quantity(bid_size, instrument),
79 parse_contracts_quantity(ask_size, instrument),
80 UnixNanos::from(msg.timestamp),
81 ts_init,
82 ))
83 }
84 _ => None,
85 }
86 };
87
88 if let Some(quote) = "e {
90 self.last_quotes.insert(instrument_id, *quote);
91 }
92
93 quote
94 }
95}
96
97#[cfg(test)]
102mod tests {
103 use chrono::Utc;
104 use nautilus_model::{
105 identifiers::Symbol,
106 instruments::currency_pair::CurrencyPair,
107 types::{Currency, Quantity},
108 };
109 use rstest::rstest;
110
111 use super::*;
112 use crate::common::parse::parse_instrument_id;
113
114 fn make_test_instrument(price_precision: u8, size_precision: u8) -> InstrumentAny {
115 let instrument_id = InstrumentId::from("XBTUSD.BITMEX");
116 InstrumentAny::CurrencyPair(CurrencyPair::new(
117 instrument_id,
118 Symbol::new("XBTUSD"),
119 Currency::BTC(),
120 Currency::USD(),
121 price_precision,
122 size_precision,
123 Price::new(1.0, price_precision),
124 Quantity::new(1.0, size_precision),
125 None,
126 None,
127 None,
128 None,
129 None,
130 None,
131 None,
132 None,
133 None,
134 None,
135 None,
136 None,
137 UnixNanos::default(),
138 UnixNanos::default(),
139 ))
140 }
141
142 #[rstest]
143 fn test_quote_cache_new() {
144 let cache = QuoteCache::new();
145 assert!(cache.last_quotes.is_empty());
146 }
147
148 #[rstest]
149 fn test_process_complete_quote() {
150 let mut cache = QuoteCache::new();
151
152 let msg = BitmexQuoteMsg {
153 symbol: "XBTUSD".into(),
154 bid_price: Some(50000.5),
155 ask_price: Some(50001.0),
156 bid_size: Some(100),
157 ask_size: Some(150),
158 timestamp: Utc::now(),
159 };
160
161 let ts_init = UnixNanos::default();
162 let instrument = make_test_instrument(1, 0);
163 let quote = cache.process(&msg, &instrument, ts_init);
164
165 assert!(quote.is_some());
166 let quote = quote.unwrap();
167 assert_eq!(quote.instrument_id, parse_instrument_id("XBTUSD".into()));
168 assert_eq!(quote.bid_price, Price::new(50000.5, 1));
169 assert_eq!(quote.ask_price, Price::new(50001.0, 1));
170 assert_eq!(quote.bid_size, Quantity::from(100));
171 assert_eq!(quote.ask_size, Quantity::from(150));
172 }
173
174 #[rstest]
175 fn test_process_partial_quote_without_cache() {
176 let mut cache = QuoteCache::new();
177
178 let msg = BitmexQuoteMsg {
180 symbol: "XBTUSD".into(),
181 bid_price: Some(50000.5),
182 ask_price: Some(50001.0),
183 bid_size: Some(100),
184 ask_size: None,
185 timestamp: Utc::now(),
186 };
187
188 let ts_init = UnixNanos::default();
189 let instrument = make_test_instrument(1, 0);
190 let quote = cache.process(&msg, &instrument, ts_init);
191
192 assert!(quote.is_none());
194 }
195
196 #[rstest]
197 fn test_process_partial_quote_with_cache() {
198 let mut cache = QuoteCache::new();
199
200 let complete_msg = BitmexQuoteMsg {
202 symbol: "XBTUSD".into(),
203 bid_price: Some(50000.5),
204 ask_price: Some(50001.0),
205 bid_size: Some(100),
206 ask_size: Some(150),
207 timestamp: Utc::now(),
208 };
209
210 let ts_init = UnixNanos::default();
211 let instrument = make_test_instrument(1, 0);
212 let first_quote = cache.process(&complete_msg, &instrument, ts_init).unwrap();
213
214 let partial_msg = BitmexQuoteMsg {
216 symbol: "XBTUSD".into(),
217 bid_price: Some(50002.0),
218 ask_price: None,
219 bid_size: Some(200),
220 ask_size: None,
221 timestamp: Utc::now(),
222 };
223
224 let quote = cache.process(&partial_msg, &instrument, ts_init);
225
226 assert!(quote.is_some());
227 let quote = quote.unwrap();
228
229 assert_eq!(quote.bid_price, Price::new(50002.0, 1));
231 assert_eq!(quote.bid_size, Quantity::from(200));
232
233 assert_eq!(quote.ask_price, first_quote.ask_price);
235 assert_eq!(quote.ask_size, first_quote.ask_size);
236 }
237
238 #[rstest]
239 fn test_cache_updates_after_processing() {
240 let mut cache = QuoteCache::new();
241
242 let msg = BitmexQuoteMsg {
243 symbol: "XBTUSD".into(),
244 bid_price: Some(50000.5),
245 ask_price: Some(50001.0),
246 bid_size: Some(100),
247 ask_size: Some(150),
248 timestamp: Utc::now(),
249 };
250
251 let ts_init = UnixNanos::default();
252 let instrument = make_test_instrument(1, 0);
253 let quote = cache.process(&msg, &instrument, ts_init).unwrap();
254
255 let instrument_id = parse_instrument_id("XBTUSD".into());
256 assert!(cache.last_quotes.contains_key(&instrument_id));
257 assert_eq!(cache.last_quotes[&instrument_id], quote);
258 }
259}