nautilus_kraken/websocket/spot_v2/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message parsers for converting Kraken streaming data to Nautilus domain models.
17
18use anyhow::Context;
19use nautilus_core::{UUID4, nanos::UnixNanos};
20use nautilus_model::{
21    data::{Bar, BarSpecification, BarType, BookOrder, OrderBookDelta, QuoteTick, TradeTick},
22    enums::{
23        AggregationSource, AggressorSide, BarAggregation, BookAction, LiquiditySide, OrderSide,
24        OrderStatus, OrderType, PriceType, TimeInForce, TriggerType,
25    },
26    identifiers::{AccountId, ClientOrderId, InstrumentId, TradeId, VenueOrderId},
27    instruments::{Instrument, any::InstrumentAny},
28    reports::{FillReport, OrderStatusReport},
29    types::{Currency, Money, Price, Quantity},
30};
31
32use super::{
33    enums::{KrakenExecType, KrakenLiquidityInd, KrakenWsOrderStatus},
34    messages::{
35        KrakenWsBookData, KrakenWsBookLevel, KrakenWsExecutionData, KrakenWsOhlcData,
36        KrakenWsTickerData, KrakenWsTradeData,
37    },
38};
39use crate::common::enums::{KrakenOrderSide, KrakenOrderType, KrakenTimeInForce};
40
41/// Parses Kraken WebSocket ticker data into a Nautilus quote tick.
42///
43/// # Errors
44///
45/// Returns an error if:
46/// - Bid or ask price/quantity cannot be parsed.
47pub fn parse_quote_tick(
48    ticker: &KrakenWsTickerData,
49    instrument: &InstrumentAny,
50    ts_init: UnixNanos,
51) -> anyhow::Result<QuoteTick> {
52    let instrument_id = instrument.id();
53    let price_precision = instrument.price_precision();
54    let size_precision = instrument.size_precision();
55
56    let bid_price = Price::new_checked(ticker.bid, price_precision).with_context(|| {
57        format!("Failed to construct bid Price with precision {price_precision}")
58    })?;
59    let bid_size = Quantity::new_checked(ticker.bid_qty, size_precision).with_context(|| {
60        format!("Failed to construct bid Quantity with precision {size_precision}")
61    })?;
62
63    let ask_price = Price::new_checked(ticker.ask, price_precision).with_context(|| {
64        format!("Failed to construct ask Price with precision {price_precision}")
65    })?;
66    let ask_size = Quantity::new_checked(ticker.ask_qty, size_precision).with_context(|| {
67        format!("Failed to construct ask Quantity with precision {size_precision}")
68    })?;
69
70    // Kraken ticker doesn't include timestamp
71    let ts_event = ts_init;
72
73    Ok(QuoteTick::new(
74        instrument_id,
75        bid_price,
76        ask_price,
77        bid_size,
78        ask_size,
79        ts_event,
80        ts_init,
81    ))
82}
83
84/// Parses Kraken WebSocket trade data into a Nautilus trade tick.
85///
86/// # Errors
87///
88/// Returns an error if:
89/// - Price or quantity cannot be parsed.
90/// - Timestamp is invalid.
91pub fn parse_trade_tick(
92    trade: &KrakenWsTradeData,
93    instrument: &InstrumentAny,
94    ts_init: UnixNanos,
95) -> anyhow::Result<TradeTick> {
96    let instrument_id = instrument.id();
97    let price_precision = instrument.price_precision();
98    let size_precision = instrument.size_precision();
99
100    let price = Price::new_checked(trade.price, price_precision)
101        .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
102    let size = Quantity::new_checked(trade.qty, size_precision)
103        .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
104
105    let aggressor = match trade.side {
106        KrakenOrderSide::Buy => AggressorSide::Buyer,
107        KrakenOrderSide::Sell => AggressorSide::Seller,
108    };
109
110    let trade_id = TradeId::new_checked(trade.trade_id.to_string())?;
111    let ts_event = parse_rfc3339_timestamp(&trade.timestamp, "trade.timestamp")?;
112
113    TradeTick::new_checked(
114        instrument_id,
115        price,
116        size,
117        aggressor,
118        trade_id,
119        ts_event,
120        ts_init,
121    )
122    .context("Failed to construct TradeTick from Kraken WebSocket trade")
123}
124
125/// Parses Kraken WebSocket book data into Nautilus order book deltas.
126///
127/// Returns a vector of deltas, one for each bid and ask level.
128///
129/// # Errors
130///
131/// Returns an error if:
132/// - Price or quantity cannot be parsed.
133/// - Timestamp is invalid.
134pub fn parse_book_deltas(
135    book: &KrakenWsBookData,
136    instrument: &InstrumentAny,
137    sequence: u64,
138    ts_init: UnixNanos,
139) -> anyhow::Result<Vec<OrderBookDelta>> {
140    let instrument_id = instrument.id();
141    let price_precision = instrument.price_precision();
142    let size_precision = instrument.size_precision();
143
144    // Parse timestamp if available, otherwise use ts_init
145    let ts_event = if let Some(ref timestamp) = book.timestamp {
146        parse_rfc3339_timestamp(timestamp, "book.timestamp")?
147    } else {
148        ts_init
149    };
150
151    let mut deltas = Vec::new();
152    let mut current_sequence = sequence;
153
154    if let Some(ref bids) = book.bids {
155        for level in bids {
156            let delta = parse_book_level(
157                level,
158                OrderSide::Buy,
159                instrument_id,
160                price_precision,
161                size_precision,
162                current_sequence,
163                ts_event,
164                ts_init,
165            )?;
166            deltas.push(delta);
167            current_sequence += 1;
168        }
169    }
170
171    if let Some(ref asks) = book.asks {
172        for level in asks {
173            let delta = parse_book_level(
174                level,
175                OrderSide::Sell,
176                instrument_id,
177                price_precision,
178                size_precision,
179                current_sequence,
180                ts_event,
181                ts_init,
182            )?;
183            deltas.push(delta);
184            current_sequence += 1;
185        }
186    }
187
188    Ok(deltas)
189}
190
191#[allow(clippy::too_many_arguments)]
192fn parse_book_level(
193    level: &KrakenWsBookLevel,
194    side: OrderSide,
195    instrument_id: InstrumentId,
196    price_precision: u8,
197    size_precision: u8,
198    sequence: u64,
199    ts_event: UnixNanos,
200    ts_init: UnixNanos,
201) -> anyhow::Result<OrderBookDelta> {
202    let price = Price::new_checked(level.price, price_precision)
203        .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
204    let size = Quantity::new_checked(level.qty, size_precision)
205        .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
206
207    // Determine action based on quantity
208    let action = if size.raw == 0 {
209        BookAction::Delete
210    } else {
211        BookAction::Update
212    };
213
214    // Create order ID from price (Kraken doesn't provide order IDs)
215    let order_id = price.raw as u64;
216    let order = BookOrder::new(side, price, size, order_id);
217
218    Ok(OrderBookDelta::new(
219        instrument_id,
220        action,
221        order,
222        0, // flags
223        sequence,
224        ts_event,
225        ts_init,
226    ))
227}
228
229fn parse_rfc3339_timestamp(value: &str, field: &str) -> anyhow::Result<UnixNanos> {
230    value
231        .parse::<UnixNanos>()
232        .map_err(|e| anyhow::anyhow!("Failed to parse {field}='{value}': {e}"))
233}
234
235/// Parses Kraken WebSocket OHLC data into a Nautilus bar.
236///
237/// The bar's `ts_event` is computed as `interval_begin` + `interval` minutes.
238///
239/// # Errors
240///
241/// Returns an error if:
242/// - Price or quantity values cannot be parsed.
243/// - The interval cannot be converted to a valid bar specification.
244pub fn parse_ws_bar(
245    ohlc: &KrakenWsOhlcData,
246    instrument: &InstrumentAny,
247    ts_init: UnixNanos,
248) -> anyhow::Result<Bar> {
249    let instrument_id = instrument.id();
250    let price_precision = instrument.price_precision();
251    let size_precision = instrument.size_precision();
252
253    let open = Price::new_checked(ohlc.open, price_precision)?;
254    let high = Price::new_checked(ohlc.high, price_precision)?;
255    let low = Price::new_checked(ohlc.low, price_precision)?;
256    let close = Price::new_checked(ohlc.close, price_precision)?;
257    let volume = Quantity::new_checked(ohlc.volume, size_precision)?;
258
259    let bar_spec = interval_to_bar_spec(ohlc.interval)?;
260    let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
261
262    // Compute bar close time: interval_begin + interval minutes
263    let interval_secs = i64::from(ohlc.interval) * 60;
264    let close_time = ohlc.interval_begin + chrono::Duration::seconds(interval_secs);
265    let ts_event = UnixNanos::from(close_time.timestamp_nanos_opt().unwrap_or(0) as u64);
266
267    Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
268}
269
270/// Converts a Kraken OHLC interval (minutes) to a Nautilus bar specification.
271fn interval_to_bar_spec(interval: u32) -> anyhow::Result<BarSpecification> {
272    let (step, aggregation) = match interval {
273        1 => (1, BarAggregation::Minute),
274        5 => (5, BarAggregation::Minute),
275        15 => (15, BarAggregation::Minute),
276        30 => (30, BarAggregation::Minute),
277        60 => (1, BarAggregation::Hour),
278        240 => (4, BarAggregation::Hour),
279        1440 => (1, BarAggregation::Day),
280        10080 => (1, BarAggregation::Week),
281        21600 => (15, BarAggregation::Day), // 21600 min = 360 hours = 15 days
282        _ => anyhow::bail!("Unsupported Kraken OHLC interval: {interval}"),
283    };
284
285    Ok(BarSpecification::new(step, aggregation, PriceType::Last))
286}
287
288/// Parses Kraken execution type and order status to Nautilus order status.
289fn parse_order_status(
290    exec_type: KrakenExecType,
291    order_status: Option<KrakenWsOrderStatus>,
292) -> OrderStatus {
293    // First check exec_type for terminal states
294    match exec_type {
295        KrakenExecType::Canceled => return OrderStatus::Canceled,
296        KrakenExecType::Expired => return OrderStatus::Expired,
297        _ => {}
298    }
299
300    // Then check order_status field
301    match order_status {
302        Some(KrakenWsOrderStatus::PendingNew) => OrderStatus::Submitted,
303        Some(KrakenWsOrderStatus::New) => OrderStatus::Accepted,
304        Some(KrakenWsOrderStatus::PartiallyFilled) => OrderStatus::PartiallyFilled,
305        Some(KrakenWsOrderStatus::Filled) => OrderStatus::Filled,
306        Some(KrakenWsOrderStatus::Canceled) => OrderStatus::Canceled,
307        Some(KrakenWsOrderStatus::Expired) => OrderStatus::Expired,
308        Some(KrakenWsOrderStatus::Triggered) => OrderStatus::Triggered,
309        None => OrderStatus::Accepted,
310    }
311}
312
313/// Parses Kraken order type to Nautilus order type.
314fn parse_order_type(order_type: Option<KrakenOrderType>) -> OrderType {
315    match order_type {
316        Some(KrakenOrderType::Market) => OrderType::Market,
317        Some(KrakenOrderType::Limit) => OrderType::Limit,
318        Some(KrakenOrderType::StopLoss) => OrderType::StopMarket,
319        Some(KrakenOrderType::TakeProfit) => OrderType::MarketIfTouched,
320        Some(KrakenOrderType::StopLossLimit) => OrderType::StopLimit,
321        Some(KrakenOrderType::TakeProfitLimit) => OrderType::LimitIfTouched,
322        Some(KrakenOrderType::SettlePosition) => OrderType::Market,
323        None => OrderType::Limit,
324    }
325}
326
327/// Parses Kraken order side to Nautilus order side.
328fn parse_order_side(side: Option<KrakenOrderSide>) -> OrderSide {
329    match side {
330        Some(KrakenOrderSide::Buy) => OrderSide::Buy,
331        Some(KrakenOrderSide::Sell) => OrderSide::Sell,
332        None => OrderSide::Buy,
333    }
334}
335
336/// Parses Kraken time-in-force to Nautilus time-in-force.
337fn parse_time_in_force(
338    time_in_force: Option<KrakenTimeInForce>,
339    post_only: Option<bool>,
340) -> TimeInForce {
341    // Handle post_only flag
342    if post_only == Some(true) {
343        return TimeInForce::Gtc;
344    }
345
346    match time_in_force {
347        Some(KrakenTimeInForce::GoodTilCancelled) => TimeInForce::Gtc,
348        Some(KrakenTimeInForce::ImmediateOrCancel) => TimeInForce::Ioc,
349        Some(KrakenTimeInForce::GoodTilDate) => TimeInForce::Gtd,
350        None => TimeInForce::Gtc,
351    }
352}
353
354/// Parses Kraken liquidity indicator to Nautilus liquidity side.
355fn parse_liquidity_side(liquidity_ind: Option<KrakenLiquidityInd>) -> LiquiditySide {
356    match liquidity_ind {
357        Some(KrakenLiquidityInd::Maker) => LiquiditySide::Maker,
358        Some(KrakenLiquidityInd::Taker) => LiquiditySide::Taker,
359        None => LiquiditySide::NoLiquiditySide,
360    }
361}
362
363/// Parses a Kraken WebSocket execution message into an [`OrderStatusReport`].
364///
365/// # Errors
366///
367/// Returns an error if required fields are missing or cannot be parsed.
368pub fn parse_ws_order_status_report(
369    exec: &KrakenWsExecutionData,
370    instrument: &InstrumentAny,
371    account_id: AccountId,
372    cached_order_qty: Option<f64>,
373    ts_init: UnixNanos,
374) -> anyhow::Result<OrderStatusReport> {
375    let instrument_id = instrument.id();
376    let venue_order_id = VenueOrderId::new(&exec.order_id);
377    let order_side = parse_order_side(exec.side);
378    let order_type = parse_order_type(exec.order_type);
379    let time_in_force = parse_time_in_force(exec.time_in_force, exec.post_only);
380    let order_status = parse_order_status(exec.exec_type, exec.order_status);
381
382    let price_precision = instrument.price_precision();
383    let size_precision = instrument.size_precision();
384
385    // Quantity fallback: order_qty -> cached -> cum_qty -> last_qty (for trade snapshots)
386    let last_qty = exec
387        .last_qty
388        .map(|qty| Quantity::new_checked(qty, size_precision))
389        .transpose()
390        .context("Failed to parse last_qty")?;
391
392    let filled_qty = exec
393        .cum_qty
394        .map(|qty| Quantity::new_checked(qty, size_precision))
395        .transpose()
396        .context("Failed to parse cum_qty")?
397        .or(last_qty)
398        .unwrap_or_else(|| Quantity::new(0.0, size_precision));
399
400    let quantity = exec
401        .order_qty
402        .or(cached_order_qty)
403        .map(|qty| Quantity::new_checked(qty, size_precision))
404        .transpose()
405        .context("Failed to parse order_qty")?
406        .unwrap_or(filled_qty);
407
408    let ts_event = parse_rfc3339_timestamp(&exec.timestamp, "execution.timestamp")?;
409
410    let mut report = OrderStatusReport::new(
411        account_id,
412        instrument_id,
413        None, // client_order_id set below if present
414        venue_order_id,
415        order_side,
416        order_type,
417        time_in_force,
418        order_status,
419        quantity,
420        filled_qty,
421        ts_event,
422        ts_event,
423        ts_init,
424        Some(UUID4::new()),
425    );
426
427    if let Some(ref cl_ord_id) = exec.cl_ord_id
428        && !cl_ord_id.is_empty()
429    {
430        report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
431    }
432
433    // Price fallback: limit_price -> avg_price -> last_price
434    // Note: pending_new messages may not include any price fields, which is fine for
435    // orders we submitted (engine already has the price from submission)
436    let price_value = exec
437        .limit_price
438        .filter(|&p| p > 0.0)
439        .or(exec.avg_price.filter(|&p| p > 0.0))
440        .or(exec.last_price.filter(|&p| p > 0.0));
441
442    if let Some(px) = price_value {
443        let price =
444            Price::new_checked(px, price_precision).context("Failed to parse order price")?;
445        report = report.with_price(price);
446    }
447
448    // avg_px fallback: avg_price -> cum_cost / cum_qty -> last_price (for single trades/snapshots)
449    let avg_px = exec
450        .avg_price
451        .filter(|&p| p > 0.0)
452        .or_else(|| match (exec.cum_cost, exec.cum_qty) {
453            (Some(cost), Some(qty)) if qty > 0.0 => Some(cost / qty),
454            _ => None,
455        })
456        .or_else(|| exec.last_price.filter(|&p| p > 0.0));
457
458    if let Some(avg_price) = avg_px {
459        report = report.with_avg_px(avg_price)?;
460    }
461
462    if exec.post_only == Some(true) {
463        report = report.with_post_only(true);
464    }
465
466    if exec.reduce_only == Some(true) {
467        report = report.with_reduce_only(true);
468    }
469
470    if let Some(ref reason) = exec.reason
471        && !reason.is_empty()
472    {
473        report = report.with_cancel_reason(reason.clone());
474    }
475
476    // Set trigger type for conditional orders (WebSocket doesn't provide trigger field)
477    let is_conditional = matches!(
478        order_type,
479        OrderType::StopMarket
480            | OrderType::StopLimit
481            | OrderType::MarketIfTouched
482            | OrderType::LimitIfTouched
483    );
484    if is_conditional {
485        report = report.with_trigger_type(TriggerType::Default);
486    }
487
488    Ok(report)
489}
490
491/// Parses a Kraken WebSocket trade execution into a [`FillReport`].
492///
493/// This should only be called when exec_type is "trade".
494///
495/// # Errors
496///
497/// Returns an error if required fields are missing or cannot be parsed.
498pub fn parse_ws_fill_report(
499    exec: &KrakenWsExecutionData,
500    instrument: &InstrumentAny,
501    account_id: AccountId,
502    ts_init: UnixNanos,
503) -> anyhow::Result<FillReport> {
504    let instrument_id = instrument.id();
505    let venue_order_id = VenueOrderId::new(&exec.order_id);
506
507    let exec_id = exec
508        .exec_id
509        .as_ref()
510        .context("Missing exec_id for trade execution")?;
511    let trade_id =
512        TradeId::new_checked(exec_id).context("Invalid exec_id in Kraken trade execution")?;
513
514    let order_side = parse_order_side(exec.side);
515
516    let price_precision = instrument.price_precision();
517    let size_precision = instrument.size_precision();
518
519    let last_qty = exec
520        .last_qty
521        .map(|qty| Quantity::new_checked(qty, size_precision))
522        .transpose()
523        .context("Failed to parse last_qty")?
524        .context("Missing last_qty for trade execution")?;
525
526    let last_px = exec
527        .last_price
528        .map(|px| Price::new_checked(px, price_precision))
529        .transpose()
530        .context("Failed to parse last_price")?
531        .context("Missing last_price for trade execution")?;
532
533    let liquidity_side = parse_liquidity_side(exec.liquidity_ind);
534
535    // Calculate commission from fees array
536    let commission = if let Some(ref fees) = exec.fees {
537        if let Some(fee) = fees.first() {
538            let currency = Currency::get_or_create_crypto(&fee.asset);
539            Money::new(fee.qty.abs(), currency)
540        } else {
541            Money::new(0.0, instrument.quote_currency())
542        }
543    } else {
544        Money::new(0.0, instrument.quote_currency())
545    };
546
547    let ts_event = parse_rfc3339_timestamp(&exec.timestamp, "execution.timestamp")?;
548
549    let client_order_id = exec
550        .cl_ord_id
551        .as_ref()
552        .filter(|s| !s.is_empty())
553        .map(ClientOrderId::new);
554
555    Ok(FillReport::new(
556        account_id,
557        instrument_id,
558        venue_order_id,
559        trade_id,
560        order_side,
561        last_qty,
562        last_px,
563        commission,
564        liquidity_side,
565        client_order_id,
566        None, // venue_position_id
567        ts_event,
568        ts_init,
569        None, // report_id
570    ))
571}
572
573#[cfg(test)]
574mod tests {
575    use nautilus_model::{identifiers::Symbol, types::Currency};
576    use rstest::rstest;
577
578    use super::*;
579    use crate::{common::consts::KRAKEN_VENUE, websocket::spot_v2::messages::KrakenWsMessage};
580
581    const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);
582
583    fn load_test_json(filename: &str) -> String {
584        let path = format!("test_data/{filename}");
585        std::fs::read_to_string(&path)
586            .unwrap_or_else(|e| panic!("Failed to load test data from {path}: {e}"))
587    }
588
589    fn create_mock_instrument() -> InstrumentAny {
590        use nautilus_model::instruments::currency_pair::CurrencyPair;
591
592        let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
593        InstrumentAny::CurrencyPair(CurrencyPair::new(
594            instrument_id,
595            Symbol::new("XBTUSDT"),
596            Currency::BTC(),
597            Currency::USDT(),
598            1, // price_precision
599            8, // size_precision
600            Price::from("0.1"),
601            Quantity::from("0.00000001"),
602            None,
603            None,
604            None,
605            None,
606            None,
607            None,
608            None,
609            None,
610            None,
611            None,
612            None,
613            None,
614            TS,
615            TS,
616        ))
617    }
618
619    #[rstest]
620    fn test_parse_quote_tick() {
621        let json = load_test_json("ws_ticker_snapshot.json");
622        let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
623        let ticker: KrakenWsTickerData = serde_json::from_value(message.data[0].clone()).unwrap();
624
625        let instrument = create_mock_instrument();
626        let quote_tick = parse_quote_tick(&ticker, &instrument, TS).unwrap();
627
628        assert_eq!(quote_tick.instrument_id, instrument.id());
629        assert!(quote_tick.bid_price.as_f64() > 0.0);
630        assert!(quote_tick.ask_price.as_f64() > 0.0);
631        assert!(quote_tick.bid_size.as_f64() > 0.0);
632        assert!(quote_tick.ask_size.as_f64() > 0.0);
633    }
634
635    #[rstest]
636    fn test_parse_trade_tick() {
637        let json = load_test_json("ws_trade_update.json");
638        let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
639        let trade: KrakenWsTradeData = serde_json::from_value(message.data[0].clone()).unwrap();
640
641        let instrument = create_mock_instrument();
642        let trade_tick = parse_trade_tick(&trade, &instrument, TS).unwrap();
643
644        assert_eq!(trade_tick.instrument_id, instrument.id());
645        assert!(trade_tick.price.as_f64() > 0.0);
646        assert!(trade_tick.size.as_f64() > 0.0);
647        assert!(matches!(
648            trade_tick.aggressor_side,
649            AggressorSide::Buyer | AggressorSide::Seller
650        ));
651    }
652
653    #[rstest]
654    fn test_parse_book_deltas_snapshot() {
655        let json = load_test_json("ws_book_snapshot.json");
656        let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
657        let book: KrakenWsBookData = serde_json::from_value(message.data[0].clone()).unwrap();
658
659        let instrument = create_mock_instrument();
660        let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();
661
662        assert!(!deltas.is_empty());
663
664        // Check that we have both bids and asks
665        let bid_count = deltas
666            .iter()
667            .filter(|d| d.order.side == OrderSide::Buy)
668            .count();
669        let ask_count = deltas
670            .iter()
671            .filter(|d| d.order.side == OrderSide::Sell)
672            .count();
673
674        assert!(bid_count > 0);
675        assert!(ask_count > 0);
676
677        // Check first delta
678        let first_delta = &deltas[0];
679        assert_eq!(first_delta.instrument_id, instrument.id());
680        assert!(first_delta.order.price.as_f64() > 0.0);
681        assert!(first_delta.order.size.as_f64() > 0.0);
682    }
683
684    #[rstest]
685    fn test_parse_book_deltas_update() {
686        let json = load_test_json("ws_book_update.json");
687        let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
688        let book: KrakenWsBookData = serde_json::from_value(message.data[0].clone()).unwrap();
689
690        let instrument = create_mock_instrument();
691        let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();
692
693        assert!(!deltas.is_empty());
694
695        // Check that we have at least one delta
696        let first_delta = &deltas[0];
697        assert_eq!(first_delta.instrument_id, instrument.id());
698        assert!(first_delta.order.price.as_f64() > 0.0);
699    }
700
701    #[rstest]
702    fn test_parse_rfc3339_timestamp() {
703        let timestamp = "2023-10-06T17:35:55.440295Z";
704        let result = parse_rfc3339_timestamp(timestamp, "test").unwrap();
705        assert!(result.as_u64() > 0);
706    }
707
708    #[rstest]
709    fn test_parse_ws_bar() {
710        let json = load_test_json("ws_ohlc_update.json");
711        let message: KrakenWsMessage = serde_json::from_str(&json).unwrap();
712        let ohlc: KrakenWsOhlcData = serde_json::from_value(message.data[0].clone()).unwrap();
713
714        let instrument = create_mock_instrument();
715        let bar = parse_ws_bar(&ohlc, &instrument, TS).unwrap();
716
717        assert_eq!(bar.bar_type.instrument_id(), instrument.id());
718        assert!(bar.open.as_f64() > 0.0);
719        assert!(bar.high.as_f64() > 0.0);
720        assert!(bar.low.as_f64() > 0.0);
721        assert!(bar.close.as_f64() > 0.0);
722        assert!(bar.volume.as_f64() > 0.0);
723
724        let spec = bar.bar_type.spec();
725        assert_eq!(spec.step.get(), 1);
726        assert_eq!(spec.aggregation, BarAggregation::Minute);
727        assert_eq!(spec.price_type, PriceType::Last);
728
729        // Verify ts_event is computed as interval_begin + interval (close time)
730        // interval_begin is 2023-10-04T16:25:00Z, interval is 1 minute, so close is 16:26:00Z
731        let expected_close = ohlc.interval_begin + chrono::Duration::minutes(1);
732        let expected_ts_event =
733            UnixNanos::from(expected_close.timestamp_nanos_opt().unwrap() as u64);
734        assert_eq!(bar.ts_event, expected_ts_event);
735    }
736
737    #[rstest]
738    fn test_interval_to_bar_spec() {
739        let test_cases = [
740            (1, 1, BarAggregation::Minute),
741            (5, 5, BarAggregation::Minute),
742            (15, 15, BarAggregation::Minute),
743            (30, 30, BarAggregation::Minute),
744            (60, 1, BarAggregation::Hour),
745            (240, 4, BarAggregation::Hour),
746            (1440, 1, BarAggregation::Day),
747            (10080, 1, BarAggregation::Week),
748            (21600, 15, BarAggregation::Day), // 21600 min = 15 days
749        ];
750
751        for (interval, expected_step, expected_aggregation) in test_cases {
752            let spec = interval_to_bar_spec(interval).unwrap();
753            assert_eq!(
754                spec.step.get(),
755                expected_step,
756                "Failed for interval {interval}"
757            );
758            assert_eq!(
759                spec.aggregation, expected_aggregation,
760                "Failed for interval {interval}"
761            );
762            assert_eq!(spec.price_type, PriceType::Last);
763        }
764    }
765
766    #[rstest]
767    fn test_interval_to_bar_spec_invalid() {
768        let result = interval_to_bar_spec(999);
769        assert!(result.is_err());
770    }
771}