nautilus_kraken/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message parsers for converting Kraken streaming data to Nautilus domain models.
17
18use anyhow::Context;
19use nautilus_core::nanos::UnixNanos;
20use nautilus_model::{
21    data::{BookOrder, OrderBookDelta, QuoteTick, TradeTick},
22    enums::{AggressorSide, BookAction, OrderSide},
23    identifiers::{InstrumentId, TradeId},
24    instruments::{Instrument, any::InstrumentAny},
25    types::{Price, Quantity},
26};
27
28use crate::{
29    common::enums::KrakenOrderSide,
30    websocket::messages::{
31        KrakenWsBookData, KrakenWsBookLevel, KrakenWsTickerData, KrakenWsTradeData,
32    },
33};
34
35/// Parses Kraken WebSocket ticker data into a Nautilus quote tick.
36///
37/// # Errors
38///
39/// Returns an error if:
40/// - Bid or ask price/quantity cannot be parsed.
41pub fn parse_quote_tick(
42    ticker: &KrakenWsTickerData,
43    instrument: &InstrumentAny,
44    ts_init: UnixNanos,
45) -> anyhow::Result<QuoteTick> {
46    let instrument_id = instrument.id();
47    let price_precision = instrument.price_precision();
48    let size_precision = instrument.size_precision();
49
50    let bid_price = Price::new_checked(ticker.bid, price_precision).with_context(|| {
51        format!("Failed to construct bid Price with precision {price_precision}")
52    })?;
53    let bid_size = Quantity::new_checked(ticker.bid_qty, size_precision).with_context(|| {
54        format!("Failed to construct bid Quantity with precision {size_precision}")
55    })?;
56
57    let ask_price = Price::new_checked(ticker.ask, price_precision).with_context(|| {
58        format!("Failed to construct ask Price with precision {price_precision}")
59    })?;
60    let ask_size = Quantity::new_checked(ticker.ask_qty, size_precision).with_context(|| {
61        format!("Failed to construct ask Quantity with precision {size_precision}")
62    })?;
63
64    // Kraken ticker doesn't include timestamp
65    let ts_event = ts_init;
66
67    Ok(QuoteTick::new(
68        instrument_id,
69        bid_price,
70        ask_price,
71        bid_size,
72        ask_size,
73        ts_event,
74        ts_init,
75    ))
76}
77
78/// Parses Kraken WebSocket trade data into a Nautilus trade tick.
79///
80/// # Errors
81///
82/// Returns an error if:
83/// - Price or quantity cannot be parsed.
84/// - Timestamp is invalid.
85pub fn parse_trade_tick(
86    trade: &KrakenWsTradeData,
87    instrument: &InstrumentAny,
88    ts_init: UnixNanos,
89) -> anyhow::Result<TradeTick> {
90    let instrument_id = instrument.id();
91    let price_precision = instrument.price_precision();
92    let size_precision = instrument.size_precision();
93
94    let price = Price::new_checked(trade.price, price_precision)
95        .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
96    let size = Quantity::new_checked(trade.qty, size_precision)
97        .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
98
99    let aggressor = match trade.side {
100        KrakenOrderSide::Buy => AggressorSide::Buyer,
101        KrakenOrderSide::Sell => AggressorSide::Seller,
102    };
103
104    let trade_id = TradeId::new_checked(trade.trade_id.to_string())?;
105    let ts_event = parse_rfc3339_timestamp(&trade.timestamp, "trade.timestamp")?;
106
107    TradeTick::new_checked(
108        instrument_id,
109        price,
110        size,
111        aggressor,
112        trade_id,
113        ts_event,
114        ts_init,
115    )
116    .context("Failed to construct TradeTick from Kraken WebSocket trade")
117}
118
119/// Parses Kraken WebSocket book data into Nautilus order book deltas.
120///
121/// Returns a vector of deltas, one for each bid and ask level.
122///
123/// # Errors
124///
125/// Returns an error if:
126/// - Price or quantity cannot be parsed.
127/// - Timestamp is invalid.
128pub fn parse_book_deltas(
129    book: &KrakenWsBookData,
130    instrument: &InstrumentAny,
131    sequence: u64,
132    ts_init: UnixNanos,
133) -> anyhow::Result<Vec<OrderBookDelta>> {
134    let instrument_id = instrument.id();
135    let price_precision = instrument.price_precision();
136    let size_precision = instrument.size_precision();
137
138    // Parse timestamp if available, otherwise use ts_init
139    let ts_event = if let Some(ref timestamp) = book.timestamp {
140        parse_rfc3339_timestamp(timestamp, "book.timestamp")?
141    } else {
142        ts_init
143    };
144
145    let mut deltas = Vec::new();
146    let mut current_sequence = sequence;
147
148    // Process bids
149    if let Some(ref bids) = book.bids {
150        for level in bids {
151            let delta = parse_book_level(
152                level,
153                OrderSide::Buy,
154                instrument_id,
155                price_precision,
156                size_precision,
157                current_sequence,
158                ts_event,
159                ts_init,
160            )?;
161            deltas.push(delta);
162            current_sequence += 1;
163        }
164    }
165
166    // Process asks
167    if let Some(ref asks) = book.asks {
168        for level in asks {
169            let delta = parse_book_level(
170                level,
171                OrderSide::Sell,
172                instrument_id,
173                price_precision,
174                size_precision,
175                current_sequence,
176                ts_event,
177                ts_init,
178            )?;
179            deltas.push(delta);
180            current_sequence += 1;
181        }
182    }
183
184    Ok(deltas)
185}
186
187#[allow(clippy::too_many_arguments)]
188fn parse_book_level(
189    level: &KrakenWsBookLevel,
190    side: OrderSide,
191    instrument_id: InstrumentId,
192    price_precision: u8,
193    size_precision: u8,
194    sequence: u64,
195    ts_event: UnixNanos,
196    ts_init: UnixNanos,
197) -> anyhow::Result<OrderBookDelta> {
198    let price = Price::new_checked(level.price, price_precision)
199        .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
200    let size = Quantity::new_checked(level.qty, size_precision)
201        .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
202
203    // Determine action based on quantity
204    let action = if size.raw == 0 {
205        BookAction::Delete
206    } else {
207        BookAction::Update
208    };
209
210    // Create order ID from price (Kraken doesn't provide order IDs)
211    let order_id = price.raw as u64;
212    let order = BookOrder::new(side, price, size, order_id);
213
214    Ok(OrderBookDelta::new(
215        instrument_id,
216        action,
217        order,
218        0, // flags
219        sequence,
220        ts_event,
221        ts_init,
222    ))
223}
224
225fn parse_rfc3339_timestamp(value: &str, field: &str) -> anyhow::Result<UnixNanos> {
226    use chrono::DateTime;
227
228    let dt = DateTime::parse_from_rfc3339(value)
229        .with_context(|| format!("Failed to parse {field}='{value}' as RFC3339 timestamp"))?;
230
231    Ok(UnixNanos::from(
232        dt.timestamp_nanos_opt()
233            .with_context(|| format!("Timestamp out of range for {field}"))? as u64,
234    ))
235}
236
237////////////////////////////////////////////////////////////////////////////////
238// Tests
239////////////////////////////////////////////////////////////////////////////////
240
#[cfg(test)]
mod tests {
    use nautilus_model::{identifiers::Symbol, types::Currency};
    use rstest::rstest;

    use super::*;
    use crate::{common::consts::KRAKEN_VENUE, websocket::messages::KrakenWsMessage};

    /// Fixed timestamp used for `ts_init` and for the mock instrument.
    const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);

    /// Reads a JSON fixture from the `test_data` directory, panicking with the
    /// offending path if it cannot be loaded.
    fn load_test_json(filename: &str) -> String {
        let path = format!("test_data/{filename}");
        std::fs::read_to_string(&path)
            .unwrap_or_else(|e| panic!("Failed to load test data from {path}: {e}"))
    }

    /// Builds a currency pair with price precision 1 and size precision 8 for
    /// the parsers to work against.
    fn create_mock_instrument() -> InstrumentAny {
        use nautilus_model::instruments::currency_pair::CurrencyPair;

        let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
        InstrumentAny::CurrencyPair(CurrencyPair::new(
            instrument_id,
            Symbol::new("XBTUSDT"),
            Currency::BTC(),
            Currency::USDT(),
            1, // price_precision
            8, // size_precision
            Price::from("0.1"),
            Quantity::from("0.00000001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            TS,
            TS,
        ))
    }

    #[rstest]
    fn test_parse_quote_tick() {
        let json = load_test_json("ws_ticker_snapshot.json");
        let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
        let ticker: KrakenWsTickerData = serde_json::from_value(message.data[0].clone()).unwrap();

        let instrument = create_mock_instrument();
        let quote_tick = parse_quote_tick(&ticker, &instrument, TS).unwrap();

        assert_eq!(quote_tick.instrument_id, instrument.id());
        assert!(quote_tick.bid_price.as_f64() > 0.0);
        assert!(quote_tick.ask_price.as_f64() > 0.0);
        assert!(quote_tick.bid_size.as_f64() > 0.0);
        assert!(quote_tick.ask_size.as_f64() > 0.0);
    }

    #[rstest]
    fn test_parse_trade_tick() {
        let json = load_test_json("ws_trade_update.json");
        let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
        let trade: KrakenWsTradeData = serde_json::from_value(message.data[0].clone()).unwrap();

        let instrument = create_mock_instrument();
        let trade_tick = parse_trade_tick(&trade, &instrument, TS).unwrap();

        assert_eq!(trade_tick.instrument_id, instrument.id());
        assert!(trade_tick.price.as_f64() > 0.0);
        assert!(trade_tick.size.as_f64() > 0.0);
        assert!(matches!(
            trade_tick.aggressor_side,
            AggressorSide::Buyer | AggressorSide::Seller
        ));
    }

    #[rstest]
    fn test_parse_book_deltas_snapshot() {
        let json = load_test_json("ws_book_snapshot.json");
        let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
        let book: KrakenWsBookData = serde_json::from_value(message.data[0].clone()).unwrap();

        let instrument = create_mock_instrument();
        let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();

        assert!(!deltas.is_empty());

        // Check that we have both bids and asks
        let bid_count = deltas
            .iter()
            .filter(|d| d.order.side == OrderSide::Buy)
            .count();
        let ask_count = deltas
            .iter()
            .filter(|d| d.order.side == OrderSide::Sell)
            .count();

        assert!(bid_count > 0);
        assert!(ask_count > 0);

        // Sequences must be contiguous, starting at the value passed in
        for (offset, delta) in deltas.iter().enumerate() {
            assert_eq!(delta.sequence, 1 + offset as u64);
        }

        // Check first delta
        let first_delta = &deltas[0];
        assert_eq!(first_delta.instrument_id, instrument.id());
        assert!(first_delta.order.price.as_f64() > 0.0);
        assert!(first_delta.order.size.as_f64() > 0.0);
    }

    #[rstest]
    fn test_parse_book_deltas_update() {
        let json = load_test_json("ws_book_update.json");
        let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
        let book: KrakenWsBookData = serde_json::from_value(message.data[0].clone()).unwrap();

        let instrument = create_mock_instrument();
        let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();

        assert!(!deltas.is_empty());

        // Check that we have at least one delta
        let first_delta = &deltas[0];
        assert_eq!(first_delta.instrument_id, instrument.id());
        assert!(first_delta.order.price.as_f64() > 0.0);
    }

    #[rstest]
    fn test_parse_rfc3339_timestamp() {
        let timestamp = "2023-10-06T17:35:55.440295Z";
        let result = parse_rfc3339_timestamp(timestamp, "test").unwrap();

        // 2023-10-06T17:35:55Z is 1_696_613_755 seconds after the epoch,
        // plus 440_295 microseconds.
        assert_eq!(result.as_u64(), 1_696_613_755_440_295_000);
    }
}