nautilus_kraken/websocket/
parse.rs1use anyhow::Context;
19use nautilus_core::nanos::UnixNanos;
20use nautilus_model::{
21 data::{BookOrder, OrderBookDelta, QuoteTick, TradeTick},
22 enums::{AggressorSide, BookAction, OrderSide},
23 identifiers::{InstrumentId, TradeId},
24 instruments::{Instrument, any::InstrumentAny},
25 types::{Price, Quantity},
26};
27
28use crate::{
29 common::enums::KrakenOrderSide,
30 websocket::messages::{
31 KrakenWsBookData, KrakenWsBookLevel, KrakenWsTickerData, KrakenWsTradeData,
32 },
33};
34
35pub fn parse_quote_tick(
42 ticker: &KrakenWsTickerData,
43 instrument: &InstrumentAny,
44 ts_init: UnixNanos,
45) -> anyhow::Result<QuoteTick> {
46 let instrument_id = instrument.id();
47 let price_precision = instrument.price_precision();
48 let size_precision = instrument.size_precision();
49
50 let bid_price = Price::new_checked(ticker.bid, price_precision).with_context(|| {
51 format!("Failed to construct bid Price with precision {price_precision}")
52 })?;
53 let bid_size = Quantity::new_checked(ticker.bid_qty, size_precision).with_context(|| {
54 format!("Failed to construct bid Quantity with precision {size_precision}")
55 })?;
56
57 let ask_price = Price::new_checked(ticker.ask, price_precision).with_context(|| {
58 format!("Failed to construct ask Price with precision {price_precision}")
59 })?;
60 let ask_size = Quantity::new_checked(ticker.ask_qty, size_precision).with_context(|| {
61 format!("Failed to construct ask Quantity with precision {size_precision}")
62 })?;
63
64 let ts_event = ts_init;
66
67 Ok(QuoteTick::new(
68 instrument_id,
69 bid_price,
70 ask_price,
71 bid_size,
72 ask_size,
73 ts_event,
74 ts_init,
75 ))
76}
77
78pub fn parse_trade_tick(
86 trade: &KrakenWsTradeData,
87 instrument: &InstrumentAny,
88 ts_init: UnixNanos,
89) -> anyhow::Result<TradeTick> {
90 let instrument_id = instrument.id();
91 let price_precision = instrument.price_precision();
92 let size_precision = instrument.size_precision();
93
94 let price = Price::new_checked(trade.price, price_precision)
95 .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
96 let size = Quantity::new_checked(trade.qty, size_precision)
97 .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
98
99 let aggressor = match trade.side {
100 KrakenOrderSide::Buy => AggressorSide::Buyer,
101 KrakenOrderSide::Sell => AggressorSide::Seller,
102 };
103
104 let trade_id = TradeId::new_checked(trade.trade_id.to_string())?;
105 let ts_event = parse_rfc3339_timestamp(&trade.timestamp, "trade.timestamp")?;
106
107 TradeTick::new_checked(
108 instrument_id,
109 price,
110 size,
111 aggressor,
112 trade_id,
113 ts_event,
114 ts_init,
115 )
116 .context("Failed to construct TradeTick from Kraken WebSocket trade")
117}
118
119pub fn parse_book_deltas(
129 book: &KrakenWsBookData,
130 instrument: &InstrumentAny,
131 sequence: u64,
132 ts_init: UnixNanos,
133) -> anyhow::Result<Vec<OrderBookDelta>> {
134 let instrument_id = instrument.id();
135 let price_precision = instrument.price_precision();
136 let size_precision = instrument.size_precision();
137
138 let ts_event = if let Some(ref timestamp) = book.timestamp {
140 parse_rfc3339_timestamp(timestamp, "book.timestamp")?
141 } else {
142 ts_init
143 };
144
145 let mut deltas = Vec::new();
146 let mut current_sequence = sequence;
147
148 if let Some(ref bids) = book.bids {
150 for level in bids {
151 let delta = parse_book_level(
152 level,
153 OrderSide::Buy,
154 instrument_id,
155 price_precision,
156 size_precision,
157 current_sequence,
158 ts_event,
159 ts_init,
160 )?;
161 deltas.push(delta);
162 current_sequence += 1;
163 }
164 }
165
166 if let Some(ref asks) = book.asks {
168 for level in asks {
169 let delta = parse_book_level(
170 level,
171 OrderSide::Sell,
172 instrument_id,
173 price_precision,
174 size_precision,
175 current_sequence,
176 ts_event,
177 ts_init,
178 )?;
179 deltas.push(delta);
180 current_sequence += 1;
181 }
182 }
183
184 Ok(deltas)
185}
186
187#[allow(clippy::too_many_arguments)]
188fn parse_book_level(
189 level: &KrakenWsBookLevel,
190 side: OrderSide,
191 instrument_id: InstrumentId,
192 price_precision: u8,
193 size_precision: u8,
194 sequence: u64,
195 ts_event: UnixNanos,
196 ts_init: UnixNanos,
197) -> anyhow::Result<OrderBookDelta> {
198 let price = Price::new_checked(level.price, price_precision)
199 .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
200 let size = Quantity::new_checked(level.qty, size_precision)
201 .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
202
203 let action = if size.raw == 0 {
205 BookAction::Delete
206 } else {
207 BookAction::Update
208 };
209
210 let order_id = price.raw as u64;
212 let order = BookOrder::new(side, price, size, order_id);
213
214 Ok(OrderBookDelta::new(
215 instrument_id,
216 action,
217 order,
218 0, sequence,
220 ts_event,
221 ts_init,
222 ))
223}
224
225fn parse_rfc3339_timestamp(value: &str, field: &str) -> anyhow::Result<UnixNanos> {
226 use chrono::DateTime;
227
228 let dt = DateTime::parse_from_rfc3339(value)
229 .with_context(|| format!("Failed to parse {field}='{value}' as RFC3339 timestamp"))?;
230
231 Ok(UnixNanos::from(
232 dt.timestamp_nanos_opt()
233 .with_context(|| format!("Timestamp out of range for {field}"))? as u64,
234 ))
235}
236
237#[cfg(test)]
242mod tests {
243 use nautilus_model::{identifiers::Symbol, types::Currency};
244 use rstest::rstest;
245
246 use super::*;
247 use crate::{common::consts::KRAKEN_VENUE, websocket::messages::KrakenWsMessage};
248
249 const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);
250
251 fn load_test_json(filename: &str) -> String {
252 let path = format!("test_data/{filename}");
253 std::fs::read_to_string(&path)
254 .unwrap_or_else(|e| panic!("Failed to load test data from {path}: {e}"))
255 }
256
257 fn create_mock_instrument() -> InstrumentAny {
258 use nautilus_model::instruments::currency_pair::CurrencyPair;
259
260 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
261 InstrumentAny::CurrencyPair(CurrencyPair::new(
262 instrument_id,
263 Symbol::new("XBTUSDT"),
264 Currency::BTC(),
265 Currency::USDT(),
266 1, 8, Price::from("0.1"),
269 Quantity::from("0.00000001"),
270 None,
271 None,
272 None,
273 None,
274 None,
275 None,
276 None,
277 None,
278 None,
279 None,
280 None,
281 None,
282 TS,
283 TS,
284 ))
285 }
286
287 #[rstest]
288 fn test_parse_quote_tick() {
289 let json = load_test_json("ws_ticker_snapshot.json");
290 let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
291 let ticker: KrakenWsTickerData = serde_json::from_value(message.data[0].clone()).unwrap();
292
293 let instrument = create_mock_instrument();
294 let quote_tick = parse_quote_tick(&ticker, &instrument, TS).unwrap();
295
296 assert_eq!(quote_tick.instrument_id, instrument.id());
297 assert!(quote_tick.bid_price.as_f64() > 0.0);
298 assert!(quote_tick.ask_price.as_f64() > 0.0);
299 assert!(quote_tick.bid_size.as_f64() > 0.0);
300 assert!(quote_tick.ask_size.as_f64() > 0.0);
301 }
302
303 #[rstest]
304 fn test_parse_trade_tick() {
305 let json = load_test_json("ws_trade_update.json");
306 let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
307 let trade: KrakenWsTradeData = serde_json::from_value(message.data[0].clone()).unwrap();
308
309 let instrument = create_mock_instrument();
310 let trade_tick = parse_trade_tick(&trade, &instrument, TS).unwrap();
311
312 assert_eq!(trade_tick.instrument_id, instrument.id());
313 assert!(trade_tick.price.as_f64() > 0.0);
314 assert!(trade_tick.size.as_f64() > 0.0);
315 assert!(matches!(
316 trade_tick.aggressor_side,
317 AggressorSide::Buyer | AggressorSide::Seller
318 ));
319 }
320
321 #[rstest]
322 fn test_parse_book_deltas_snapshot() {
323 let json = load_test_json("ws_book_snapshot.json");
324 let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
325 let book: KrakenWsBookData = serde_json::from_value(message.data[0].clone()).unwrap();
326
327 let instrument = create_mock_instrument();
328 let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();
329
330 assert!(!deltas.is_empty());
331
332 let bid_count = deltas
334 .iter()
335 .filter(|d| d.order.side == OrderSide::Buy)
336 .count();
337 let ask_count = deltas
338 .iter()
339 .filter(|d| d.order.side == OrderSide::Sell)
340 .count();
341
342 assert!(bid_count > 0);
343 assert!(ask_count > 0);
344
345 let first_delta = &deltas[0];
347 assert_eq!(first_delta.instrument_id, instrument.id());
348 assert!(first_delta.order.price.as_f64() > 0.0);
349 assert!(first_delta.order.size.as_f64() > 0.0);
350 }
351
352 #[rstest]
353 fn test_parse_book_deltas_update() {
354 let json = load_test_json("ws_book_update.json");
355 let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
356 let book: KrakenWsBookData = serde_json::from_value(message.data[0].clone()).unwrap();
357
358 let instrument = create_mock_instrument();
359 let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();
360
361 assert!(!deltas.is_empty());
362
363 let first_delta = &deltas[0];
365 assert_eq!(first_delta.instrument_id, instrument.id());
366 assert!(first_delta.order.price.as_f64() > 0.0);
367 }
368
369 #[rstest]
370 fn test_parse_rfc3339_timestamp() {
371 let timestamp = "2023-10-06T17:35:55.440295Z";
372 let result = parse_rfc3339_timestamp(timestamp, "test").unwrap();
373 assert!(result.as_u64() > 0);
374 }
375}