// nautilus_bitmex/websocket/cache.rs
use ahash::AHashMap;
17use nautilus_core::UnixNanos;
18use nautilus_model::{data::quote::QuoteTick, identifiers::InstrumentId, types::price::Price};
19
20use super::{messages::BitmexQuoteMsg, parse::parse_quote_msg};
21use crate::common::parse::{parse_contracts_quantity, parse_instrument_id};
22
23pub(crate) struct QuoteCache {
29 last_quotes: AHashMap<InstrumentId, QuoteTick>,
30}
31
32impl QuoteCache {
33 pub fn new() -> Self {
35 Self {
36 last_quotes: AHashMap::new(),
37 }
38 }
39
40 pub fn process(
41 &mut self,
42 msg: &BitmexQuoteMsg,
43 price_precision: u8,
44 ts_init: UnixNanos,
45 ) -> Option<QuoteTick> {
46 let instrument_id = parse_instrument_id(msg.symbol);
47
48 let quote = if let Some(last_quote) = self.last_quotes.get(&instrument_id) {
49 Some(parse_quote_msg(msg, last_quote, price_precision, ts_init))
50 } else {
51 match (msg.bid_price, msg.ask_price, msg.bid_size, msg.ask_size) {
52 (Some(bid_price), Some(ask_price), Some(bid_size), Some(ask_size)) => {
53 Some(QuoteTick::new(
54 instrument_id,
55 Price::new(bid_price, price_precision),
56 Price::new(ask_price, price_precision),
57 parse_contracts_quantity(bid_size),
58 parse_contracts_quantity(ask_size),
59 UnixNanos::from(msg.timestamp),
60 ts_init,
61 ))
62 }
63 _ => None,
64 }
65 };
66
67 if let Some(quote) = "e {
69 self.last_quotes.insert(instrument_id, *quote);
70 }
71
72 quote
73 }
74}
75
#[cfg(test)]
mod tests {
    use chrono::Utc;
    use nautilus_model::types::quantity::Quantity;
    use rstest::rstest;

    use super::*;

    /// A freshly constructed cache holds no quotes.
    #[rstest]
    fn test_quote_cache_new() {
        let cache = QuoteCache::new();
        assert!(cache.last_quotes.is_empty());
    }

    /// A message carrying all four price/size fields yields a quote even when
    /// nothing is cached yet.
    #[rstest]
    fn test_process_complete_quote() {
        let mut cache = QuoteCache::new();

        let msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50000.5),
            ask_price: Some(50001.0),
            bid_size: Some(100),
            ask_size: Some(150),
            timestamp: Utc::now(),
        };

        let ts_init = UnixNanos::default();
        let quote = cache
            .process(&msg, 1, ts_init)
            .expect("complete message should produce a quote");

        assert_eq!(quote.instrument_id, parse_instrument_id("XBTUSD".into()));
        assert_eq!(quote.bid_price, Price::new(50000.5, 1));
        assert_eq!(quote.ask_price, Price::new(50001.0, 1));
        assert_eq!(quote.bid_size, Quantity::from(100));
        assert_eq!(quote.ask_size, Quantity::from(150));
    }

    /// A message with a missing field cannot seed an empty cache.
    #[rstest]
    fn test_process_partial_quote_without_cache() {
        let mut cache = QuoteCache::new();

        // Ask size is absent and there is no cached quote to fall back on.
        let msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50000.5),
            ask_price: Some(50001.0),
            bid_size: Some(100),
            ask_size: None,
            timestamp: Utc::now(),
        };

        let ts_init = UnixNanos::default();
        let quote = cache.process(&msg, 1, ts_init);

        assert!(quote.is_none());
        // A rejected message must not leave anything behind in the cache.
        assert!(cache.last_quotes.is_empty());
    }

    /// A partial message is completed from the previously cached quote.
    #[rstest]
    fn test_process_partial_quote_with_cache() {
        let mut cache = QuoteCache::new();

        // Seed the cache with a complete quote.
        let complete_msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50000.5),
            ask_price: Some(50001.0),
            bid_size: Some(100),
            ask_size: Some(150),
            timestamp: Utc::now(),
        };

        let ts_init = UnixNanos::default();
        let first_quote = cache.process(&complete_msg, 1, ts_init).unwrap();

        // Follow up with a bid-only update.
        let partial_msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50002.0),
            ask_price: None,
            bid_size: Some(200),
            ask_size: None,
            timestamp: Utc::now(),
        };

        let quote = cache
            .process(&partial_msg, 1, ts_init)
            .expect("partial message should be completed from the cached quote");

        // The bid side comes from the new message.
        assert_eq!(quote.bid_price, Price::new(50002.0, 1));
        assert_eq!(quote.bid_size, Quantity::from(200));

        // The ask side is carried over from the cached quote.
        assert_eq!(quote.ask_price, first_quote.ask_price);
        assert_eq!(quote.ask_size, first_quote.ask_size);

        // The merged quote becomes the new cached entry.
        assert_eq!(cache.last_quotes[&quote.instrument_id], quote);
    }

    /// A successfully processed quote is stored under its instrument ID.
    #[rstest]
    fn test_cache_updates_after_processing() {
        let mut cache = QuoteCache::new();

        let msg = BitmexQuoteMsg {
            symbol: "XBTUSD".into(),
            bid_price: Some(50000.5),
            ask_price: Some(50001.0),
            bid_size: Some(100),
            ask_size: Some(150),
            timestamp: Utc::now(),
        };

        let ts_init = UnixNanos::default();
        let quote = cache.process(&msg, 1, ts_init).unwrap();

        let instrument_id = parse_instrument_id("XBTUSD".into());
        assert!(cache.last_quotes.contains_key(&instrument_id));
        assert_eq!(cache.last_quotes[&instrument_id], quote);
    }
}