1use anyhow::Context;
19use nautilus_core::{UUID4, nanos::UnixNanos};
20use nautilus_model::{
21 data::{Bar, BarSpecification, BarType, BookOrder, OrderBookDelta, QuoteTick, TradeTick},
22 enums::{
23 AggregationSource, AggressorSide, BarAggregation, BookAction, LiquiditySide, OrderSide,
24 OrderStatus, OrderType, PriceType, TimeInForce, TriggerType,
25 },
26 identifiers::{AccountId, ClientOrderId, InstrumentId, TradeId, VenueOrderId},
27 instruments::{Instrument, any::InstrumentAny},
28 reports::{FillReport, OrderStatusReport},
29 types::{Currency, Money, Price, Quantity},
30};
31
32use super::{
33 enums::{KrakenExecType, KrakenLiquidityInd, KrakenWsOrderStatus},
34 messages::{
35 KrakenWsBookData, KrakenWsBookLevel, KrakenWsExecutionData, KrakenWsOhlcData,
36 KrakenWsTickerData, KrakenWsTradeData,
37 },
38};
39use crate::common::enums::{KrakenOrderSide, KrakenOrderType, KrakenTimeInForce};
40
41pub fn parse_quote_tick(
48 ticker: &KrakenWsTickerData,
49 instrument: &InstrumentAny,
50 ts_init: UnixNanos,
51) -> anyhow::Result<QuoteTick> {
52 let instrument_id = instrument.id();
53 let price_precision = instrument.price_precision();
54 let size_precision = instrument.size_precision();
55
56 let bid_price = Price::new_checked(ticker.bid, price_precision).with_context(|| {
57 format!("Failed to construct bid Price with precision {price_precision}")
58 })?;
59 let bid_size = Quantity::new_checked(ticker.bid_qty, size_precision).with_context(|| {
60 format!("Failed to construct bid Quantity with precision {size_precision}")
61 })?;
62
63 let ask_price = Price::new_checked(ticker.ask, price_precision).with_context(|| {
64 format!("Failed to construct ask Price with precision {price_precision}")
65 })?;
66 let ask_size = Quantity::new_checked(ticker.ask_qty, size_precision).with_context(|| {
67 format!("Failed to construct ask Quantity with precision {size_precision}")
68 })?;
69
70 let ts_event = ts_init;
72
73 Ok(QuoteTick::new(
74 instrument_id,
75 bid_price,
76 ask_price,
77 bid_size,
78 ask_size,
79 ts_event,
80 ts_init,
81 ))
82}
83
84pub fn parse_trade_tick(
92 trade: &KrakenWsTradeData,
93 instrument: &InstrumentAny,
94 ts_init: UnixNanos,
95) -> anyhow::Result<TradeTick> {
96 let instrument_id = instrument.id();
97 let price_precision = instrument.price_precision();
98 let size_precision = instrument.size_precision();
99
100 let price = Price::new_checked(trade.price, price_precision)
101 .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
102 let size = Quantity::new_checked(trade.qty, size_precision)
103 .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
104
105 let aggressor = match trade.side {
106 KrakenOrderSide::Buy => AggressorSide::Buyer,
107 KrakenOrderSide::Sell => AggressorSide::Seller,
108 };
109
110 let trade_id = TradeId::new_checked(trade.trade_id.to_string())?;
111 let ts_event = parse_rfc3339_timestamp(&trade.timestamp, "trade.timestamp")?;
112
113 TradeTick::new_checked(
114 instrument_id,
115 price,
116 size,
117 aggressor,
118 trade_id,
119 ts_event,
120 ts_init,
121 )
122 .context("Failed to construct TradeTick from Kraken WebSocket trade")
123}
124
125pub fn parse_book_deltas(
135 book: &KrakenWsBookData,
136 instrument: &InstrumentAny,
137 sequence: u64,
138 ts_init: UnixNanos,
139) -> anyhow::Result<Vec<OrderBookDelta>> {
140 let instrument_id = instrument.id();
141 let price_precision = instrument.price_precision();
142 let size_precision = instrument.size_precision();
143
144 let ts_event = if let Some(ref timestamp) = book.timestamp {
146 parse_rfc3339_timestamp(timestamp, "book.timestamp")?
147 } else {
148 ts_init
149 };
150
151 let mut deltas = Vec::new();
152 let mut current_sequence = sequence;
153
154 if let Some(ref bids) = book.bids {
155 for level in bids {
156 let delta = parse_book_level(
157 level,
158 OrderSide::Buy,
159 instrument_id,
160 price_precision,
161 size_precision,
162 current_sequence,
163 ts_event,
164 ts_init,
165 )?;
166 deltas.push(delta);
167 current_sequence += 1;
168 }
169 }
170
171 if let Some(ref asks) = book.asks {
172 for level in asks {
173 let delta = parse_book_level(
174 level,
175 OrderSide::Sell,
176 instrument_id,
177 price_precision,
178 size_precision,
179 current_sequence,
180 ts_event,
181 ts_init,
182 )?;
183 deltas.push(delta);
184 current_sequence += 1;
185 }
186 }
187
188 Ok(deltas)
189}
190
191#[allow(clippy::too_many_arguments)]
192fn parse_book_level(
193 level: &KrakenWsBookLevel,
194 side: OrderSide,
195 instrument_id: InstrumentId,
196 price_precision: u8,
197 size_precision: u8,
198 sequence: u64,
199 ts_event: UnixNanos,
200 ts_init: UnixNanos,
201) -> anyhow::Result<OrderBookDelta> {
202 let price = Price::new_checked(level.price, price_precision)
203 .with_context(|| format!("Failed to construct Price with precision {price_precision}"))?;
204 let size = Quantity::new_checked(level.qty, size_precision)
205 .with_context(|| format!("Failed to construct Quantity with precision {size_precision}"))?;
206
207 let action = if size.raw == 0 {
209 BookAction::Delete
210 } else {
211 BookAction::Update
212 };
213
214 let order_id = price.raw as u64;
216 let order = BookOrder::new(side, price, size, order_id);
217
218 Ok(OrderBookDelta::new(
219 instrument_id,
220 action,
221 order,
222 0, sequence,
224 ts_event,
225 ts_init,
226 ))
227}
228
229fn parse_rfc3339_timestamp(value: &str, field: &str) -> anyhow::Result<UnixNanos> {
230 value
231 .parse::<UnixNanos>()
232 .map_err(|e| anyhow::anyhow!("Failed to parse {field}='{value}': {e}"))
233}
234
235pub fn parse_ws_bar(
245 ohlc: &KrakenWsOhlcData,
246 instrument: &InstrumentAny,
247 ts_init: UnixNanos,
248) -> anyhow::Result<Bar> {
249 let instrument_id = instrument.id();
250 let price_precision = instrument.price_precision();
251 let size_precision = instrument.size_precision();
252
253 let open = Price::new_checked(ohlc.open, price_precision)?;
254 let high = Price::new_checked(ohlc.high, price_precision)?;
255 let low = Price::new_checked(ohlc.low, price_precision)?;
256 let close = Price::new_checked(ohlc.close, price_precision)?;
257 let volume = Quantity::new_checked(ohlc.volume, size_precision)?;
258
259 let bar_spec = interval_to_bar_spec(ohlc.interval)?;
260 let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
261
262 let interval_secs = i64::from(ohlc.interval) * 60;
264 let close_time = ohlc.interval_begin + chrono::Duration::seconds(interval_secs);
265 let ts_event = UnixNanos::from(close_time.timestamp_nanos_opt().unwrap_or(0) as u64);
266
267 Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
268}
269
270fn interval_to_bar_spec(interval: u32) -> anyhow::Result<BarSpecification> {
272 let (step, aggregation) = match interval {
273 1 => (1, BarAggregation::Minute),
274 5 => (5, BarAggregation::Minute),
275 15 => (15, BarAggregation::Minute),
276 30 => (30, BarAggregation::Minute),
277 60 => (1, BarAggregation::Hour),
278 240 => (4, BarAggregation::Hour),
279 1440 => (1, BarAggregation::Day),
280 10080 => (1, BarAggregation::Week),
281 21600 => (15, BarAggregation::Day), _ => anyhow::bail!("Unsupported Kraken OHLC interval: {interval}"),
283 };
284
285 Ok(BarSpecification::new(step, aggregation, PriceType::Last))
286}
287
288fn parse_order_status(
290 exec_type: KrakenExecType,
291 order_status: Option<KrakenWsOrderStatus>,
292) -> OrderStatus {
293 match exec_type {
294 KrakenExecType::Canceled => return OrderStatus::Canceled,
295 KrakenExecType::Expired => return OrderStatus::Expired,
296 KrakenExecType::Filled => return OrderStatus::Filled,
297 KrakenExecType::Trade => {
298 return match order_status {
299 Some(KrakenWsOrderStatus::Filled) => OrderStatus::Filled,
300 Some(KrakenWsOrderStatus::PartiallyFilled) | None => OrderStatus::PartiallyFilled,
301 Some(status) => status.into(),
302 };
303 }
304 _ => {}
305 }
306
307 match order_status {
308 Some(status) => status.into(),
309 None => OrderStatus::Accepted,
310 }
311}
312
313fn parse_order_type(order_type: Option<KrakenOrderType>) -> OrderType {
315 match order_type {
316 Some(KrakenOrderType::Market) => OrderType::Market,
317 Some(KrakenOrderType::Limit) => OrderType::Limit,
318 Some(KrakenOrderType::StopLoss) => OrderType::StopMarket,
319 Some(KrakenOrderType::TakeProfit) => OrderType::MarketIfTouched,
320 Some(KrakenOrderType::StopLossLimit) => OrderType::StopLimit,
321 Some(KrakenOrderType::TakeProfitLimit) => OrderType::LimitIfTouched,
322 Some(KrakenOrderType::SettlePosition) => OrderType::Market,
323 None => OrderType::Limit,
324 }
325}
326
327fn parse_order_side(side: Option<KrakenOrderSide>) -> OrderSide {
329 match side {
330 Some(KrakenOrderSide::Buy) => OrderSide::Buy,
331 Some(KrakenOrderSide::Sell) => OrderSide::Sell,
332 None => OrderSide::Buy,
333 }
334}
335
336fn parse_time_in_force(
338 time_in_force: Option<KrakenTimeInForce>,
339 post_only: Option<bool>,
340) -> TimeInForce {
341 if post_only == Some(true) {
343 return TimeInForce::Gtc;
344 }
345
346 match time_in_force {
347 Some(KrakenTimeInForce::GoodTilCancelled) => TimeInForce::Gtc,
348 Some(KrakenTimeInForce::ImmediateOrCancel) => TimeInForce::Ioc,
349 Some(KrakenTimeInForce::GoodTilDate) => TimeInForce::Gtd,
350 None => TimeInForce::Gtc,
351 }
352}
353
354fn parse_liquidity_side(liquidity_ind: Option<KrakenLiquidityInd>) -> LiquiditySide {
355 liquidity_ind.map_or(LiquiditySide::NoLiquiditySide, Into::into)
356}
357
358pub fn parse_ws_order_status_report(
364 exec: &KrakenWsExecutionData,
365 instrument: &InstrumentAny,
366 account_id: AccountId,
367 cached_order_qty: Option<f64>,
368 ts_init: UnixNanos,
369) -> anyhow::Result<OrderStatusReport> {
370 let instrument_id = instrument.id();
371 let venue_order_id = VenueOrderId::new(&exec.order_id);
372 let order_side = parse_order_side(exec.side);
373 let order_type = parse_order_type(exec.order_type);
374 let time_in_force = parse_time_in_force(exec.time_in_force, exec.post_only);
375 let order_status = parse_order_status(exec.exec_type, exec.order_status);
376
377 let price_precision = instrument.price_precision();
378 let size_precision = instrument.size_precision();
379
380 let last_qty = exec
382 .last_qty
383 .map(|qty| Quantity::new_checked(qty, size_precision))
384 .transpose()
385 .context("Failed to parse last_qty")?;
386
387 let filled_qty = exec
388 .cum_qty
389 .map(|qty| Quantity::new_checked(qty, size_precision))
390 .transpose()
391 .context("Failed to parse cum_qty")?
392 .or(last_qty)
393 .unwrap_or_else(|| Quantity::new(0.0, size_precision));
394
395 let quantity = exec
396 .order_qty
397 .or(cached_order_qty)
398 .map(|qty| Quantity::new_checked(qty, size_precision))
399 .transpose()
400 .context("Failed to parse order_qty")?
401 .unwrap_or(filled_qty);
402
403 let ts_event = parse_rfc3339_timestamp(&exec.timestamp, "execution.timestamp")?;
404
405 let mut report = OrderStatusReport::new(
406 account_id,
407 instrument_id,
408 None, venue_order_id,
410 order_side,
411 order_type,
412 time_in_force,
413 order_status,
414 quantity,
415 filled_qty,
416 ts_event,
417 ts_event,
418 ts_init,
419 Some(UUID4::new()),
420 );
421
422 if let Some(ref cl_ord_id) = exec.cl_ord_id
423 && !cl_ord_id.is_empty()
424 {
425 report = report.with_client_order_id(ClientOrderId::new(cl_ord_id));
426 }
427
428 let price_value = exec
432 .limit_price
433 .filter(|&p| p > 0.0)
434 .or(exec.avg_price.filter(|&p| p > 0.0))
435 .or(exec.last_price.filter(|&p| p > 0.0));
436
437 if let Some(px) = price_value {
438 let price =
439 Price::new_checked(px, price_precision).context("Failed to parse order price")?;
440 report = report.with_price(price);
441 }
442
443 let avg_px = exec
445 .avg_price
446 .filter(|&p| p > 0.0)
447 .or_else(|| match (exec.cum_cost, exec.cum_qty) {
448 (Some(cost), Some(qty)) if qty > 0.0 => Some(cost / qty),
449 _ => None,
450 })
451 .or_else(|| exec.last_price.filter(|&p| p > 0.0));
452
453 if let Some(avg_price) = avg_px {
454 report = report.with_avg_px(avg_price)?;
455 }
456
457 if exec.post_only == Some(true) {
458 report = report.with_post_only(true);
459 }
460
461 if exec.reduce_only == Some(true) {
462 report = report.with_reduce_only(true);
463 }
464
465 if let Some(ref reason) = exec.reason
466 && !reason.is_empty()
467 {
468 report = report.with_cancel_reason(reason.clone());
469 }
470
471 let is_conditional = matches!(
473 order_type,
474 OrderType::StopMarket
475 | OrderType::StopLimit
476 | OrderType::MarketIfTouched
477 | OrderType::LimitIfTouched
478 );
479 if is_conditional {
480 report = report.with_trigger_type(TriggerType::Default);
481 }
482
483 Ok(report)
484}
485
486pub fn parse_ws_fill_report(
494 exec: &KrakenWsExecutionData,
495 instrument: &InstrumentAny,
496 account_id: AccountId,
497 ts_init: UnixNanos,
498) -> anyhow::Result<FillReport> {
499 let instrument_id = instrument.id();
500 let venue_order_id = VenueOrderId::new(&exec.order_id);
501
502 let exec_id = exec
503 .exec_id
504 .as_ref()
505 .context("Missing exec_id for trade execution")?;
506 let trade_id =
507 TradeId::new_checked(exec_id).context("Invalid exec_id in Kraken trade execution")?;
508
509 let order_side = parse_order_side(exec.side);
510
511 let price_precision = instrument.price_precision();
512 let size_precision = instrument.size_precision();
513
514 let last_qty = exec
515 .last_qty
516 .map(|qty| Quantity::new_checked(qty, size_precision))
517 .transpose()
518 .context("Failed to parse last_qty")?
519 .context("Missing last_qty for trade execution")?;
520
521 let last_px = exec
522 .last_price
523 .map(|px| Price::new_checked(px, price_precision))
524 .transpose()
525 .context("Failed to parse last_price")?
526 .context("Missing last_price for trade execution")?;
527
528 let liquidity_side = parse_liquidity_side(exec.liquidity_ind);
529
530 let commission = if let Some(ref fees) = exec.fees {
532 if let Some(fee) = fees.first() {
533 let currency = Currency::get_or_create_crypto(&fee.asset);
534 Money::new(fee.qty.abs(), currency)
535 } else {
536 Money::new(0.0, instrument.quote_currency())
537 }
538 } else {
539 Money::new(0.0, instrument.quote_currency())
540 };
541
542 let ts_event = parse_rfc3339_timestamp(&exec.timestamp, "execution.timestamp")?;
543
544 let client_order_id = exec
545 .cl_ord_id
546 .as_ref()
547 .filter(|s| !s.is_empty())
548 .map(ClientOrderId::new);
549
550 Ok(FillReport::new(
551 account_id,
552 instrument_id,
553 venue_order_id,
554 trade_id,
555 order_side,
556 last_qty,
557 last_px,
558 commission,
559 liquidity_side,
560 client_order_id,
561 None, ts_event,
563 ts_init,
564 None, ))
566}
567
#[cfg(test)]
mod tests {
    use nautilus_model::{identifiers::Symbol, types::Currency};
    use rstest::rstest;

    use super::*;
    use crate::{common::consts::KRAKEN_VENUE, websocket::spot_v2::messages::KrakenWsMessage};

    /// Fixed init timestamp shared by all tests.
    const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);

    /// Reads a JSON fixture from the `test_data` directory, panicking with the
    /// offending path when it cannot be read.
    fn load_test_json(filename: &str) -> String {
        let path = format!("test_data/{filename}");
        match std::fs::read_to_string(&path) {
            Ok(contents) => contents,
            Err(e) => panic!("Failed to load test data from {path}: {e}"),
        }
    }

    /// Builds a BTC/USD currency pair on the Kraken venue with price
    /// precision 1 and size precision 8.
    fn create_mock_instrument() -> InstrumentAny {
        use nautilus_model::instruments::currency_pair::CurrencyPair;

        let pair = CurrencyPair::new(
            InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE),
            Symbol::new("XBTUSDT"),
            Currency::BTC(),
            Currency::USDT(),
            1, // price precision
            8, // size precision
            Price::from("0.1"),
            Quantity::from("0.00000001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            TS,
            TS,
        );

        InstrumentAny::CurrencyPair(pair)
    }

    #[rstest]
    fn test_parse_quote_tick() {
        let raw = load_test_json("ws_ticker_snapshot.json");
        let envelope: KrakenWsMessage = serde_json::from_str(&raw).unwrap();
        let ticker: KrakenWsTickerData = serde_json::from_value(envelope.data[0].clone()).unwrap();
        let instrument = create_mock_instrument();

        let quote = parse_quote_tick(&ticker, &instrument, TS).unwrap();

        assert_eq!(quote.instrument_id, instrument.id());
        assert!(quote.bid_price.as_f64() > 0.0);
        assert!(quote.ask_price.as_f64() > 0.0);
        assert!(quote.bid_size.as_f64() > 0.0);
        assert!(quote.ask_size.as_f64() > 0.0);
    }

    #[rstest]
    fn test_parse_trade_tick() {
        let raw = load_test_json("ws_trade_update.json");
        let envelope: KrakenWsMessage = serde_json::from_str(&raw).unwrap();
        let trade: KrakenWsTradeData = serde_json::from_value(envelope.data[0].clone()).unwrap();
        let instrument = create_mock_instrument();

        let tick = parse_trade_tick(&trade, &instrument, TS).unwrap();

        assert_eq!(tick.instrument_id, instrument.id());
        assert!(tick.price.as_f64() > 0.0);
        assert!(tick.size.as_f64() > 0.0);
        assert!(matches!(
            tick.aggressor_side,
            AggressorSide::Buyer | AggressorSide::Seller
        ));
    }

    #[rstest]
    fn test_parse_book_deltas_snapshot() {
        let raw = load_test_json("ws_book_snapshot.json");
        let envelope: KrakenWsMessage = serde_json::from_str(&raw).unwrap();
        let book: KrakenWsBookData = serde_json::from_value(envelope.data[0].clone()).unwrap();
        let instrument = create_mock_instrument();

        let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();

        assert!(!deltas.is_empty());

        // A snapshot must contain levels on both sides of the book.
        assert!(deltas.iter().any(|d| d.order.side == OrderSide::Buy));
        assert!(deltas.iter().any(|d| d.order.side == OrderSide::Sell));

        let head = deltas.first().unwrap();
        assert_eq!(head.instrument_id, instrument.id());
        assert!(head.order.price.as_f64() > 0.0);
        assert!(head.order.size.as_f64() > 0.0);
    }

    #[rstest]
    fn test_parse_book_deltas_update() {
        let raw = load_test_json("ws_book_update.json");
        let envelope: KrakenWsMessage = serde_json::from_str(&raw).unwrap();
        let book: KrakenWsBookData = serde_json::from_value(envelope.data[0].clone()).unwrap();
        let instrument = create_mock_instrument();

        let deltas = parse_book_deltas(&book, &instrument, 1, TS).unwrap();

        assert!(!deltas.is_empty());

        let head = deltas.first().unwrap();
        assert_eq!(head.instrument_id, instrument.id());
        assert!(head.order.price.as_f64() > 0.0);
    }

    #[rstest]
    fn test_parse_rfc3339_timestamp() {
        let parsed = parse_rfc3339_timestamp("2023-10-06T17:35:55.440295Z", "test").unwrap();
        assert!(parsed.as_u64() > 0);
    }

    #[rstest]
    fn test_parse_ws_bar() {
        let raw = load_test_json("ws_ohlc_update.json");
        let envelope: KrakenWsMessage = serde_json::from_str(&raw).unwrap();
        let ohlc: KrakenWsOhlcData = serde_json::from_value(envelope.data[0].clone()).unwrap();
        let instrument = create_mock_instrument();

        let bar = parse_ws_bar(&ohlc, &instrument, TS).unwrap();

        assert_eq!(bar.bar_type.instrument_id(), instrument.id());
        assert!(bar.open.as_f64() > 0.0);
        assert!(bar.high.as_f64() > 0.0);
        assert!(bar.low.as_f64() > 0.0);
        assert!(bar.close.as_f64() > 0.0);
        assert!(bar.volume.as_f64() > 0.0);

        let spec = bar.bar_type.spec();
        assert_eq!(spec.step.get(), 1);
        assert_eq!(spec.aggregation, BarAggregation::Minute);
        assert_eq!(spec.price_type, PriceType::Last);

        // The bar is stamped at its close: interval start plus one minute.
        let expected_close = ohlc.interval_begin + chrono::Duration::seconds(60);
        let expected_nanos = expected_close.timestamp_nanos_opt().unwrap() as u64;
        assert_eq!(bar.ts_event, UnixNanos::from(expected_nanos));
    }

    #[rstest]
    fn test_interval_to_bar_spec() {
        // (Kraken interval in minutes, expected step, expected aggregation)
        let test_cases = [
            (1, 1, BarAggregation::Minute),
            (5, 5, BarAggregation::Minute),
            (15, 15, BarAggregation::Minute),
            (30, 30, BarAggregation::Minute),
            (60, 1, BarAggregation::Hour),
            (240, 4, BarAggregation::Hour),
            (1440, 1, BarAggregation::Day),
            (10080, 1, BarAggregation::Week),
            (21600, 15, BarAggregation::Day),
        ];

        for (interval, expected_step, expected_aggregation) in test_cases {
            let spec = interval_to_bar_spec(interval).unwrap();

            assert_eq!(
                spec.step.get(),
                expected_step,
                "Failed for interval {interval}"
            );
            assert_eq!(
                spec.aggregation, expected_aggregation,
                "Failed for interval {interval}"
            );
            assert_eq!(spec.price_type, PriceType::Last);
        }
    }

    #[rstest]
    fn test_interval_to_bar_spec_invalid() {
        assert!(interval_to_bar_spec(999).is_err());
    }
}