Skip to main content

nautilus_kraken/websocket/spot_v2/
parse.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------
15
//! WebSocket message parsers for converting Kraken streaming data to Nautilus domain models.
17
18use anyhow::Context;
19use nautilus_core::{UUID4, nanos::UnixNanos};
20use nautilus_model::{
21    data::{Bar, BarSpecification, BarType, BookOrder, OrderBookDelta, QuoteTick, TradeTick},
22    enums::{
23        AggregationSource, AggressorSide, BarAggregation, BookAction, LiquiditySide, OrderSide,
24        OrderStatus, OrderType, PriceType, TimeInForce, TriggerType,
25    },
26    identifiers::{AccountId, ClientOrderId, InstrumentId, TradeId, VenueOrderId},
27    instruments::{Instrument, any::InstrumentAny},
28    reports::{FillReport, OrderStatusReport},
29    types::{Currency, Money, Price, Quantity},
30};
31
32use super::{
33    enums::{KrakenExecType, KrakenLiquidityInd, KrakenWsOrderStatus},
34    messages::{
35        KrakenWsBookData, KrakenWsBookLevel, KrakenWsExecutionData, KrakenWsOhlcData,
36        KrakenWsTickerData, KrakenWsTradeData,
37    },
38};
39use crate::common::enums::{KrakenOrderSide, KrakenOrderType, KrakenTimeInForce};
40
41/// Parses Kraken WebSocket ticker data into a Nautilus quote tick.
42///
43/// # Errors
44///
45/// Returns an error if:
46/// - Bid or ask price/quantity cannot be parsed.
47pub fn parse_quote_tick(
48    ticker: &KrakenWsTickerData,
49    instrument: &InstrumentAny,
50    ts_init: UnixNanos,
51) -> anyhow::Result<QuoteTick> {
52    let instrument_id = instrument.id();
53    let price_precision = instrument.price_precision();
54    let size_precision = instrument.size_precision();
55
56    let bid_price = Price::new_checked(ticker.bid, price_precision).with_context(|| {
57        format!("Failed to construct bid Price with precision {price_precision}")
58    })?;
59    let bid_size = Quantity::new_checked(ticker.bid_qty, size_precision).with_context(|| {
60        format!("Failed to construct bid Quantity with precision {size_precision}")
61    })?;
62
63    let ask_price = Price::new_checked(ticker.ask, price_precision).with_context(|| {
64        format!("Failed to construct ask Price with precision {price_precision}")
65    })?;
66    let ask_size = Quantity::new_checked(ticker.ask_qty, size_precision).with_context(|| {
67        format!("Failed to construct ask Quantity with precision {size_precision}")
68    })?;
69
70    // Kraken ticker doesn't include timestamp
71    let ts_event = ts_init;
72
73    Ok(QuoteTick::new(
74        instrument_id,
75        bid_price,
76        ask_price,
77        bid_size,
78        ask_size,
79        ts_event,
80        ts_init,
81    ))
82}
83
84/// Parses Kraken WebSocket trade data into a Nautilus trade tick.
85///
86/// # Errors
87///
88/// Returns an error if:
89/// - Price or quantity cannot be parsed.
90/// - Timestamp is invalid.
91pub fn parse_trade_tick(
92    trade: &KrakenWsTradeData,
93    instrument: &InstrumentAny,
94    ts_init: UnixNanos,
95) -> anyhow::Result<TradeTick> {
96    let instrument_id = instrument.id();
97    let price_precision = instrument.price_precision();
98    let size_precision = instrument.size_precision();
99
100    let price = Price::new_checked(trade.price, price_precision)
101        .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
102    let size = Quantity::new_checked(trade.qty, size_precision)
103        .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
104
105    let aggressor = match trade.side {
106        KrakenOrderSide::Buy => AggressorSide::Buyer,
107        KrakenOrderSide::Sell => AggressorSide::Seller,
108    };
109
110    let trade_id = TradeId::new_checked(trade.trade_id.to_string())?;
111    let ts_event = parse_rfc3339_timestamp(&trade.timestamp, "trade.timestamp")?;
112
113    TradeTick::new_checked(
114        instrument_id,
115        price,
116        size,
117        aggressor,
118        trade_id,
119        ts_event,
120        ts_init,
121    )
122    .context("Failed to construct TradeTick from Kraken WebSocket trade")
123}
124
125/// Parses Kraken WebSocket book data into Nautilus order book deltas.
126///
127/// Returns a vector of deltas, one for each bid and ask level.
128///
129/// # Errors
130///
131/// Returns an error if:
132/// - Price or quantity cannot be parsed.
133/// - Timestamp is invalid.
134pub fn parse_book_deltas(
135    book: &KrakenWsBookData,
136    instrument: &InstrumentAny,
137    sequence: u64,
138    ts_init: UnixNanos,
139) -> anyhow::Result<Vec<OrderBookDelta>> {
140    let instrument_id = instrument.id();
141    let price_precision = instrument.price_precision();
142    let size_precision = instrument.size_precision();
143
144    // Parse timestamp if available, otherwise use ts_init
145    let ts_event = if let Some(ref timestamp) = book.timestamp {
146        parse_rfc3339_timestamp(timestamp, "book.timestamp")?
147    } else {
148        ts_init
149    };
150
151    let mut deltas = Vec::new();
152    let mut current_sequence = sequence;
153
154    if let Some(ref bids) = book.bids {
155        for level in bids {
156            let delta = parse_book_level(
157                level,
158                OrderSide::Buy,
159                instrument_id,
160                price_precision,
161                size_precision,
162                current_sequence,
163                ts_event,
164                ts_init,
165            )?;
166            deltas.push(delta);
167            current_sequence += 1;
168        }
169    }
170
171    if let Some(ref asks) = book.asks {
172        for level in asks {
173            let delta = parse_book_level(
174                level,
175                OrderSide::Sell,
176                instrument_id,
177                price_precision,
178                size_precision,
179                current_sequence,
180                ts_event,
181                ts_init,
182            )?;
183            deltas.push(delta);
184            current_sequence += 1;
185        }
186    }
187
188    Ok(deltas)
189}
190
191#[allow(clippy::too_many_arguments)]
192fn parse_book_level(
193    level: &KrakenWsBookLevel,
194    side: OrderSide,
195    instrument_id: InstrumentId,
196    price_precision: u8,
197    size_precision: u8,
198    sequence: u64,
199    ts_event: UnixNanos,
200    ts_init: UnixNanos,
201) -> anyhow::Result<OrderBookDelta> {
202    let price = Price::new_checked(level.price, price_precision)
203        .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
204    let size = Quantity::new_checked(level.qty, size_precision)
205        .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
206
207    // Determine action based on quantity
208    let action = if size.raw == 0 {
209        BookAction::Delete
210    } else {
211        BookAction::Update
212    };
213
214    // Create order ID from price (Kraken doesn't provide order IDs)
215    let order_id = price.raw as u64;
216    let order = BookOrder::new(side, price, size, order_id);
217
218    Ok(OrderBookDelta::new(
219        instrument_id,
220        action,
221        order,
222        0, // flags
223        sequence,
224        ts_event,
225        ts_init,
226    ))
227}
228
229fn parse_rfc3339_timestamp(value: &str, field: &str) -> anyhow::Result<UnixNanos> {
230    value
231        .parse::<UnixNanos>()
232        .map_err(|e| anyhow::anyhow!("Failed to parse {field}='{value}': {e}"))
233}
234
235/// Parses Kraken WebSocket OHLC data into a Nautilus bar.
236///
237/// The bar's `ts_event` is computed as `interval_begin` + `interval` minutes.
238///
239/// # Errors
240///
241/// Returns an error if:
242/// - Price or quantity values cannot be parsed.
243/// - The interval cannot be converted to a valid bar specification.
244pub fn parse_ws_bar(
245    ohlc: &KrakenWsOhlcData,
246    instrument: &InstrumentAny,
247    ts_init: UnixNanos,
248) -> anyhow::Result<Bar> {
249    let instrument_id = instrument.id();
250    let price_precision = instrument.price_precision();
251    let size_precision = instrument.size_precision();
252
253    let open = Price::new_checked(ohlc.open, price_precision)?;
254    let high = Price::new_checked(ohlc.high, price_precision)?;
255    let low = Price::new_checked(ohlc.low, price_precision)?;
256    let close = Price::new_checked(ohlc.close, price_precision)?;
257    let volume = Quantity::new_checked(ohlc.volume, size_precision)?;
258
259    let bar_spec = interval_to_bar_spec(ohlc.interval)?;
260    let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
261
262    // Compute bar close time: interval_begin + interval minutes
263    let interval_secs = i64::from(ohlc.interval) * 60;
264    let close_time = ohlc.interval_begin + chrono::Duration::seconds(interval_secs);
265    let ts_event = UnixNanos::from(close_time.timestamp_nanos_opt().unwrap_or(0) as u64);
266
267    Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
268}
269
270/// Converts a Kraken OHLC interval (minutes) to a Nautilus bar specification.
271fn interval_to_bar_spec(interval: u32) -> anyhow::Result<BarSpecification> {
272    let (step, aggregation) = match interval {
273        1 => (1, BarAggregation::Minute),
274        5 => (5, BarAggregation::Minute),
275        15 => (15, BarAggregation::Minute),
276        30 => (30, BarAggregation::Minute),
277        60 => (1, BarAggregation::Hour),
278        240 => (4, BarAggregation::Hour),
279        1440 => (1, BarAggregation::Day),
280        10080 => (1, BarAggregation::Week),
281        21600 => (15, BarAggregation::Day), // 21600 min = 360 hours = 15 days
282        _ => anyhow::bail!("Unsupported Kraken OHLC interval: {interval}"),
283    };
284
285    Ok(BarSpecification::new(step, aggregation, PriceType::Last))
286}
287
288/// Parses Kraken execution type and order status to Nautilus order status.
289fn parse_order_status(
290    exec_type: KrakenExecType,
291    order_status: Option<KrakenWsOrderStatus>,
292) -> OrderStatus {
293    match exec_type {
294        KrakenExecType::Canceled => return OrderStatus::Canceled,
295        KrakenExecType::Expired => return OrderStatus::Expired,
296        KrakenExecType::Filled => return OrderStatus::Filled,
297        KrakenExecType::Trade => {
298            return match order_status {
299                Some(KrakenWsOrderStatus::Filled) => OrderStatus::Filled,
300                Some(KrakenWsOrderStatus::PartiallyFilled) | None => OrderStatus::PartiallyFilled,
301                Some(status) => status.into(),
302            };
303        }
304        _ => {}
305    }
306
307    match order_status {
308        Some(status) => status.into(),
309        None => OrderStatus::Accepted,
310    }
311}
312
313/// Parses Kraken order type to Nautilus order type.
314fn parse_order_type(order_type: Option<KrakenOrderType>) -> OrderType {
315    match order_type {
316        Some(KrakenOrderType::Market) => OrderType::Market,
317        Some(KrakenOrderType::Limit) => OrderType::Limit,
318        Some(KrakenOrderType::StopLoss) => OrderType::StopMarket,
319        Some(KrakenOrderType::TakeProfit) => OrderType::MarketIfTouched,
320        Some(KrakenOrderType::StopLossLimit) => OrderType::StopLimit,
321        Some(KrakenOrderType::TakeProfitLimit) => OrderType::LimitIfTouched,
322        Some(KrakenOrderType::SettlePosition) => OrderType::Market,
323        None => OrderType::Limit,
324    }
325}
326
327/// Parses Kraken order side to Nautilus order side.
328fn parse_order_side(side: Option<KrakenOrderSide>) -> OrderSide {
329    match side {
330        Some(KrakenOrderSide::Buy) => OrderSide::Buy,
331        Some(KrakenOrderSide::Sell) => OrderSide::Sell,
332        None => OrderSide::Buy,
333    }
334}
335
336/// Parses Kraken time-in-force to Nautilus time-in-force.
337fn parse_time_in_force(
338    time_in_force: Option<KrakenTimeInForce>,
339    post_only: Option<bool>,
340) -> TimeInForce {
341    // Handle post_only flag
342    if post_only == Some(true) {
343        return TimeInForce::Gtc;
344    }
345
346    match time_in_force {
347        Some(KrakenTimeInForce::GoodTilCancelled) => TimeInForce::Gtc,
348        Some(KrakenTimeInForce::ImmediateOrCancel) => TimeInForce::Ioc,
349        Some(KrakenTimeInForce::GoodTilDate) => TimeInForce::Gtd,
350        None => TimeInForce::Gtc,
351    }
352}
353
354fn parse_liquidity_side(liquidity_ind: Option<KrakenLiquidityInd>) -> LiquiditySide {
355    liquidity_ind.map_or(LiquiditySide::NoLiquiditySide, Into::into)
356}
357
358/// Parses a Kraken WebSocket execution message into an [`OrderStatusReport`].
359///
360/// # Errors
361///
362/// Returns an error if required fields are missing or cannot be parsed.
363pub fn parse_ws_order_status_report(
364    exec: &KrakenWsExecutionData,
365    instrument: &InstrumentAny,
366    account_id: AccountId,
367    cached_order_qty: Option<f64>,
368    ts_init: UnixNanos,
369) -> anyhow::Result<OrderStatusReport> {
370    let instrument_id = instrument.id();
371    let venue_order_id = VenueOrderId::new(&exec.order_id);
372    let order_side = parse_order_side(exec.side);
373    let order_type = parse_order_type(exec.order_type);
374    let time_in_force = parse_time_in_force(exec.time_in_force, exec.post_only);
375    let order_status = parse_order_status(exec.exec_type, exec.order_status);
376
377    let price_precision = instrument.price_precision();
378    let size_precision = instrument.size_precision();
379
380    // Quantity fallback: order_qty -> cached -> cum_qty -> last_qty (for trade snapshots)
381    let last_qty = exec
382        .last_qty
383        .map(|qty| Quantity::new_checked(qty, size_precision))
384        .transpose()
385        .context("Failed to parse last_qty")?;
386
387    let filled_qty = exec
388        .cum_qty
389        .map(|qty| Quantity::new_checked(qty, size_precision))
390        .transpose()
391        .context("Failed to parse cum_qty")?
392        .or(last_qty)
393        .unwrap_or_else(|| Quantity::new(0.0, size_precision));
394
395    let quantity = exec
396        .order_qty
397        .or(cached_order_qty)
398        .map(|qty| Quantity::new_checked(qty, size_precision))
399        .transpose()
400        .context("Failed to parse order_qty")?
401        .unwrap_or(filled_qty);
402
403    let ts_event = parse_rfc3339_timestamp(&exec.timestamp, "execution.timestamp")?;
404
405    let mut report = OrderStatusReport::new(
406        account_id,
407        instrument_id,
408        None, // client_order_id set below if present
409        venue_order_id,
410        order_side,
411        order_type,
412        time_in_force,
413        order_status,
414        quantity,
415        filled_qty,
416        ts_event,
417        ts_event,
418        ts_init,
419        Some(UUID4::new()),
420    );
421
422    if let Some(ref cl_ord_id) = exec.cl_ord_id
423        && !cl_ord_id.is_empty()
424    {
425        report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
426    }
427
428    // Price fallback: limit_price -> avg_price -> last_price
429    // Note: pending_new messages may not include any price fields, which is fine for
430    // orders we submitted (engine already has the price from submission)
431    let price_value = exec
432        .limit_price
433        .filter(|&p| p > 0.0)
434        .or(exec.avg_price.filter(|&p| p > 0.0))
435        .or(exec.last_price.filter(|&p| p > 0.0));
436
437    if let Some(px) = price_value {
438        let price =
439            Price::new_checked(px, price_precision).context("Failed to parse order price")?;
440        report = report.with_price(price);
441    }
442
443    // avg_px fallback: avg_price -> cum_cost / cum_qty -> last_price (for single trades/snapshots)
444    let avg_px = exec
445        .avg_price
446        .filter(|&p| p > 0.0)
447        .or_else(|| match (exec.cum_cost, exec.cum_qty) {
448            (Some(cost), Some(qty)) if qty > 0.0 => Some(cost / qty),
449            _ => None,
450        })
451        .or_else(|| exec.last_price.filter(|&p| p > 0.0));
452
453    if let Some(avg_price) = avg_px {
454        report = report.with_avg_px(avg_price)?;
455    }
456
457    if exec.post_only == Some(true) {
458        report = report.with_post_only(true);
459    }
460
461    if exec.reduce_only == Some(true) {
462        report = report.with_reduce_only(true);
463    }
464
465    if let Some(ref reason) = exec.reason
466        && !reason.is_empty()
467    {
468        report = report.with_cancel_reason(reason.clone());
469    }
470
471    // Set trigger type for conditional orders (WebSocket doesn't provide trigger field)
472    let is_conditional = matches!(
473        order_type,
474        OrderType::StopMarket
475            | OrderType::StopLimit
476            | OrderType::MarketIfTouched
477            | OrderType::LimitIfTouched
478    );
479    if is_conditional {
480        report = report.with_trigger_type(TriggerType::Default);
481    }
482
483    Ok(report)
484}
485
486/// Parses a Kraken WebSocket trade execution into a [`FillReport`].
487///
488/// This should only be called when exec_type is "trade".
489///
490/// # Errors
491///
492/// Returns an error if required fields are missing or cannot be parsed.
493pub fn parse_ws_fill_report(
494    exec: &KrakenWsExecutionData,
495    instrument: &InstrumentAny,
496    account_id: AccountId,
497    ts_init: UnixNanos,
498) -> anyhow::Result<FillReport> {
499    let instrument_id = instrument.id();
500    let venue_order_id = VenueOrderId::new(&exec.order_id);
501
502    let exec_id = exec
503        .exec_id
504        .as_ref()
505        .context("Missing exec_id for trade execution")?;
506    let trade_id =
507        TradeId::new_checked(exec_id).context("Invalid exec_id in Kraken trade execution")?;
508
509    let order_side = parse_order_side(exec.side);
510
511    let price_precision = instrument.price_precision();
512    let size_precision = instrument.size_precision();
513
514    let last_qty = exec
515        .last_qty
516        .map(|qty| Quantity::new_checked(qty, size_precision))
517        .transpose()
518        .context("Failed to parse last_qty")?
519        .context("Missing last_qty for trade execution")?;
520
521    let last_px = exec
522        .last_price
523        .map(|px| Price::new_checked(px, price_precision))
524        .transpose()
525        .context("Failed to parse last_price")?
526        .context("Missing last_price for trade execution")?;
527
528    let liquidity_side = parse_liquidity_side(exec.liquidity_ind);
529
530    // Calculate commission from fees array
531    let commission = if let Some(ref fees) = exec.fees {
532        if let Some(fee) = fees.first() {
533            let currency = Currency::get_or_create_crypto(&fee.asset);
534            Money::new(fee.qty.abs(), currency)
535        } else {
536            Money::new(0.0, instrument.quote_currency())
537        }
538    } else {
539        Money::new(0.0, instrument.quote_currency())
540    };
541
542    let ts_event = parse_rfc3339_timestamp(&exec.timestamp, "execution.timestamp")?;
543
544    let client_order_id = exec
545        .cl_ord_id
546        .as_ref()
547        .filter(|s| !s.is_empty())
548        .map(ClientOrderId::new);
549
550    Ok(FillReport::new(
551        account_id,
552        instrument_id,
553        venue_order_id,
554        trade_id,
555        order_side,
556        last_qty,
557        last_px,
558        commission,
559        liquidity_side,
560        client_order_id,
561        None, // venue_position_id
562        ts_event,
563        ts_init,
564        None, // report_id
565    ))
566}
567
#[cfg(test)]
mod tests {
    use nautilus_model::{identifiers::Symbol, types::Currency};
    use rstest::rstest;

    use super::*;
    use crate::{common::consts::KRAKEN_VENUE, websocket::spot_v2::messages::KrakenWsMessage};

    /// Fixed initialization timestamp shared by all tests.
    const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);

    /// Reads a fixture from the `test_data` directory, panicking with the path on failure.
    fn load_test_json(filename: &str) -> String {
        let path = format!("test_data/{filename}");
        match std::fs::read_to_string(&path) {
            Ok(contents) => contents,
            Err(e) => panic!("Failed to load test data from {path}: {e}"),
        }
    }

    /// Loads a fixture as a WebSocket message and returns its first `data` entry.
    fn load_first_payload(filename: &str) -> serde_json::Value {
        let json = load_test_json(filename);
        let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
        message.data[0].clone()
    }

    /// Builds the currency pair instrument used as the parsing target in every test.
    fn create_mock_instrument() -> InstrumentAny {
        use nautilus_model::instruments::currency_pair::CurrencyPair;

        let symbol = Symbol::new("BTC/USD");
        let raw_symbol = Symbol::new("XBTUSDT");
        let pair = CurrencyPair::new(
            InstrumentId::new(symbol, *KRAKEN_VENUE),
            raw_symbol,
            Currency::BTC(),
            Currency::USDT(),
            1, // price_precision
            8, // size_precision
            Price::from("0.1"),
            Quantity::from("0.00000001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            TS,
            TS,
        );
        InstrumentAny::CurrencyPair(pair)
    }

    #[rstest]
    fn test_parse_quote_tick() {
        let payload = load_first_payload("ws_ticker_snapshot.json");
        let ticker: KrakenWsTickerData = serde_json::from_value(payload).unwrap();
        let instrument = create_mock_instrument();

        let quote = parse_quote_tick(&ticker, &instrument, TS).unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert!(quote.bid_price.as_f64() > 0.0);
        assert!(quote.ask_price.as_f64() > 0.0);
        assert!(quote.bid_size.as_f64() > 0.0);
        assert!(quote.ask_size.as_f64() > 0.0);
    }

    #[rstest]
    fn test_parse_trade_tick() {
        let payload = load_first_payload("ws_trade_update.json");
        let trade: KrakenWsTradeData = serde_json::from_value(payload).unwrap();
        let instrument = create_mock_instrument();

        let tick = parse_trade_tick(&trade, &instrument, TS).unwrap();

        assert_eq!(tick.instrument_id, instrument.id());
        assert!(tick.price.as_f64() > 0.0);
        assert!(tick.size.as_f64() > 0.0);
        assert!(matches!(
            tick.aggressor_side,
            AggressorSide::Buyer | AggressorSide::Seller
        ));
    }

    #[rstest]
    fn test_parse_book_deltas_snapshot() {
        let payload = load_first_payload("ws_book_snapshot.json");
        let book: KrakenWsBookData = serde_json::from_value(payload).unwrap();
        let instrument = create_mock_instrument();

        let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();

        assert!(!deltas.is_empty());

        // Both sides of the book must be represented
        assert!(deltas.iter().any(|d| d.order.side == OrderSide::Buy));
        assert!(deltas.iter().any(|d| d.order.side == OrderSide::Sell));

        // Spot-check the first delta
        let first = &deltas[0];
        assert_eq!(first.instrument_id, instrument.id());
        assert!(first.order.price.as_f64() > 0.0);
        assert!(first.order.size.as_f64() > 0.0);
    }

    #[rstest]
    fn test_parse_book_deltas_update() {
        let payload = load_first_payload("ws_book_update.json");
        let book: KrakenWsBookData = serde_json::from_value(payload).unwrap();
        let instrument = create_mock_instrument();

        let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();

        assert!(!deltas.is_empty());

        // Spot-check the first delta
        let first = &deltas[0];
        assert_eq!(first.instrument_id, instrument.id());
        assert!(first.order.price.as_f64() > 0.0);
    }

    #[rstest]
    fn test_parse_rfc3339_timestamp() {
        let parsed = parse_rfc3339_timestamp("2023-10-06T17:35:55.440295Z", "test").unwrap();
        assert!(parsed.as_u64() > 0);
    }

    #[rstest]
    fn test_parse_ws_bar() {
        let payload = load_first_payload("ws_ohlc_update.json");
        let ohlc: KrakenWsOhlcData = serde_json::from_value(payload).unwrap();
        let instrument = create_mock_instrument();

        let bar = parse_ws_bar(&ohlc, &instrument, TS).unwrap();

        assert_eq!(bar.bar_type.instrument_id(), instrument.id());
        assert!(bar.open.as_f64() > 0.0);
        assert!(bar.high.as_f64() > 0.0);
        assert!(bar.low.as_f64() > 0.0);
        assert!(bar.close.as_f64() > 0.0);
        assert!(bar.volume.as_f64() > 0.0);

        let spec = bar.bar_type.spec();
        assert_eq!(spec.step.get(), 1);
        assert_eq!(spec.aggregation, BarAggregation::Minute);
        assert_eq!(spec.price_type, PriceType::Last);

        // Verify ts_event is computed as interval_begin + interval (close time)
        // interval_begin is 2023-10-04T16:25:00Z, interval is 1 minute, so close is 16:26:00Z
        let expected_close = ohlc.interval_begin + chrono::Duration::minutes(1);
        let expected_nanos = expected_close.timestamp_nanos_opt().unwrap() as u64;
        assert_eq!(bar.ts_event, UnixNanos::from(expected_nanos));
    }

    #[rstest]
    fn test_interval_to_bar_spec() {
        // (Kraken interval in minutes, expected step, expected aggregation)
        let expectations = [
            (1, 1, BarAggregation::Minute),
            (5, 5, BarAggregation::Minute),
            (15, 15, BarAggregation::Minute),
            (30, 30, BarAggregation::Minute),
            (60, 1, BarAggregation::Hour),
            (240, 4, BarAggregation::Hour),
            (1440, 1, BarAggregation::Day),
            (10080, 1, BarAggregation::Week),
            (21600, 15, BarAggregation::Day), // 21600 min = 15 days
        ];

        for (interval, expected_step, expected_aggregation) in expectations {
            let spec = interval_to_bar_spec(interval).unwrap();

            assert_eq!(
                spec.step.get(),
                expected_step,
                "Failed for interval {interval}"
            );
            assert_eq!(
                spec.aggregation, expected_aggregation,
                "Failed for interval {interval}"
            );
            assert_eq!(spec.price_type, PriceType::Last);
        }
    }

    #[rstest]
    fn test_interval_to_bar_spec_invalid() {
        assert!(interval_to_bar_spec(999).is_err());
    }
}