1use std::str::FromStr;
19
20use anyhow::Context;
21use nautilus_core::{
22 datetime::NANOSECONDS_IN_MILLISECOND, nanos::UnixNanos,
23 parsing::min_increment_precision_from_str, uuid::UUID4,
24};
25use nautilus_model::{
26 data::{Bar, BarType, TradeTick},
27 enums::{
28 AggressorSide, BarAggregation, ContingencyType, LiquiditySide, OrderStatus, OrderType,
29 PositionSideSpecified, TimeInForce, TrailingOffsetType, TriggerType,
30 },
31 identifiers::{AccountId, InstrumentId, Symbol, TradeId, VenueOrderId},
32 instruments::{
33 Instrument, any::InstrumentAny, crypto_perpetual::CryptoPerpetual,
34 currency_pair::CurrencyPair,
35 },
36 reports::{FillReport, OrderStatusReport, PositionStatusReport},
37 types::{Currency, Money, Price, Quantity, fixed::FIXED_PRECISION},
38};
39use rust_decimal::Decimal;
40use rust_decimal_macros::dec;
41
42use crate::{
43 common::{
44 consts::KRAKEN_VENUE,
45 enums::{
46 KrakenFillType, KrakenInstrumentType, KrakenPositionSide, KrakenSpotTrigger,
47 KrakenTriggerSignal,
48 },
49 },
50 http::models::{
51 AssetPairInfo, FuturesFill, FuturesInstrument, FuturesOpenOrder, FuturesOrderEvent,
52 FuturesPosition, FuturesPublicExecution, OhlcData, SpotOrder, SpotTrade,
53 },
54};
55
56pub fn parse_decimal(value: &str) -> anyhow::Result<Decimal> {
58 if value.is_empty() || value == "0" {
59 return Ok(dec!(0));
60 }
61 value
62 .parse::<Decimal>()
63 .map_err(|e| anyhow::anyhow!("Failed to parse decimal '{value}': {e}"))
64}
65
66fn parse_rfc3339_timestamp(value: &str, field: &str) -> anyhow::Result<UnixNanos> {
67 value
68 .parse::<UnixNanos>()
69 .map_err(|e| anyhow::anyhow!("Failed to parse {field}='{value}': {e}"))
70}
71
/// Normalizes a Kraken currency code by stripping a single leading legacy
/// `X` or `Z` prefix (e.g. `XXBT` -> `XBT`, `ZUSD` -> `USD`).
///
/// Codes of three bytes or fewer are returned unchanged: stripping them would
/// corrupt native tickers that merely start with `X` or `Z` (`XRP`, `XTZ`,
/// `ZEC`, ...) and could even yield an empty string.
///
/// NOTE(review): longer, non-legacy codes that happen to start with `X`/`Z`
/// are still stripped, exactly as before; this helper is a heuristic.
#[inline]
pub fn normalize_currency_code(code: &str) -> &str {
    // Legacy-prefixed Kraken codes are at least four characters long.
    if code.len() <= 3 {
        return code;
    }

    code.strip_prefix('X')
        .or_else(|| code.strip_prefix('Z'))
        .unwrap_or(code)
}
82
83pub fn parse_decimal_opt(value: Option<&str>) -> anyhow::Result<Option<Decimal>> {
85 match value {
86 Some(s) if !s.is_empty() && s != "0" => Ok(Some(parse_decimal(s)?)),
87 _ => Ok(None),
88 }
89}
90
91fn parse_trigger_type(
93 order_type: OrderType,
94 trigger: Option<KrakenSpotTrigger>,
95) -> Option<TriggerType> {
96 let is_conditional = matches!(
97 order_type,
98 OrderType::StopMarket
99 | OrderType::StopLimit
100 | OrderType::MarketIfTouched
101 | OrderType::LimitIfTouched
102 );
103
104 if !is_conditional {
105 return None;
106 }
107
108 match trigger {
109 Some(KrakenSpotTrigger::Last) => Some(TriggerType::LastPrice),
110 Some(KrakenSpotTrigger::Index) => Some(TriggerType::IndexPrice),
111 None => Some(TriggerType::Default),
112 }
113}
114
115fn parse_futures_trigger_type(
117 order_type: OrderType,
118 trigger_signal: Option<KrakenTriggerSignal>,
119) -> Option<TriggerType> {
120 let is_conditional = matches!(
121 order_type,
122 OrderType::StopMarket
123 | OrderType::StopLimit
124 | OrderType::MarketIfTouched
125 | OrderType::LimitIfTouched
126 );
127
128 if !is_conditional {
129 return None;
130 }
131
132 match trigger_signal {
133 Some(KrakenTriggerSignal::Last) => Some(TriggerType::LastPrice),
134 Some(KrakenTriggerSignal::Mark) => Some(TriggerType::MarkPrice),
135 Some(KrakenTriggerSignal::Index) => Some(TriggerType::IndexPrice),
136 None => Some(TriggerType::Default),
137 }
138}
139
140pub fn parse_spot_instrument(
149 pair_name: &str,
150 definition: &AssetPairInfo,
151 ts_event: UnixNanos,
152 ts_init: UnixNanos,
153) -> anyhow::Result<InstrumentAny> {
154 let symbol_str = definition.wsname.as_ref().unwrap_or(&definition.altname);
155 let instrument_id = InstrumentId::new(Symbol::new(symbol_str.as_str()), *KRAKEN_VENUE);
156 let raw_symbol = Symbol::new(pair_name);
157
158 let base_currency = get_currency(definition.base.as_str());
159 let quote_currency = get_currency(definition.quote.as_str());
160
161 let price_increment = parse_price(
162 definition
163 .tick_size
164 .as_ref()
165 .context("tick_size is required")?,
166 "tick_size",
167 )?;
168
169 let size_precision = definition.lot_decimals;
171 let size_increment = Quantity::new(10.0_f64.powi(-(size_precision as i32)), size_precision);
172
173 let min_quantity = definition
174 .ordermin
175 .as_ref()
176 .map(|s| parse_quantity(s, "ordermin"))
177 .transpose()?;
178
179 let taker_fee = definition
181 .fees
182 .first()
183 .map(|(_, fee)| Decimal::try_from(*fee))
184 .transpose()
185 .context("Failed to parse taker fee")?
186 .map(|f| f / dec!(100));
187
188 let maker_fee = definition
189 .fees_maker
190 .first()
191 .map(|(_, fee)| Decimal::try_from(*fee))
192 .transpose()
193 .context("Failed to parse maker fee")?
194 .map(|f| f / dec!(100));
195
196 let instrument = CurrencyPair::new(
197 instrument_id,
198 raw_symbol,
199 base_currency,
200 quote_currency,
201 price_increment.precision,
202 size_increment.precision,
203 price_increment,
204 size_increment,
205 None,
206 None,
207 None,
208 min_quantity,
209 None,
210 None,
211 None,
212 None,
213 maker_fee,
214 taker_fee,
215 None,
216 None,
217 ts_event,
218 ts_init,
219 );
220
221 Ok(InstrumentAny::CurrencyPair(instrument))
222}
223
224pub fn parse_futures_instrument(
233 instrument: &FuturesInstrument,
234 ts_event: UnixNanos,
235 ts_init: UnixNanos,
236) -> anyhow::Result<InstrumentAny> {
237 let instrument_id = InstrumentId::new(Symbol::new(&instrument.symbol), *KRAKEN_VENUE);
238 let raw_symbol = Symbol::new(&instrument.symbol);
239
240 let base_currency = get_currency(&instrument.base);
241 let quote_currency = get_currency(&instrument.quote);
242
243 let is_inverse = instrument.instrument_type == KrakenInstrumentType::FuturesInverse;
244 let settlement_currency = if is_inverse {
245 base_currency
246 } else {
247 quote_currency
248 };
249
250 let tick_size = instrument.tick_size;
253 let price_precision = min_increment_precision_from_str(&tick_size.to_string());
254 if price_precision > FIXED_PRECISION {
255 anyhow::bail!(
256 "Cannot parse instrument '{}': tick_size {tick_size} requires precision {price_precision} \
257 which exceeds FIXED_PRECISION ({FIXED_PRECISION})",
258 instrument.symbol
259 );
260 }
261 let price_increment = Price::new(tick_size, price_precision);
262
263 let (_size_precision, size_increment) = if instrument.contract_value_trade_precision >= 0 {
268 let precision = instrument.contract_value_trade_precision as u8;
269 let increment = Quantity::new(10.0_f64.powi(-(precision as i32)), precision);
270 (precision, increment)
271 } else {
272 let increment_value = 10.0_f64.powi(-instrument.contract_value_trade_precision);
274 (0, Quantity::new(increment_value, 0))
275 };
276
277 let multiplier_precision = if instrument.contract_size.fract() == 0.0 {
278 0
279 } else {
280 instrument
281 .contract_size
282 .to_string()
283 .split('.')
284 .nth(1)
285 .map_or(0, |s| s.len() as u8)
286 };
287 let multiplier = Some(Quantity::new(
288 instrument.contract_size,
289 multiplier_precision,
290 ));
291
292 let (margin_init, margin_maint) = instrument
294 .margin_levels
295 .first()
296 .and_then(|level| {
297 let init = Decimal::try_from(level.initial_margin).ok()?;
298 let maint = Decimal::try_from(level.maintenance_margin).ok()?;
299 Some((Some(init), Some(maint)))
300 })
301 .unwrap_or((None, None));
302
303 let instrument = CryptoPerpetual::new(
304 instrument_id,
305 raw_symbol,
306 base_currency,
307 quote_currency,
308 settlement_currency,
309 is_inverse,
310 price_increment.precision,
311 size_increment.precision,
312 price_increment,
313 size_increment,
314 multiplier,
315 None, None, None, None, None, None, None, margin_init,
323 margin_maint,
324 None, None, ts_event,
327 ts_init,
328 );
329
330 Ok(InstrumentAny::CryptoPerpetual(instrument))
331}
332
333fn parse_price(value: &str, field: &str) -> anyhow::Result<Price> {
334 Price::from_str(value)
335 .map_err(|err| anyhow::anyhow!("Failed to parse {field}='{value}': {err}"))
336}
337
338fn parse_quantity(value: &str, field: &str) -> anyhow::Result<Quantity> {
339 Quantity::from_str(value)
340 .map_err(|err| anyhow::anyhow!("Failed to parse {field}='{value}': {err}"))
341}
342
343pub fn get_currency(code: &str) -> Currency {
348 Currency::get_or_create_crypto(code)
349}
350
351pub fn parse_trade_tick_from_array(
362 trade_array: &[serde_json::Value],
363 instrument: &InstrumentAny,
364 ts_init: UnixNanos,
365) -> anyhow::Result<TradeTick> {
366 let price_str = trade_array
367 .first()
368 .and_then(|v| v.as_str())
369 .context("Missing or invalid price")?;
370 let price = parse_price_with_precision(price_str, instrument.price_precision(), "trade.price")?;
371
372 let size_str = trade_array
373 .get(1)
374 .and_then(|v| v.as_str())
375 .context("Missing or invalid volume")?;
376 let size = parse_quantity_with_precision(size_str, instrument.size_precision(), "trade.size")?;
377
378 let time = trade_array
379 .get(2)
380 .and_then(|v| v.as_f64())
381 .context("Missing or invalid timestamp")?;
382 let ts_event = parse_millis_timestamp(time, "trade.time")?;
383
384 let side_str = trade_array
385 .get(3)
386 .and_then(|v| v.as_str())
387 .context("Missing or invalid side")?;
388 let aggressor = match side_str {
389 "b" => AggressorSide::Buyer,
390 "s" => AggressorSide::Seller,
391 _ => AggressorSide::NoAggressor,
392 };
393
394 let trade_id_value = trade_array.get(6).context("Missing trade_id")?;
395 let trade_id = if let Some(id) = trade_id_value.as_i64() {
396 TradeId::new_checked(id.to_string())?
397 } else if let Some(id_str) = trade_id_value.as_str() {
398 TradeId::new_checked(id_str)?
399 } else {
400 anyhow::bail!("Invalid trade_id format");
401 };
402
403 TradeTick::new_checked(
404 instrument.id(),
405 price,
406 size,
407 aggressor,
408 trade_id,
409 ts_event,
410 ts_init,
411 )
412 .context("Failed to construct TradeTick from Kraken trade")
413}
414
415pub fn parse_futures_public_execution(
423 execution: &FuturesPublicExecution,
424 instrument: &InstrumentAny,
425 ts_init: UnixNanos,
426) -> anyhow::Result<TradeTick> {
427 let price =
428 parse_price_with_precision(&execution.price, instrument.price_precision(), "price")?;
429 let size = parse_quantity_with_precision(
430 &execution.quantity,
431 instrument.size_precision(),
432 "quantity",
433 )?;
434
435 let ts_event = UnixNanos::from((execution.timestamp as u64) * 1_000_000);
437
438 let aggressor = match execution.taker_order.direction.to_lowercase().as_str() {
440 "buy" => AggressorSide::Buyer,
441 "sell" => AggressorSide::Seller,
442 _ => AggressorSide::NoAggressor,
443 };
444
445 let trade_id = TradeId::new_checked(&execution.uid)?;
446
447 TradeTick::new_checked(
448 instrument.id(),
449 price,
450 size,
451 aggressor,
452 trade_id,
453 ts_event,
454 ts_init,
455 )
456 .context("Failed to construct TradeTick from Kraken futures execution")
457}
458
459pub fn parse_bar(
467 ohlc: &OhlcData,
468 instrument: &InstrumentAny,
469 bar_type: BarType,
470 ts_init: UnixNanos,
471) -> anyhow::Result<Bar> {
472 let price_precision = instrument.price_precision();
473 let size_precision = instrument.size_precision();
474
475 let open = parse_price_with_precision(&ohlc.open, price_precision, "ohlc.open")?;
476 let high = parse_price_with_precision(&ohlc.high, price_precision, "ohlc.high")?;
477 let low = parse_price_with_precision(&ohlc.low, price_precision, "ohlc.low")?;
478 let close = parse_price_with_precision(&ohlc.close, price_precision, "ohlc.close")?;
479 let volume = parse_quantity_with_precision(&ohlc.volume, size_precision, "ohlc.volume")?;
480
481 let ts_event = UnixNanos::from((ohlc.time as u64) * 1_000_000_000);
482
483 Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
484 .context("Failed to construct Bar from Kraken OHLC")
485}
486
487fn parse_price_with_precision(value: &str, precision: u8, field: &str) -> anyhow::Result<Price> {
488 let parsed = value
489 .parse::<f64>()
490 .with_context(|| format!("Failed to parse {field}='{value}' as f64"))?;
491 Price::new_checked(parsed, precision).with_context(|| {
492 format!("Failed to construct Price for {field} with precision {precision}")
493 })
494}
495
496fn parse_quantity_with_precision(
497 value: &str,
498 precision: u8,
499 field: &str,
500) -> anyhow::Result<Quantity> {
501 let parsed = value
502 .parse::<f64>()
503 .with_context(|| format!("Failed to parse {field}='{value}' as f64"))?;
504 Quantity::new_checked(parsed, precision).with_context(|| {
505 format!("Failed to construct Quantity for {field} with precision {precision}")
506 })
507}
508
509pub fn parse_millis_timestamp(value: f64, field: &str) -> anyhow::Result<UnixNanos> {
510 let millis = (value * 1000.0) as u64;
511 let nanos = millis
512 .checked_mul(NANOSECONDS_IN_MILLISECOND)
513 .with_context(|| format!("{field} timestamp overflowed when converting to nanoseconds"))?;
514 Ok(UnixNanos::from(nanos))
515}
516
517pub fn parse_order_status_report(
525 order_id: &str,
526 order: &SpotOrder,
527 instrument: &InstrumentAny,
528 account_id: AccountId,
529 ts_init: UnixNanos,
530) -> anyhow::Result<OrderStatusReport> {
531 let instrument_id = instrument.id();
532 let venue_order_id = VenueOrderId::new(order_id);
533
534 let order_side = order.descr.order_side.into();
535 let order_type = order.descr.ordertype.into();
536 let order_status = order.status.into();
537
538 let has_expiration = order.expiretm.is_some_and(|t| t > 0.0);
540 let time_in_force = if has_expiration {
541 TimeInForce::Gtd
542 } else if order.oflags.contains("ioc") {
543 TimeInForce::Ioc
544 } else {
545 TimeInForce::Gtc
546 };
547
548 let quantity =
549 parse_quantity_with_precision(&order.vol, instrument.size_precision(), "order.vol")?;
550
551 let filled_qty = parse_quantity_with_precision(
552 &order.vol_exec,
553 instrument.size_precision(),
554 "order.vol_exec",
555 )?;
556
557 let ts_accepted = parse_millis_timestamp(order.opentm, "order.opentm")?;
558
559 let ts_last = order
560 .closetm
561 .map(|t| parse_millis_timestamp(t, "order.closetm"))
562 .transpose()?
563 .unwrap_or(ts_accepted);
564
565 let price = if !order.price.is_empty() && order.price != "0" {
566 Some(parse_price_with_precision(
567 &order.price,
568 instrument.price_precision(),
569 "order.price",
570 )?)
571 } else {
572 None
573 };
574
575 let trigger_price = order
576 .stopprice
577 .as_ref()
578 .and_then(|p| {
579 if !p.is_empty() && p != "0" {
580 Some(parse_price_with_precision(
581 p,
582 instrument.price_precision(),
583 "order.stopprice",
584 ))
585 } else {
586 None
587 }
588 })
589 .transpose()?;
590
591 let expire_time = if has_expiration {
592 order
593 .expiretm
594 .map(|t| parse_millis_timestamp(t, "order.expiretm"))
595 .transpose()?
596 } else {
597 None
598 };
599
600 let trigger_type = parse_trigger_type(order_type, order.trigger);
601
602 Ok(OrderStatusReport {
603 account_id,
604 instrument_id,
605 client_order_id: None,
606 venue_order_id,
607 order_side,
608 order_type,
609 time_in_force,
610 order_status,
611 quantity,
612 filled_qty,
613 report_id: UUID4::new(),
614 ts_accepted,
615 ts_last,
616 ts_init,
617 order_list_id: None,
618 venue_position_id: None,
619 linked_order_ids: None,
620 parent_order_id: None,
621 contingency_type: ContingencyType::NoContingency,
622 expire_time,
623 price,
624 trigger_price,
625 trigger_type,
626 limit_offset: None,
627 trailing_offset: None,
628 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
629 display_qty: None,
630 avg_px: compute_avg_px(order),
631 post_only: order.oflags.contains("post"),
632 reduce_only: false,
633 cancel_reason: order.reason.clone(),
634 ts_triggered: None,
635 })
636}
637
638fn compute_avg_px(order: &SpotOrder) -> Option<Decimal> {
642 if let Some(ref avg) = order.avg_price
643 && let Ok(v) = parse_decimal(avg)
644 && v > dec!(0)
645 {
646 return Some(v);
647 }
648
649 let cost = parse_decimal(&order.cost);
650 let vol_exec = parse_decimal(&order.vol_exec);
651 match (&cost, &vol_exec) {
652 (Ok(c), Ok(v)) if *v > dec!(0) => Some(*c / *v),
653 _ => {
654 if let Ok(v) = &vol_exec
655 && *v > dec!(0)
656 {
657 tracing::warn!(
658 "Cannot compute avg_px: cost={:?}, vol_exec={:?}",
659 cost,
660 vol_exec
661 );
662 }
663 None
664 }
665 }
666}
667
668pub fn parse_fill_report(
675 trade_id: &str,
676 trade: &SpotTrade,
677 instrument: &InstrumentAny,
678 account_id: AccountId,
679 ts_init: UnixNanos,
680) -> anyhow::Result<FillReport> {
681 let instrument_id = instrument.id();
682 let venue_order_id = VenueOrderId::new(&trade.ordertxid);
683 let trade_id_obj = TradeId::new(trade_id);
684
685 let order_side = trade.trade_type.into();
686
687 let last_qty =
688 parse_quantity_with_precision(&trade.vol, instrument.size_precision(), "trade.vol")?;
689
690 let last_px =
691 parse_price_with_precision(&trade.price, instrument.price_precision(), "trade.price")?;
692
693 let fee_decimal = parse_decimal(&trade.fee)?;
694 let quote_currency = match instrument {
695 InstrumentAny::CurrencyPair(pair) => pair.quote_currency,
696 InstrumentAny::CryptoPerpetual(perp) => perp.quote_currency,
697 _ => anyhow::bail!("Unsupported instrument type for fill report"),
698 };
699
700 let fee_f64 = fee_decimal
701 .try_into()
702 .context("Failed to convert fee to f64")?;
703 let commission = Money::new(fee_f64, quote_currency);
704
705 let liquidity_side = match trade.maker {
706 Some(true) => LiquiditySide::Maker,
707 Some(false) => LiquiditySide::Taker,
708 None => LiquiditySide::NoLiquiditySide,
709 };
710
711 let ts_event = parse_millis_timestamp(trade.time, "trade.time")?;
712
713 Ok(FillReport {
714 account_id,
715 instrument_id,
716 venue_order_id,
717 trade_id: trade_id_obj,
718 order_side,
719 last_qty,
720 last_px,
721 commission,
722 liquidity_side,
723 report_id: UUID4::new(),
724 ts_event,
725 ts_init,
726 client_order_id: None,
727 venue_position_id: None,
728 })
729}
730
731pub fn parse_futures_order_status_report(
737 order: &FuturesOpenOrder,
738 instrument: &InstrumentAny,
739 account_id: AccountId,
740 ts_init: UnixNanos,
741) -> anyhow::Result<OrderStatusReport> {
742 let instrument_id = instrument.id();
743 let venue_order_id = VenueOrderId::new(&order.order_id);
744
745 let order_side = order.side.into();
746 let order_type = order.order_type.into();
747 let order_status = order.status.into();
748
749 let quantity = Quantity::new(
750 order.unfilled_size + order.filled_size,
751 instrument.size_precision(),
752 );
753
754 let filled_qty = Quantity::new(order.filled_size, instrument.size_precision());
755
756 let ts_accepted = parse_rfc3339_timestamp(&order.received_time, "order.received_time")?;
757 let ts_last = parse_rfc3339_timestamp(&order.last_update_time, "order.last_update_time")?;
758
759 let price = order
760 .limit_price
761 .map(|p| Price::new(p, instrument.price_precision()));
762
763 let trigger_price = order
764 .stop_price
765 .map(|p| Price::new(p, instrument.price_precision()));
766
767 let trigger_type = parse_futures_trigger_type(order_type, order.trigger_signal);
768
769 Ok(OrderStatusReport {
770 account_id,
771 instrument_id,
772 client_order_id: order.cli_ord_id.as_ref().map(|s| s.as_str().into()),
773 venue_order_id,
774 order_side,
775 order_type,
776 time_in_force: TimeInForce::Gtc,
777 order_status,
778 quantity,
779 filled_qty,
780 report_id: UUID4::new(),
781 ts_accepted,
782 ts_last,
783 ts_init,
784 order_list_id: None,
785 venue_position_id: None,
786 linked_order_ids: None,
787 parent_order_id: None,
788 contingency_type: ContingencyType::NoContingency,
789 expire_time: None,
790 price,
791 trigger_price,
792 trigger_type,
793 limit_offset: None,
794 trailing_offset: None,
795 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
796 display_qty: None,
797 avg_px: None,
798 post_only: false,
799 reduce_only: order.reduce_only.unwrap_or(false),
800 cancel_reason: None,
801 ts_triggered: None,
802 })
803}
804
805pub fn parse_futures_order_event_status_report(
811 event: &FuturesOrderEvent,
812 instrument: &InstrumentAny,
813 account_id: AccountId,
814 ts_init: UnixNanos,
815) -> anyhow::Result<OrderStatusReport> {
816 let instrument_id = instrument.id();
817 let venue_order_id = VenueOrderId::new(&event.order_id);
818
819 let order_side = event.side.into();
820 let order_type = event.order_type.into();
821
822 let order_status = if event.filled >= event.quantity {
824 OrderStatus::Filled
825 } else if event.filled > 0.0 {
826 OrderStatus::PartiallyFilled
827 } else {
828 OrderStatus::Canceled
829 };
830
831 let quantity = Quantity::new(event.quantity, instrument.size_precision());
832 let filled_qty = Quantity::new(event.filled, instrument.size_precision());
833
834 let ts_accepted = parse_rfc3339_timestamp(&event.timestamp, "event.timestamp")?;
835 let ts_last =
836 parse_rfc3339_timestamp(&event.last_update_timestamp, "event.last_update_timestamp")?;
837
838 let price = event
839 .limit_price
840 .map(|p| Price::new(p, instrument.price_precision()));
841
842 let trigger_price = event
843 .stop_price
844 .map(|p| Price::new(p, instrument.price_precision()));
845
846 let trigger_type = parse_futures_trigger_type(order_type, None);
849
850 Ok(OrderStatusReport {
851 account_id,
852 instrument_id,
853 client_order_id: event.cli_ord_id.as_ref().map(|s| s.as_str().into()),
854 venue_order_id,
855 order_side,
856 order_type,
857 time_in_force: TimeInForce::Gtc,
858 order_status,
859 quantity,
860 filled_qty,
861 report_id: UUID4::new(),
862 ts_accepted,
863 ts_last,
864 ts_init,
865 order_list_id: None,
866 venue_position_id: None,
867 linked_order_ids: None,
868 parent_order_id: None,
869 contingency_type: ContingencyType::NoContingency,
870 expire_time: None,
871 price,
872 trigger_price,
873 trigger_type,
874 limit_offset: None,
875 trailing_offset: None,
876 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
877 display_qty: None,
878 avg_px: None,
879 post_only: false,
880 reduce_only: event.reduce_only,
881 cancel_reason: None,
882 ts_triggered: None,
883 })
884}
885
886pub fn parse_futures_fill_report(
892 fill: &FuturesFill,
893 instrument: &InstrumentAny,
894 account_id: AccountId,
895 ts_init: UnixNanos,
896) -> anyhow::Result<FillReport> {
897 let instrument_id = instrument.id();
898 let venue_order_id = VenueOrderId::new(&fill.order_id);
899 let trade_id = TradeId::new(&fill.fill_id);
900
901 let order_side = fill.side.into();
902
903 let last_qty = Quantity::new(fill.size, instrument.size_precision());
904 let last_px = Price::new(fill.price, instrument.price_precision());
905
906 let quote_currency = match instrument {
907 InstrumentAny::CryptoPerpetual(perp) => perp.quote_currency,
908 InstrumentAny::CryptoFuture(future) => future.quote_currency,
909 _ => anyhow::bail!("Unsupported instrument type for futures fill report"),
910 };
911
912 let fee_f64 = fill.fee_paid.unwrap_or(0.0);
913 let commission = Money::new(fee_f64, quote_currency);
914
915 let liquidity_side = match fill.fill_type {
916 KrakenFillType::Maker => LiquiditySide::Maker,
917 KrakenFillType::Taker => LiquiditySide::Taker,
918 };
919
920 let ts_event = parse_rfc3339_timestamp(&fill.fill_time, "fill.fill_time")?;
921
922 Ok(FillReport {
923 account_id,
924 instrument_id,
925 venue_order_id,
926 trade_id,
927 order_side,
928 last_qty,
929 last_px,
930 commission,
931 liquidity_side,
932 report_id: UUID4::new(),
933 ts_event,
934 ts_init,
935 client_order_id: fill.cli_ord_id.as_ref().map(|s| s.as_str().into()),
936 venue_position_id: None,
937 })
938}
939
940pub fn parse_futures_position_status_report(
946 position: &FuturesPosition,
947 instrument: &InstrumentAny,
948 account_id: AccountId,
949 ts_init: UnixNanos,
950) -> anyhow::Result<PositionStatusReport> {
951 let instrument_id = instrument.id();
952
953 let position_side = match position.side {
954 KrakenPositionSide::Long => PositionSideSpecified::Long,
955 KrakenPositionSide::Short => PositionSideSpecified::Short,
956 };
957
958 let quantity = Quantity::new(position.size, instrument.size_precision());
959 let signed_decimal_qty = match position_side {
960 PositionSideSpecified::Long => Decimal::from_f64_retain(position.size).unwrap_or(dec!(0)),
961 PositionSideSpecified::Short => -Decimal::from_f64_retain(position.size).unwrap_or(dec!(0)),
962 PositionSideSpecified::Flat => dec!(0),
963 };
964
965 let avg_px_open = Some(Decimal::from_f64_retain(position.price).unwrap_or(dec!(0)));
966
967 Ok(PositionStatusReport {
968 account_id,
969 instrument_id,
970 position_side,
971 quantity,
972 signed_decimal_qty,
973 report_id: UUID4::new(),
974 ts_last: ts_init,
975 ts_init,
976 venue_position_id: None,
977 avg_px_open,
978 })
979}
980
981pub fn bar_type_to_spot_interval(bar_type: BarType) -> anyhow::Result<u32> {
989 let step = bar_type.spec().step.get() as u32;
990 let base_interval = match bar_type.spec().aggregation {
991 BarAggregation::Minute => 1,
992 BarAggregation::Hour => 60,
993 BarAggregation::Day => 1440,
994 other => {
995 anyhow::bail!("Unsupported bar aggregation for Kraken Spot: {other:?}");
996 }
997 };
998 Ok(base_interval * step)
999}
1000
1001pub fn bar_type_to_futures_resolution(bar_type: BarType) -> anyhow::Result<&'static str> {
1011 let step = bar_type.spec().step.get() as u32;
1012 match bar_type.spec().aggregation {
1013 BarAggregation::Minute => match step {
1014 1 => Ok("1m"),
1015 5 => Ok("5m"),
1016 15 => Ok("15m"),
1017 _ => anyhow::bail!("Unsupported minute step for Kraken Futures: {step}"),
1018 },
1019 BarAggregation::Hour => match step {
1020 1 => Ok("1h"),
1021 4 => Ok("4h"),
1022 12 => Ok("12h"),
1023 _ => anyhow::bail!("Unsupported hour step for Kraken Futures: {step}"),
1024 },
1025 BarAggregation::Day => {
1026 if step == 1 {
1027 Ok("1d")
1028 } else {
1029 anyhow::bail!("Unsupported day step for Kraken Futures: {step}")
1030 }
1031 }
1032 BarAggregation::Week => {
1033 if step == 1 {
1034 Ok("1w")
1035 } else {
1036 anyhow::bail!("Unsupported week step for Kraken Futures: {step}")
1037 }
1038 }
1039 other => {
1040 anyhow::bail!("Unsupported bar aggregation for Kraken Futures: {other:?}");
1041 }
1042 }
1043}
1044
1045#[cfg(test)]
1046mod tests {
1047 use indexmap::IndexMap;
1048 use nautilus_model::{
1049 data::BarSpecification,
1050 enums::{AggregationSource, BarAggregation, OrderStatus, PriceType},
1051 };
1052 use rstest::rstest;
1053
1054 use super::*;
1055 use crate::http::models::AssetPairsResponse;
1056
1057 const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);
1058
1059 fn load_test_json(filename: &str) -> String {
1060 let path = format!("test_data/{filename}");
1061 std::fs::read_to_string(&path)
1062 .unwrap_or_else(|e| panic!("Failed to load test data from {path}: {e}"))
1063 }
1064
1065 #[rstest]
1066 fn test_parse_decimal() {
1067 assert_eq!(parse_decimal("123.45").unwrap(), dec!(123.45));
1068 assert_eq!(parse_decimal("0").unwrap(), dec!(0));
1069 assert_eq!(parse_decimal("").unwrap(), dec!(0));
1070 }
1071
1072 #[rstest]
1073 fn test_parse_decimal_opt() {
1074 assert_eq!(
1075 parse_decimal_opt(Some("123.45")).unwrap(),
1076 Some(dec!(123.45))
1077 );
1078 assert_eq!(parse_decimal_opt(Some("0")).unwrap(), None);
1079 assert_eq!(parse_decimal_opt(Some("")).unwrap(), None);
1080 assert_eq!(parse_decimal_opt(None).unwrap(), None);
1081 }
1082
1083 #[rstest]
1084 fn test_parse_spot_instrument() {
1085 let json = load_test_json("http_asset_pairs.json");
1086 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
1087 let result = wrapper.get("result").unwrap();
1088 let pairs: AssetPairsResponse = serde_json::from_value(result.clone()).unwrap();
1089
1090 let (pair_name, definition) = pairs.iter().next().unwrap();
1091
1092 let instrument = parse_spot_instrument(pair_name, definition, TS, TS).unwrap();
1093
1094 match instrument {
1095 InstrumentAny::CurrencyPair(pair) => {
1096 assert_eq!(pair.id.venue.as_str(), "KRAKEN");
1097 assert_eq!(pair.base_currency.code.as_str(), "XXBT");
1098 assert_eq!(pair.quote_currency.code.as_str(), "USDT");
1099 assert!(pair.price_increment.as_f64() > 0.0);
1100 assert!(pair.size_increment.as_f64() > 0.0);
1101 assert!(pair.min_quantity.is_some());
1102 }
1103 _ => panic!("Expected CurrencyPair"),
1104 }
1105 }
1106
1107 #[rstest]
1108 fn test_parse_futures_instrument_inverse() {
1109 let json = load_test_json("http_futures_instruments.json");
1110 let response: crate::http::models::FuturesInstrumentsResponse =
1111 serde_json::from_str(&json).unwrap();
1112
1113 let fut_instrument = &response.instruments[0];
1114
1115 let instrument = parse_futures_instrument(fut_instrument, TS, TS).unwrap();
1116
1117 match instrument {
1118 InstrumentAny::CryptoPerpetual(perp) => {
1119 assert_eq!(perp.id.venue.as_str(), "KRAKEN");
1120 assert_eq!(perp.id.symbol.as_str(), "PI_XBTUSD");
1121 assert_eq!(perp.raw_symbol.as_str(), "PI_XBTUSD");
1122 assert_eq!(perp.base_currency.code.as_str(), "BTC");
1123 assert_eq!(perp.quote_currency.code.as_str(), "USD");
1124 assert_eq!(perp.settlement_currency.code.as_str(), "BTC");
1125 assert!(perp.is_inverse);
1126 assert_eq!(perp.price_increment.as_f64(), 0.5);
1127 assert_eq!(perp.size_increment.as_f64(), 1.0);
1128 assert_eq!(perp.size_precision(), 0);
1129 assert_eq!(perp.margin_init, dec!(0.02));
1130 assert_eq!(perp.margin_maint, dec!(0.01));
1131 }
1132 _ => panic!("Expected CryptoPerpetual"),
1133 }
1134 }
1135
1136 #[rstest]
1137 fn test_parse_futures_instrument_flexible() {
1138 let json = load_test_json("http_futures_instruments.json");
1139 let response: crate::http::models::FuturesInstrumentsResponse =
1140 serde_json::from_str(&json).unwrap();
1141
1142 let fut_instrument = &response.instruments[1];
1143
1144 let instrument = parse_futures_instrument(fut_instrument, TS, TS).unwrap();
1145
1146 match instrument {
1147 InstrumentAny::CryptoPerpetual(perp) => {
1148 assert_eq!(perp.id.venue.as_str(), "KRAKEN");
1149 assert_eq!(perp.id.symbol.as_str(), "PF_ETHUSD");
1150 assert_eq!(perp.raw_symbol.as_str(), "PF_ETHUSD");
1151 assert_eq!(perp.base_currency.code.as_str(), "ETH");
1152 assert_eq!(perp.quote_currency.code.as_str(), "USD");
1153 assert_eq!(perp.settlement_currency.code.as_str(), "USD");
1154 assert!(!perp.is_inverse);
1155 assert_eq!(perp.price_increment.as_f64(), 0.1);
1156 assert_eq!(perp.size_increment.as_f64(), 0.001);
1157 assert_eq!(perp.size_precision(), 3);
1158 assert_eq!(perp.margin_init, dec!(0.02));
1159 assert_eq!(perp.margin_maint, dec!(0.01));
1160 }
1161 _ => panic!("Expected CryptoPerpetual"),
1162 }
1163 }
1164
1165 #[rstest]
1168 fn test_parse_futures_instrument_negative_precision() {
1169 let json = load_test_json("http_futures_instruments.json");
1170 let response: crate::http::models::FuturesInstrumentsResponse =
1171 serde_json::from_str(&json).unwrap();
1172
1173 let fut_instrument = &response.instruments[2];
1175
1176 let instrument = parse_futures_instrument(fut_instrument, TS, TS).unwrap();
1177
1178 match instrument {
1179 InstrumentAny::CryptoPerpetual(perp) => {
1180 assert_eq!(perp.id.symbol.as_str(), "PF_PEPEUSD");
1181 assert_eq!(perp.base_currency.code.as_str(), "PEPE");
1182 assert!(!perp.is_inverse);
1183 assert_eq!(perp.size_increment.as_f64(), 1000.0);
1184 assert_eq!(perp.size_precision(), 0);
1185 }
1186 _ => panic!("Expected CryptoPerpetual"),
1187 }
1188 }
1189
1190 #[rstest]
1191 fn test_parse_trade_tick_from_array() {
1192 let json = load_test_json("http_trades.json");
1193 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
1194 let result = wrapper.get("result").unwrap();
1195 let trades_map = result.as_object().unwrap();
1196
1197 let (_pair, trades_value) = trades_map.iter().find(|(k, _)| *k != "last").unwrap();
1199 let trades = trades_value.as_array().unwrap();
1200 let trade_array = trades[0].as_array().unwrap();
1201
1202 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
1204 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
1205 instrument_id,
1206 Symbol::new("XBTUSDT"),
1207 Currency::BTC(),
1208 Currency::USDT(),
1209 1, 8, Price::from("0.1"),
1212 Quantity::from("0.00000001"),
1213 None,
1214 None,
1215 None,
1216 None,
1217 None,
1218 None,
1219 None,
1220 None,
1221 None,
1222 None,
1223 None,
1224 None,
1225 TS,
1226 TS,
1227 ));
1228
1229 let trade_tick = parse_trade_tick_from_array(trade_array, &instrument, TS).unwrap();
1230
1231 assert_eq!(trade_tick.instrument_id, instrument_id);
1232 assert!(trade_tick.price.as_f64() > 0.0);
1233 assert!(trade_tick.size.as_f64() > 0.0);
1234 }
1235
1236 #[rstest]
1237 fn test_parse_bar() {
1238 let json = load_test_json("http_ohlc.json");
1239 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
1240 let result = wrapper.get("result").unwrap();
1241 let ohlc_map = result.as_object().unwrap();
1242
1243 let (_pair, ohlc_value) = ohlc_map.iter().find(|(k, _)| *k != "last").unwrap();
1245 let ohlcs = ohlc_value.as_array().unwrap();
1246
1247 let ohlc_array = ohlcs[0].as_array().unwrap();
1249 let ohlc = OhlcData {
1250 time: ohlc_array[0].as_i64().unwrap(),
1251 open: ohlc_array[1].as_str().unwrap().to_string(),
1252 high: ohlc_array[2].as_str().unwrap().to_string(),
1253 low: ohlc_array[3].as_str().unwrap().to_string(),
1254 close: ohlc_array[4].as_str().unwrap().to_string(),
1255 vwap: ohlc_array[5].as_str().unwrap().to_string(),
1256 volume: ohlc_array[6].as_str().unwrap().to_string(),
1257 count: ohlc_array[7].as_i64().unwrap(),
1258 };
1259
1260 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
1262 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
1263 instrument_id,
1264 Symbol::new("XBTUSDT"),
1265 Currency::BTC(),
1266 Currency::USDT(),
1267 1, 8, Price::from("0.1"),
1270 Quantity::from("0.00000001"),
1271 None,
1272 None,
1273 None,
1274 None,
1275 None,
1276 None,
1277 None,
1278 None,
1279 None,
1280 None,
1281 None,
1282 None,
1283 TS,
1284 TS,
1285 ));
1286
1287 let bar_type = BarType::new(
1288 instrument_id,
1289 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last),
1290 AggregationSource::External,
1291 );
1292
1293 let bar = parse_bar(&ohlc, &instrument, bar_type, TS).unwrap();
1294
1295 assert_eq!(bar.bar_type, bar_type);
1296 assert!(bar.open.as_f64() > 0.0);
1297 assert!(bar.high.as_f64() > 0.0);
1298 assert!(bar.low.as_f64() > 0.0);
1299 assert!(bar.close.as_f64() > 0.0);
1300 assert!(bar.volume.as_f64() >= 0.0);
1301 }
1302
1303 #[rstest]
1304 fn test_parse_millis_timestamp() {
1305 let timestamp = 1762795433.9717445;
1306 let result = parse_millis_timestamp(timestamp, "test").unwrap();
1307 assert!(result.as_u64() > 0);
1308 }
1309
1310 #[rstest]
1311 #[case(1, BarAggregation::Minute, 1)]
1312 #[case(5, BarAggregation::Minute, 5)]
1313 #[case(15, BarAggregation::Minute, 15)]
1314 #[case(1, BarAggregation::Hour, 60)]
1315 #[case(4, BarAggregation::Hour, 240)]
1316 #[case(1, BarAggregation::Day, 1440)]
1317 fn test_bar_type_to_spot_interval(
1318 #[case] step: usize,
1319 #[case] aggregation: BarAggregation,
1320 #[case] expected: u32,
1321 ) {
1322 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
1323 let bar_type = BarType::new(
1324 instrument_id,
1325 BarSpecification::new(step, aggregation, PriceType::Last),
1326 AggregationSource::External,
1327 );
1328
1329 let result = bar_type_to_spot_interval(bar_type).unwrap();
1330 assert_eq!(result, expected);
1331 }
1332
1333 #[rstest]
1334 fn test_bar_type_to_spot_interval_unsupported() {
1335 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
1336 let bar_type = BarType::new(
1337 instrument_id,
1338 BarSpecification::new(1, BarAggregation::Second, PriceType::Last),
1339 AggregationSource::External,
1340 );
1341
1342 let result = bar_type_to_spot_interval(bar_type);
1343 assert!(result.is_err());
1344 assert!(result.unwrap_err().to_string().contains("Unsupported"));
1345 }
1346
1347 #[rstest]
1348 #[case(1, BarAggregation::Minute, "1m")]
1349 #[case(5, BarAggregation::Minute, "5m")]
1350 #[case(15, BarAggregation::Minute, "15m")]
1351 #[case(1, BarAggregation::Hour, "1h")]
1352 #[case(4, BarAggregation::Hour, "4h")]
1353 #[case(12, BarAggregation::Hour, "12h")]
1354 #[case(1, BarAggregation::Day, "1d")]
1355 #[case(1, BarAggregation::Week, "1w")]
1356 fn test_bar_type_to_futures_resolution(
1357 #[case] step: usize,
1358 #[case] aggregation: BarAggregation,
1359 #[case] expected: &str,
1360 ) {
1361 let instrument_id = InstrumentId::new(Symbol::new("PI_XBTUSD"), *KRAKEN_VENUE);
1362 let bar_type = BarType::new(
1363 instrument_id,
1364 BarSpecification::new(step, aggregation, PriceType::Last),
1365 AggregationSource::External,
1366 );
1367
1368 let result = bar_type_to_futures_resolution(bar_type).unwrap();
1369 assert_eq!(result, expected);
1370 }
1371
1372 #[rstest]
1373 #[case(30, BarAggregation::Minute)] #[case(2, BarAggregation::Hour)] #[case(2, BarAggregation::Day)] #[case(1, BarAggregation::Second)] fn test_bar_type_to_futures_resolution_unsupported(
1378 #[case] step: usize,
1379 #[case] aggregation: BarAggregation,
1380 ) {
1381 let instrument_id = InstrumentId::new(Symbol::new("PI_XBTUSD"), *KRAKEN_VENUE);
1382 let bar_type = BarType::new(
1383 instrument_id,
1384 BarSpecification::new(step, aggregation, PriceType::Last),
1385 AggregationSource::External,
1386 );
1387
1388 let result = bar_type_to_futures_resolution(bar_type);
1389 assert!(result.is_err());
1390 assert!(result.unwrap_err().to_string().contains("Unsupported"));
1391 }
1392
1393 #[rstest]
1394 fn test_parse_order_status_report() {
1395 let json = load_test_json("http_open_orders.json");
1396 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
1397 let result = wrapper.get("result").unwrap();
1398 let open_map = result.get("open").unwrap();
1399 let orders: IndexMap<String, SpotOrder> = serde_json::from_value(open_map.clone()).unwrap();
1400
1401 let account_id = AccountId::new("KRAKEN-001");
1402 let instrument_id = InstrumentId::new(Symbol::new("BTC/USDT"), *KRAKEN_VENUE);
1403 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
1404 instrument_id,
1405 Symbol::new("XBTUSDT"),
1406 Currency::BTC(),
1407 Currency::USDT(),
1408 2,
1409 8,
1410 Price::from("0.01"),
1411 Quantity::from("0.00000001"),
1412 None,
1413 None,
1414 None,
1415 None,
1416 None,
1417 None,
1418 None,
1419 None,
1420 None,
1421 None,
1422 None,
1423 None,
1424 TS,
1425 TS,
1426 ));
1427
1428 let (order_id, order) = orders.iter().next().unwrap();
1429
1430 let report =
1431 parse_order_status_report(order_id, order, &instrument, account_id, TS).unwrap();
1432
1433 assert_eq!(report.account_id, account_id);
1434 assert_eq!(report.instrument_id, instrument_id);
1435 assert_eq!(report.venue_order_id.as_str(), order_id);
1436 assert_eq!(report.order_status, OrderStatus::Accepted);
1437 assert!(report.quantity.as_f64() > 0.0);
1438 }
1439
1440 #[rstest]
1441 fn test_parse_fill_report() {
1442 let json = load_test_json("http_trades_history.json");
1443 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
1444 let result = wrapper.get("result").unwrap();
1445 let trades_map = result.get("trades").unwrap();
1446 let trades: IndexMap<String, SpotTrade> =
1447 serde_json::from_value(trades_map.clone()).unwrap();
1448
1449 let account_id = AccountId::new("KRAKEN-001");
1450 let instrument_id = InstrumentId::new(Symbol::new("BTC/USDT"), *KRAKEN_VENUE);
1451 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
1452 instrument_id,
1453 Symbol::new("XBTUSDT"),
1454 Currency::BTC(),
1455 Currency::USDT(),
1456 2,
1457 8,
1458 Price::from("0.01"),
1459 Quantity::from("0.00000001"),
1460 None,
1461 None,
1462 None,
1463 None,
1464 None,
1465 None,
1466 None,
1467 None,
1468 None,
1469 None,
1470 None,
1471 None,
1472 TS,
1473 TS,
1474 ));
1475
1476 let (trade_id, trade) = trades.iter().next().unwrap();
1477
1478 let report = parse_fill_report(trade_id, trade, &instrument, account_id, TS).unwrap();
1479
1480 assert_eq!(report.account_id, account_id);
1481 assert_eq!(report.instrument_id, instrument_id);
1482 assert_eq!(report.trade_id.to_string(), *trade_id);
1483 assert!(report.last_qty.as_f64() > 0.0);
1484 assert!(report.last_px.as_f64() > 0.0);
1485 assert!(report.commission.as_f64() > 0.0);
1486 }
1487
1488 #[rstest]
1489 #[case("XXBT", "XBT")]
1490 #[case("XETH", "ETH")]
1491 #[case("ZUSD", "USD")]
1492 #[case("ZEUR", "EUR")]
1493 #[case("BTC", "BTC")]
1494 #[case("ETH", "ETH")]
1495 #[case("USDT", "USDT")]
1496 #[case("SOL", "SOL")]
1497 fn test_normalize_currency_code(#[case] input: &str, #[case] expected: &str) {
1498 assert_eq!(normalize_currency_code(input), expected);
1499 }
1500}