1use std::str::FromStr;
19
20use anyhow::Context;
21use nautilus_core::{
22 datetime::NANOSECONDS_IN_MILLISECOND, nanos::UnixNanos, parsing::precision_from_str,
23 uuid::UUID4,
24};
25use nautilus_model::{
26 data::{Bar, BarType, TradeTick},
27 enums::{
28 AggressorSide, BarAggregation, ContingencyType, LiquiditySide, OrderStatus, OrderType,
29 PositionSideSpecified, TimeInForce, TrailingOffsetType, TriggerType,
30 },
31 identifiers::{AccountId, InstrumentId, Symbol, TradeId, VenueOrderId},
32 instruments::{
33 Instrument, any::InstrumentAny, crypto_perpetual::CryptoPerpetual,
34 currency_pair::CurrencyPair,
35 },
36 reports::{FillReport, OrderStatusReport, PositionStatusReport},
37 types::{Currency, Money, Price, Quantity, fixed::FIXED_PRECISION},
38};
39use rust_decimal::Decimal;
40use rust_decimal_macros::dec;
41
42use crate::{
43 common::{
44 consts::KRAKEN_VENUE,
45 enums::{
46 KrakenFillType, KrakenInstrumentType, KrakenPositionSide, KrakenSpotTrigger,
47 KrakenTriggerSignal,
48 },
49 },
50 http::models::{
51 AssetPairInfo, FuturesFill, FuturesInstrument, FuturesOpenOrder, FuturesOrderEvent,
52 FuturesPosition, FuturesPublicExecution, OhlcData, SpotOrder, SpotTrade,
53 },
54};
55
56pub fn parse_decimal(value: &str) -> anyhow::Result<Decimal> {
58 if value.is_empty() || value == "0" {
59 return Ok(dec!(0));
60 }
61 value
62 .parse::<Decimal>()
63 .map_err(|e| anyhow::anyhow!("Failed to parse decimal '{value}': {e}"))
64}
65
66fn parse_rfc3339_timestamp(value: &str, field: &str) -> anyhow::Result<UnixNanos> {
67 value
68 .parse::<UnixNanos>()
69 .map_err(|e| anyhow::anyhow!("Failed to parse {field}='{value}': {e}"))
70}
71
/// Normalizes a Kraken currency code by stripping the legacy `X`/`Z` class prefix.
///
/// Kraken's legacy asset codes are four characters long: a one-letter class
/// prefix (`X` or `Z`) followed by a three-letter code, e.g. `XXBT` -> `XBT`
/// and `ZUSD` -> `USD`.
///
/// Only codes that are exactly four bytes long are treated as legacy codes.
/// Stripping unconditionally would corrupt current-style codes that merely
/// start with `X` or `Z` (`XRP` -> `RP`, `XTZ` -> `TZ`, `ZEC` -> `EC`), so any
/// other length is returned unchanged.
///
/// The returned slice borrows from `code`; no allocation is performed.
#[inline]
pub fn normalize_currency_code(code: &str) -> &str {
    // Length of a legacy Kraken code: one prefix letter plus a three-letter code.
    const LEGACY_CODE_LEN: usize = 4;

    if code.len() != LEGACY_CODE_LEN {
        return code;
    }

    code.strip_prefix('X')
        .or_else(|| code.strip_prefix('Z'))
        .unwrap_or(code)
}
82
83pub fn parse_decimal_opt(value: Option<&str>) -> anyhow::Result<Option<Decimal>> {
85 match value {
86 Some(s) if !s.is_empty() && s != "0" => Ok(Some(parse_decimal(s)?)),
87 _ => Ok(None),
88 }
89}
90
91fn parse_trigger_type(
93 order_type: OrderType,
94 trigger: Option<KrakenSpotTrigger>,
95) -> Option<TriggerType> {
96 let is_conditional = matches!(
97 order_type,
98 OrderType::StopMarket
99 | OrderType::StopLimit
100 | OrderType::MarketIfTouched
101 | OrderType::LimitIfTouched
102 );
103
104 if !is_conditional {
105 return None;
106 }
107
108 match trigger {
109 Some(KrakenSpotTrigger::Last) => Some(TriggerType::LastPrice),
110 Some(KrakenSpotTrigger::Index) => Some(TriggerType::IndexPrice),
111 None => Some(TriggerType::Default),
112 }
113}
114
115fn parse_futures_trigger_type(
117 order_type: OrderType,
118 trigger_signal: Option<KrakenTriggerSignal>,
119) -> Option<TriggerType> {
120 let is_conditional = matches!(
121 order_type,
122 OrderType::StopMarket
123 | OrderType::StopLimit
124 | OrderType::MarketIfTouched
125 | OrderType::LimitIfTouched
126 );
127
128 if !is_conditional {
129 return None;
130 }
131
132 match trigger_signal {
133 Some(KrakenTriggerSignal::Last) => Some(TriggerType::LastPrice),
134 Some(KrakenTriggerSignal::Mark) => Some(TriggerType::MarkPrice),
135 Some(KrakenTriggerSignal::Index) => Some(TriggerType::IndexPrice),
136 None => Some(TriggerType::Default),
137 }
138}
139
140pub fn parse_spot_instrument(
149 pair_name: &str,
150 definition: &AssetPairInfo,
151 ts_event: UnixNanos,
152 ts_init: UnixNanos,
153) -> anyhow::Result<InstrumentAny> {
154 let symbol_str = definition.wsname.as_ref().unwrap_or(&definition.altname);
155 let instrument_id = InstrumentId::new(Symbol::new(symbol_str.as_str()), *KRAKEN_VENUE);
156 let raw_symbol = Symbol::new(pair_name);
157
158 let base_currency = get_currency(definition.base.as_str());
159 let quote_currency = get_currency(definition.quote.as_str());
160
161 let price_increment = parse_price(
162 definition
163 .tick_size
164 .as_ref()
165 .context("tick_size is required")?,
166 "tick_size",
167 )?;
168
169 let size_precision = definition.lot_decimals;
171 let size_increment = Quantity::new(10.0_f64.powi(-(size_precision as i32)), size_precision);
172
173 let min_quantity = definition
174 .ordermin
175 .as_ref()
176 .map(|s| parse_quantity(s, "ordermin"))
177 .transpose()?;
178
179 let taker_fee = definition
181 .fees
182 .first()
183 .map(|(_, fee)| Decimal::try_from(*fee))
184 .transpose()
185 .context("Failed to parse taker fee")?
186 .map(|f| f / dec!(100));
187
188 let maker_fee = definition
189 .fees_maker
190 .first()
191 .map(|(_, fee)| Decimal::try_from(*fee))
192 .transpose()
193 .context("Failed to parse maker fee")?
194 .map(|f| f / dec!(100));
195
196 let instrument = CurrencyPair::new(
197 instrument_id,
198 raw_symbol,
199 base_currency,
200 quote_currency,
201 price_increment.precision,
202 size_increment.precision,
203 price_increment,
204 size_increment,
205 None,
206 None,
207 None,
208 min_quantity,
209 None,
210 None,
211 None,
212 None,
213 maker_fee,
214 taker_fee,
215 None,
216 None,
217 ts_event,
218 ts_init,
219 );
220
221 Ok(InstrumentAny::CurrencyPair(instrument))
222}
223
224pub fn parse_futures_instrument(
233 instrument: &FuturesInstrument,
234 ts_event: UnixNanos,
235 ts_init: UnixNanos,
236) -> anyhow::Result<InstrumentAny> {
237 let instrument_id = InstrumentId::new(Symbol::new(&instrument.symbol), *KRAKEN_VENUE);
238 let raw_symbol = Symbol::new(&instrument.symbol);
239
240 let base_currency = get_currency(&instrument.base);
241 let quote_currency = get_currency(&instrument.quote);
242
243 let is_inverse = instrument.instrument_type == KrakenInstrumentType::FuturesInverse;
244 let settlement_currency = if is_inverse {
245 base_currency
246 } else {
247 quote_currency
248 };
249
250 let tick_size = instrument.tick_size;
253 let price_precision = precision_from_str(&tick_size.to_string());
254 if price_precision > FIXED_PRECISION {
255 anyhow::bail!(
256 "Cannot parse instrument '{}': tick_size {tick_size} requires precision {price_precision} \
257 which exceeds FIXED_PRECISION ({FIXED_PRECISION})",
258 instrument.symbol
259 );
260 }
261 let price_increment = Price::new(tick_size, price_precision);
262
263 let (_size_precision, size_increment) = if instrument.contract_value_trade_precision >= 0 {
268 let precision = instrument.contract_value_trade_precision as u8;
269 let increment = Quantity::new(10.0_f64.powi(-(precision as i32)), precision);
270 (precision, increment)
271 } else {
272 let increment_value = 10.0_f64.powi(-instrument.contract_value_trade_precision);
274 (0, Quantity::new(increment_value, 0))
275 };
276
277 let multiplier_precision = if instrument.contract_size.fract() == 0.0 {
278 0
279 } else {
280 instrument
281 .contract_size
282 .to_string()
283 .split('.')
284 .nth(1)
285 .map_or(0, |s| s.len() as u8)
286 };
287 let multiplier = Some(Quantity::new(
288 instrument.contract_size,
289 multiplier_precision,
290 ));
291
292 let (margin_init, margin_maint) = instrument
294 .margin_levels
295 .first()
296 .and_then(|level| {
297 let init = Decimal::try_from(level.initial_margin).ok()?;
298 let maint = Decimal::try_from(level.maintenance_margin).ok()?;
299 Some((Some(init), Some(maint)))
300 })
301 .unwrap_or((None, None));
302
303 let instrument = CryptoPerpetual::new(
304 instrument_id,
305 raw_symbol,
306 base_currency,
307 quote_currency,
308 settlement_currency,
309 is_inverse,
310 price_increment.precision,
311 size_increment.precision,
312 price_increment,
313 size_increment,
314 multiplier,
315 None, None, None, None, None, None, None, margin_init,
323 margin_maint,
324 None, None, ts_event,
327 ts_init,
328 );
329
330 Ok(InstrumentAny::CryptoPerpetual(instrument))
331}
332
333fn parse_price(value: &str, field: &str) -> anyhow::Result<Price> {
334 Price::from_str(value)
335 .map_err(|err| anyhow::anyhow!("Failed to parse {field}='{value}': {err}"))
336}
337
338fn parse_quantity(value: &str, field: &str) -> anyhow::Result<Quantity> {
339 Quantity::from_str(value)
340 .map_err(|err| anyhow::anyhow!("Failed to parse {field}='{value}': {err}"))
341}
342
/// Returns the [`Currency`] for `code` by delegating to
/// `Currency::get_or_create_crypto`.
///
/// The code is passed through as given; no Kraken prefix normalization is
/// applied here.
///
/// NOTE(review): judging by the callee's name, unknown codes are presumably
/// registered as new crypto currencies — confirm against `Currency`.
pub fn get_currency(code: &str) -> Currency {
    Currency::get_or_create_crypto(code)
}
350
351pub fn parse_trade_tick_from_array(
362 trade_array: &[serde_json::Value],
363 instrument: &InstrumentAny,
364 ts_init: UnixNanos,
365) -> anyhow::Result<TradeTick> {
366 let price_str = trade_array
367 .first()
368 .and_then(|v| v.as_str())
369 .context("Missing or invalid price")?;
370 let price = parse_price_with_precision(price_str, instrument.price_precision(), "trade.price")?;
371
372 let size_str = trade_array
373 .get(1)
374 .and_then(|v| v.as_str())
375 .context("Missing or invalid volume")?;
376 let size = parse_quantity_with_precision(size_str, instrument.size_precision(), "trade.size")?;
377
378 let time = trade_array
379 .get(2)
380 .and_then(|v| v.as_f64())
381 .context("Missing or invalid timestamp")?;
382 let ts_event = parse_millis_timestamp(time, "trade.time")?;
383
384 let side_str = trade_array
385 .get(3)
386 .and_then(|v| v.as_str())
387 .context("Missing or invalid side")?;
388 let aggressor = match side_str {
389 "b" => AggressorSide::Buyer,
390 "s" => AggressorSide::Seller,
391 _ => AggressorSide::NoAggressor,
392 };
393
394 let trade_id_value = trade_array.get(6).context("Missing trade_id")?;
395 let trade_id = if let Some(id) = trade_id_value.as_i64() {
396 TradeId::new_checked(id.to_string())?
397 } else if let Some(id_str) = trade_id_value.as_str() {
398 TradeId::new_checked(id_str)?
399 } else {
400 anyhow::bail!("Invalid trade_id format");
401 };
402
403 TradeTick::new_checked(
404 instrument.id(),
405 price,
406 size,
407 aggressor,
408 trade_id,
409 ts_event,
410 ts_init,
411 )
412 .context("Failed to construct TradeTick from Kraken trade")
413}
414
415pub fn parse_futures_public_execution(
423 execution: &FuturesPublicExecution,
424 instrument: &InstrumentAny,
425 ts_init: UnixNanos,
426) -> anyhow::Result<TradeTick> {
427 let price =
428 parse_price_with_precision(&execution.price, instrument.price_precision(), "price")?;
429 let size = parse_quantity_with_precision(
430 &execution.quantity,
431 instrument.size_precision(),
432 "quantity",
433 )?;
434
435 let ts_event = UnixNanos::from((execution.timestamp as u64) * 1_000_000);
437
438 let aggressor = match execution.taker_order.direction.to_lowercase().as_str() {
440 "buy" => AggressorSide::Buyer,
441 "sell" => AggressorSide::Seller,
442 _ => AggressorSide::NoAggressor,
443 };
444
445 let trade_id = TradeId::new_checked(&execution.uid)?;
446
447 TradeTick::new_checked(
448 instrument.id(),
449 price,
450 size,
451 aggressor,
452 trade_id,
453 ts_event,
454 ts_init,
455 )
456 .context("Failed to construct TradeTick from Kraken futures execution")
457}
458
459pub fn parse_bar(
467 ohlc: &OhlcData,
468 instrument: &InstrumentAny,
469 bar_type: BarType,
470 ts_init: UnixNanos,
471) -> anyhow::Result<Bar> {
472 let price_precision = instrument.price_precision();
473 let size_precision = instrument.size_precision();
474
475 let open = parse_price_with_precision(&ohlc.open, price_precision, "ohlc.open")?;
476 let high = parse_price_with_precision(&ohlc.high, price_precision, "ohlc.high")?;
477 let low = parse_price_with_precision(&ohlc.low, price_precision, "ohlc.low")?;
478 let close = parse_price_with_precision(&ohlc.close, price_precision, "ohlc.close")?;
479 let volume = parse_quantity_with_precision(&ohlc.volume, size_precision, "ohlc.volume")?;
480
481 let ts_event = UnixNanos::from((ohlc.time as u64) * 1_000_000_000);
482
483 Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
484 .context("Failed to construct Bar from Kraken OHLC")
485}
486
487fn parse_price_with_precision(value: &str, precision: u8, field: &str) -> anyhow::Result<Price> {
488 let parsed = value
489 .parse::<f64>()
490 .with_context(|| format!("Failed to parse {field}='{value}' as f64"))?;
491 Price::new_checked(parsed, precision).with_context(|| {
492 format!("Failed to construct Price for {field} with precision {precision}")
493 })
494}
495
496fn parse_quantity_with_precision(
497 value: &str,
498 precision: u8,
499 field: &str,
500) -> anyhow::Result<Quantity> {
501 let parsed = value
502 .parse::<f64>()
503 .with_context(|| format!("Failed to parse {field}='{value}' as f64"))?;
504 Quantity::new_checked(parsed, precision).with_context(|| {
505 format!("Failed to construct Quantity for {field} with precision {precision}")
506 })
507}
508
509pub fn parse_millis_timestamp(value: f64, field: &str) -> anyhow::Result<UnixNanos> {
510 let millis = (value * 1000.0) as u64;
511 let nanos = millis
512 .checked_mul(NANOSECONDS_IN_MILLISECOND)
513 .with_context(|| format!("{field} timestamp overflowed when converting to nanoseconds"))?;
514 Ok(UnixNanos::from(nanos))
515}
516
517pub fn parse_order_status_report(
525 order_id: &str,
526 order: &SpotOrder,
527 instrument: &InstrumentAny,
528 account_id: AccountId,
529 ts_init: UnixNanos,
530) -> anyhow::Result<OrderStatusReport> {
531 let instrument_id = instrument.id();
532 let venue_order_id = VenueOrderId::new(order_id);
533
534 let order_side = order.descr.order_side.into();
535 let order_type = order.descr.ordertype.into();
536 let order_status = order.status.into();
537
538 let has_expiration = order.expiretm.is_some_and(|t| t > 0.0);
540 let time_in_force = if has_expiration {
541 TimeInForce::Gtd
542 } else if order.oflags.contains("ioc") {
543 TimeInForce::Ioc
544 } else {
545 TimeInForce::Gtc
546 };
547
548 let quantity =
549 parse_quantity_with_precision(&order.vol, instrument.size_precision(), "order.vol")?;
550
551 let filled_qty = parse_quantity_with_precision(
552 &order.vol_exec,
553 instrument.size_precision(),
554 "order.vol_exec",
555 )?;
556
557 let ts_accepted = parse_millis_timestamp(order.opentm, "order.opentm")?;
558
559 let ts_last = order
560 .closetm
561 .map(|t| parse_millis_timestamp(t, "order.closetm"))
562 .transpose()?
563 .unwrap_or(ts_accepted);
564
565 let price = if !order.price.is_empty() && order.price != "0" {
566 Some(parse_price_with_precision(
567 &order.price,
568 instrument.price_precision(),
569 "order.price",
570 )?)
571 } else {
572 None
573 };
574
575 let trigger_price = order
576 .stopprice
577 .as_ref()
578 .and_then(|p| {
579 if !p.is_empty() && p != "0" {
580 Some(parse_price_with_precision(
581 p,
582 instrument.price_precision(),
583 "order.stopprice",
584 ))
585 } else {
586 None
587 }
588 })
589 .transpose()?;
590
591 let expire_time = if has_expiration {
592 order
593 .expiretm
594 .map(|t| parse_millis_timestamp(t, "order.expiretm"))
595 .transpose()?
596 } else {
597 None
598 };
599
600 let trigger_type = parse_trigger_type(order_type, order.trigger);
601
602 Ok(OrderStatusReport {
603 account_id,
604 instrument_id,
605 client_order_id: None,
606 venue_order_id,
607 order_side,
608 order_type,
609 time_in_force,
610 order_status,
611 quantity,
612 filled_qty,
613 report_id: UUID4::new(),
614 ts_accepted,
615 ts_last,
616 ts_init,
617 order_list_id: None,
618 venue_position_id: None,
619 linked_order_ids: None,
620 parent_order_id: None,
621 contingency_type: ContingencyType::NoContingency,
622 expire_time,
623 price,
624 trigger_price,
625 trigger_type,
626 limit_offset: None,
627 trailing_offset: None,
628 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
629 display_qty: None,
630 avg_px: compute_avg_px(order),
631 post_only: order.oflags.contains("post"),
632 reduce_only: false,
633 cancel_reason: order.reason.clone(),
634 ts_triggered: None,
635 })
636}
637
638fn compute_avg_px(order: &SpotOrder) -> Option<Decimal> {
642 if let Some(ref avg) = order.avg_price
643 && let Ok(v) = parse_decimal(avg)
644 && v > dec!(0)
645 {
646 return Some(v);
647 }
648
649 let cost = parse_decimal(&order.cost);
650 let vol_exec = parse_decimal(&order.vol_exec);
651 match (&cost, &vol_exec) {
652 (Ok(c), Ok(v)) if *v > dec!(0) => Some(*c / *v),
653 _ => {
654 if let Ok(v) = &vol_exec
655 && *v > dec!(0)
656 {
657 log::warn!("Cannot compute avg_px: cost={cost:?}, vol_exec={vol_exec:?}");
658 }
659 None
660 }
661 }
662}
663
664pub fn parse_fill_report(
671 trade_id: &str,
672 trade: &SpotTrade,
673 instrument: &InstrumentAny,
674 account_id: AccountId,
675 ts_init: UnixNanos,
676) -> anyhow::Result<FillReport> {
677 let instrument_id = instrument.id();
678 let venue_order_id = VenueOrderId::new(&trade.ordertxid);
679 let trade_id_obj = TradeId::new(trade_id);
680
681 let order_side = trade.trade_type.into();
682
683 let last_qty =
684 parse_quantity_with_precision(&trade.vol, instrument.size_precision(), "trade.vol")?;
685
686 let last_px =
687 parse_price_with_precision(&trade.price, instrument.price_precision(), "trade.price")?;
688
689 let fee_decimal = parse_decimal(&trade.fee)?;
690 let quote_currency = match instrument {
691 InstrumentAny::CurrencyPair(pair) => pair.quote_currency,
692 InstrumentAny::CryptoPerpetual(perp) => perp.quote_currency,
693 _ => anyhow::bail!("Unsupported instrument type for fill report"),
694 };
695
696 let fee_f64 = fee_decimal
697 .try_into()
698 .context("Failed to convert fee to f64")?;
699 let commission = Money::new(fee_f64, quote_currency);
700
701 let liquidity_side = match trade.maker {
702 Some(true) => LiquiditySide::Maker,
703 Some(false) => LiquiditySide::Taker,
704 None => LiquiditySide::NoLiquiditySide,
705 };
706
707 let ts_event = parse_millis_timestamp(trade.time, "trade.time")?;
708
709 Ok(FillReport {
710 account_id,
711 instrument_id,
712 venue_order_id,
713 trade_id: trade_id_obj,
714 order_side,
715 last_qty,
716 last_px,
717 commission,
718 liquidity_side,
719 report_id: UUID4::new(),
720 ts_event,
721 ts_init,
722 client_order_id: None,
723 venue_position_id: None,
724 })
725}
726
727pub fn parse_futures_order_status_report(
733 order: &FuturesOpenOrder,
734 instrument: &InstrumentAny,
735 account_id: AccountId,
736 ts_init: UnixNanos,
737) -> anyhow::Result<OrderStatusReport> {
738 let instrument_id = instrument.id();
739 let venue_order_id = VenueOrderId::new(&order.order_id);
740
741 let order_side = order.side.into();
742 let order_type = order.order_type.into();
743 let order_status = order.status.into();
744
745 let quantity = Quantity::new(
746 order.unfilled_size + order.filled_size,
747 instrument.size_precision(),
748 );
749
750 let filled_qty = Quantity::new(order.filled_size, instrument.size_precision());
751
752 let ts_accepted = parse_rfc3339_timestamp(&order.received_time, "order.received_time")?;
753 let ts_last = parse_rfc3339_timestamp(&order.last_update_time, "order.last_update_time")?;
754
755 let price = order
756 .limit_price
757 .map(|p| Price::new(p, instrument.price_precision()));
758
759 let trigger_price = order
760 .stop_price
761 .map(|p| Price::new(p, instrument.price_precision()));
762
763 let trigger_type = parse_futures_trigger_type(order_type, order.trigger_signal);
764
765 Ok(OrderStatusReport {
766 account_id,
767 instrument_id,
768 client_order_id: order.cli_ord_id.as_ref().map(|s| s.as_str().into()),
769 venue_order_id,
770 order_side,
771 order_type,
772 time_in_force: TimeInForce::Gtc,
773 order_status,
774 quantity,
775 filled_qty,
776 report_id: UUID4::new(),
777 ts_accepted,
778 ts_last,
779 ts_init,
780 order_list_id: None,
781 venue_position_id: None,
782 linked_order_ids: None,
783 parent_order_id: None,
784 contingency_type: ContingencyType::NoContingency,
785 expire_time: None,
786 price,
787 trigger_price,
788 trigger_type,
789 limit_offset: None,
790 trailing_offset: None,
791 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
792 display_qty: None,
793 avg_px: None,
794 post_only: false,
795 reduce_only: order.reduce_only.unwrap_or(false),
796 cancel_reason: None,
797 ts_triggered: None,
798 })
799}
800
801pub fn parse_futures_order_event_status_report(
807 event: &FuturesOrderEvent,
808 instrument: &InstrumentAny,
809 account_id: AccountId,
810 ts_init: UnixNanos,
811) -> anyhow::Result<OrderStatusReport> {
812 let instrument_id = instrument.id();
813 let venue_order_id = VenueOrderId::new(&event.order_id);
814
815 let order_side = event.side.into();
816 let order_type = event.order_type.into();
817
818 let order_status = if event.filled >= event.quantity {
820 OrderStatus::Filled
821 } else if event.filled > 0.0 {
822 OrderStatus::PartiallyFilled
823 } else {
824 OrderStatus::Canceled
825 };
826
827 let quantity = Quantity::new(event.quantity, instrument.size_precision());
828 let filled_qty = Quantity::new(event.filled, instrument.size_precision());
829
830 let ts_accepted = parse_rfc3339_timestamp(&event.timestamp, "event.timestamp")?;
831 let ts_last =
832 parse_rfc3339_timestamp(&event.last_update_timestamp, "event.last_update_timestamp")?;
833
834 let price = event
835 .limit_price
836 .map(|p| Price::new(p, instrument.price_precision()));
837
838 let trigger_price = event
839 .stop_price
840 .map(|p| Price::new(p, instrument.price_precision()));
841
842 let trigger_type = parse_futures_trigger_type(order_type, None);
845
846 Ok(OrderStatusReport {
847 account_id,
848 instrument_id,
849 client_order_id: event.cli_ord_id.as_ref().map(|s| s.as_str().into()),
850 venue_order_id,
851 order_side,
852 order_type,
853 time_in_force: TimeInForce::Gtc,
854 order_status,
855 quantity,
856 filled_qty,
857 report_id: UUID4::new(),
858 ts_accepted,
859 ts_last,
860 ts_init,
861 order_list_id: None,
862 venue_position_id: None,
863 linked_order_ids: None,
864 parent_order_id: None,
865 contingency_type: ContingencyType::NoContingency,
866 expire_time: None,
867 price,
868 trigger_price,
869 trigger_type,
870 limit_offset: None,
871 trailing_offset: None,
872 trailing_offset_type: TrailingOffsetType::NoTrailingOffset,
873 display_qty: None,
874 avg_px: None,
875 post_only: false,
876 reduce_only: event.reduce_only,
877 cancel_reason: None,
878 ts_triggered: None,
879 })
880}
881
882pub fn parse_futures_fill_report(
888 fill: &FuturesFill,
889 instrument: &InstrumentAny,
890 account_id: AccountId,
891 ts_init: UnixNanos,
892) -> anyhow::Result<FillReport> {
893 let instrument_id = instrument.id();
894 let venue_order_id = VenueOrderId::new(&fill.order_id);
895 let trade_id = TradeId::new(&fill.fill_id);
896
897 let order_side = fill.side.into();
898
899 let last_qty = Quantity::new(fill.size, instrument.size_precision());
900 let last_px = Price::new(fill.price, instrument.price_precision());
901
902 let quote_currency = match instrument {
903 InstrumentAny::CryptoPerpetual(perp) => perp.quote_currency,
904 InstrumentAny::CryptoFuture(future) => future.quote_currency,
905 _ => anyhow::bail!("Unsupported instrument type for futures fill report"),
906 };
907
908 let fee_f64 = fill.fee_paid.unwrap_or(0.0);
909 let commission = Money::new(fee_f64, quote_currency);
910
911 let liquidity_side = match fill.fill_type {
912 KrakenFillType::Maker => LiquiditySide::Maker,
913 KrakenFillType::Taker => LiquiditySide::Taker,
914 };
915
916 let ts_event = parse_rfc3339_timestamp(&fill.fill_time, "fill.fill_time")?;
917
918 Ok(FillReport {
919 account_id,
920 instrument_id,
921 venue_order_id,
922 trade_id,
923 order_side,
924 last_qty,
925 last_px,
926 commission,
927 liquidity_side,
928 report_id: UUID4::new(),
929 ts_event,
930 ts_init,
931 client_order_id: fill.cli_ord_id.as_ref().map(|s| s.as_str().into()),
932 venue_position_id: None,
933 })
934}
935
936pub fn parse_futures_position_status_report(
942 position: &FuturesPosition,
943 instrument: &InstrumentAny,
944 account_id: AccountId,
945 ts_init: UnixNanos,
946) -> anyhow::Result<PositionStatusReport> {
947 let instrument_id = instrument.id();
948
949 let position_side = match position.side {
950 KrakenPositionSide::Long => PositionSideSpecified::Long,
951 KrakenPositionSide::Short => PositionSideSpecified::Short,
952 };
953
954 let quantity = Quantity::new(position.size, instrument.size_precision());
955 let signed_decimal_qty = match position_side {
956 PositionSideSpecified::Long => Decimal::from_f64_retain(position.size).unwrap_or(dec!(0)),
957 PositionSideSpecified::Short => -Decimal::from_f64_retain(position.size).unwrap_or(dec!(0)),
958 PositionSideSpecified::Flat => dec!(0),
959 };
960
961 let avg_px_open = Some(Decimal::from_f64_retain(position.price).unwrap_or(dec!(0)));
962
963 Ok(PositionStatusReport {
964 account_id,
965 instrument_id,
966 position_side,
967 quantity,
968 signed_decimal_qty,
969 report_id: UUID4::new(),
970 ts_last: ts_init,
971 ts_init,
972 venue_position_id: None,
973 avg_px_open,
974 })
975}
976
977pub fn bar_type_to_spot_interval(bar_type: BarType) -> anyhow::Result<u32> {
985 let step = bar_type.spec().step.get() as u32;
986 let base_interval = match bar_type.spec().aggregation {
987 BarAggregation::Minute => 1,
988 BarAggregation::Hour => 60,
989 BarAggregation::Day => 1440,
990 other => {
991 anyhow::bail!("Unsupported bar aggregation for Kraken Spot: {other:?}");
992 }
993 };
994 Ok(base_interval * step)
995}
996
997pub fn bar_type_to_futures_resolution(bar_type: BarType) -> anyhow::Result<&'static str> {
1007 let step = bar_type.spec().step.get() as u32;
1008 match bar_type.spec().aggregation {
1009 BarAggregation::Minute => match step {
1010 1 => Ok("1m"),
1011 5 => Ok("5m"),
1012 15 => Ok("15m"),
1013 _ => anyhow::bail!("Unsupported minute step for Kraken Futures: {step}"),
1014 },
1015 BarAggregation::Hour => match step {
1016 1 => Ok("1h"),
1017 4 => Ok("4h"),
1018 12 => Ok("12h"),
1019 _ => anyhow::bail!("Unsupported hour step for Kraken Futures: {step}"),
1020 },
1021 BarAggregation::Day => {
1022 if step == 1 {
1023 Ok("1d")
1024 } else {
1025 anyhow::bail!("Unsupported day step for Kraken Futures: {step}")
1026 }
1027 }
1028 BarAggregation::Week => {
1029 if step == 1 {
1030 Ok("1w")
1031 } else {
1032 anyhow::bail!("Unsupported week step for Kraken Futures: {step}")
1033 }
1034 }
1035 other => {
1036 anyhow::bail!("Unsupported bar aggregation for Kraken Futures: {other:?}");
1037 }
1038 }
1039}
1040
1041#[cfg(test)]
1042mod tests {
1043 use indexmap::IndexMap;
1044 use nautilus_model::{
1045 data::BarSpecification,
1046 enums::{AggregationSource, BarAggregation, OrderStatus, PriceType},
1047 };
1048 use rstest::rstest;
1049
1050 use super::*;
1051 use crate::http::models::AssetPairsResponse;
1052
1053 const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);
1054
1055 fn load_test_json(filename: &str) -> String {
1056 let path = format!("test_data/{filename}");
1057 std::fs::read_to_string(&path)
1058 .unwrap_or_else(|e| panic!("Failed to load test data from {path}: {e}"))
1059 }
1060
1061 #[rstest]
1062 fn test_parse_decimal() {
1063 assert_eq!(parse_decimal("123.45").unwrap(), dec!(123.45));
1064 assert_eq!(parse_decimal("0").unwrap(), dec!(0));
1065 assert_eq!(parse_decimal("").unwrap(), dec!(0));
1066 }
1067
1068 #[rstest]
1069 fn test_parse_decimal_opt() {
1070 assert_eq!(
1071 parse_decimal_opt(Some("123.45")).unwrap(),
1072 Some(dec!(123.45))
1073 );
1074 assert_eq!(parse_decimal_opt(Some("0")).unwrap(), None);
1075 assert_eq!(parse_decimal_opt(Some("")).unwrap(), None);
1076 assert_eq!(parse_decimal_opt(None).unwrap(), None);
1077 }
1078
1079 #[rstest]
1080 fn test_parse_spot_instrument() {
1081 let json = load_test_json("http_asset_pairs.json");
1082 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
1083 let result = wrapper.get("result").unwrap();
1084 let pairs: AssetPairsResponse = serde_json::from_value(result.clone()).unwrap();
1085
1086 let (pair_name, definition) = pairs.iter().next().unwrap();
1087
1088 let instrument = parse_spot_instrument(pair_name, definition, TS, TS).unwrap();
1089
1090 match instrument {
1091 InstrumentAny::CurrencyPair(pair) => {
1092 assert_eq!(pair.id.venue.as_str(), "KRAKEN");
1093 assert_eq!(pair.base_currency.code.as_str(), "XXBT");
1094 assert_eq!(pair.quote_currency.code.as_str(), "USDT");
1095 assert!(pair.price_increment.as_f64() > 0.0);
1096 assert!(pair.size_increment.as_f64() > 0.0);
1097 assert!(pair.min_quantity.is_some());
1098 }
1099 _ => panic!("Expected CurrencyPair"),
1100 }
1101 }
1102
1103 #[rstest]
1104 fn test_parse_futures_instrument_inverse() {
1105 let json = load_test_json("http_futures_instruments.json");
1106 let response: crate::http::models::FuturesInstrumentsResponse =
1107 serde_json::from_str(&json).unwrap();
1108
1109 let fut_instrument = &response.instruments[0];
1110
1111 let instrument = parse_futures_instrument(fut_instrument, TS, TS).unwrap();
1112
1113 match instrument {
1114 InstrumentAny::CryptoPerpetual(perp) => {
1115 assert_eq!(perp.id.venue.as_str(), "KRAKEN");
1116 assert_eq!(perp.id.symbol.as_str(), "PI_XBTUSD");
1117 assert_eq!(perp.raw_symbol.as_str(), "PI_XBTUSD");
1118 assert_eq!(perp.base_currency.code.as_str(), "BTC");
1119 assert_eq!(perp.quote_currency.code.as_str(), "USD");
1120 assert_eq!(perp.settlement_currency.code.as_str(), "BTC");
1121 assert!(perp.is_inverse);
1122 assert_eq!(perp.price_increment.as_f64(), 0.5);
1123 assert_eq!(perp.size_increment.as_f64(), 1.0);
1124 assert_eq!(perp.size_precision(), 0);
1125 assert_eq!(perp.margin_init, dec!(0.02));
1126 assert_eq!(perp.margin_maint, dec!(0.01));
1127 }
1128 _ => panic!("Expected CryptoPerpetual"),
1129 }
1130 }
1131
1132 #[rstest]
1133 fn test_parse_futures_instrument_flexible() {
1134 let json = load_test_json("http_futures_instruments.json");
1135 let response: crate::http::models::FuturesInstrumentsResponse =
1136 serde_json::from_str(&json).unwrap();
1137
1138 let fut_instrument = &response.instruments[1];
1139
1140 let instrument = parse_futures_instrument(fut_instrument, TS, TS).unwrap();
1141
1142 match instrument {
1143 InstrumentAny::CryptoPerpetual(perp) => {
1144 assert_eq!(perp.id.venue.as_str(), "KRAKEN");
1145 assert_eq!(perp.id.symbol.as_str(), "PF_ETHUSD");
1146 assert_eq!(perp.raw_symbol.as_str(), "PF_ETHUSD");
1147 assert_eq!(perp.base_currency.code.as_str(), "ETH");
1148 assert_eq!(perp.quote_currency.code.as_str(), "USD");
1149 assert_eq!(perp.settlement_currency.code.as_str(), "USD");
1150 assert!(!perp.is_inverse);
1151 assert_eq!(perp.price_increment.as_f64(), 0.1);
1152 assert_eq!(perp.size_increment.as_f64(), 0.001);
1153 assert_eq!(perp.size_precision(), 3);
1154 assert_eq!(perp.margin_init, dec!(0.02));
1155 assert_eq!(perp.margin_maint, dec!(0.01));
1156 }
1157 _ => panic!("Expected CryptoPerpetual"),
1158 }
1159 }
1160
1161 #[rstest]
1164 fn test_parse_futures_instrument_negative_precision() {
1165 let json = load_test_json("http_futures_instruments.json");
1166 let response: crate::http::models::FuturesInstrumentsResponse =
1167 serde_json::from_str(&json).unwrap();
1168
1169 let fut_instrument = &response.instruments[2];
1171
1172 let instrument = parse_futures_instrument(fut_instrument, TS, TS).unwrap();
1173
1174 match instrument {
1175 InstrumentAny::CryptoPerpetual(perp) => {
1176 assert_eq!(perp.id.symbol.as_str(), "PF_PEPEUSD");
1177 assert_eq!(perp.base_currency.code.as_str(), "PEPE");
1178 assert!(!perp.is_inverse);
1179 assert_eq!(perp.size_increment.as_f64(), 1000.0);
1180 assert_eq!(perp.size_precision(), 0);
1181 }
1182 _ => panic!("Expected CryptoPerpetual"),
1183 }
1184 }
1185
1186 #[rstest]
1187 fn test_parse_trade_tick_from_array() {
1188 let json = load_test_json("http_trades.json");
1189 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
1190 let result = wrapper.get("result").unwrap();
1191 let trades_map = result.as_object().unwrap();
1192
1193 let (_pair, trades_value) = trades_map.iter().find(|(k, _)| *k != "last").unwrap();
1195 let trades = trades_value.as_array().unwrap();
1196 let trade_array = trades[0].as_array().unwrap();
1197
1198 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
1200 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
1201 instrument_id,
1202 Symbol::new("XBTUSDT"),
1203 Currency::BTC(),
1204 Currency::USDT(),
1205 1, 8, Price::from("0.1"),
1208 Quantity::from("0.00000001"),
1209 None,
1210 None,
1211 None,
1212 None,
1213 None,
1214 None,
1215 None,
1216 None,
1217 None,
1218 None,
1219 None,
1220 None,
1221 TS,
1222 TS,
1223 ));
1224
1225 let trade_tick = parse_trade_tick_from_array(trade_array, &instrument, TS).unwrap();
1226
1227 assert_eq!(trade_tick.instrument_id, instrument_id);
1228 assert!(trade_tick.price.as_f64() > 0.0);
1229 assert!(trade_tick.size.as_f64() > 0.0);
1230 }
1231
1232 #[rstest]
1233 fn test_parse_bar() {
1234 let json = load_test_json("http_ohlc.json");
1235 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
1236 let result = wrapper.get("result").unwrap();
1237 let ohlc_map = result.as_object().unwrap();
1238
1239 let (_pair, ohlc_value) = ohlc_map.iter().find(|(k, _)| *k != "last").unwrap();
1241 let ohlcs = ohlc_value.as_array().unwrap();
1242
1243 let ohlc_array = ohlcs[0].as_array().unwrap();
1245 let ohlc = OhlcData {
1246 time: ohlc_array[0].as_i64().unwrap(),
1247 open: ohlc_array[1].as_str().unwrap().to_string(),
1248 high: ohlc_array[2].as_str().unwrap().to_string(),
1249 low: ohlc_array[3].as_str().unwrap().to_string(),
1250 close: ohlc_array[4].as_str().unwrap().to_string(),
1251 vwap: ohlc_array[5].as_str().unwrap().to_string(),
1252 volume: ohlc_array[6].as_str().unwrap().to_string(),
1253 count: ohlc_array[7].as_i64().unwrap(),
1254 };
1255
1256 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
1258 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
1259 instrument_id,
1260 Symbol::new("XBTUSDT"),
1261 Currency::BTC(),
1262 Currency::USDT(),
1263 1, 8, Price::from("0.1"),
1266 Quantity::from("0.00000001"),
1267 None,
1268 None,
1269 None,
1270 None,
1271 None,
1272 None,
1273 None,
1274 None,
1275 None,
1276 None,
1277 None,
1278 None,
1279 TS,
1280 TS,
1281 ));
1282
1283 let bar_type = BarType::new(
1284 instrument_id,
1285 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last),
1286 AggregationSource::External,
1287 );
1288
1289 let bar = parse_bar(&ohlc, &instrument, bar_type, TS).unwrap();
1290
1291 assert_eq!(bar.bar_type, bar_type);
1292 assert!(bar.open.as_f64() > 0.0);
1293 assert!(bar.high.as_f64() > 0.0);
1294 assert!(bar.low.as_f64() > 0.0);
1295 assert!(bar.close.as_f64() > 0.0);
1296 assert!(bar.volume.as_f64() >= 0.0);
1297 }
1298
1299 #[rstest]
1300 fn test_parse_millis_timestamp() {
1301 let timestamp = 1762795433.9717445;
1302 let result = parse_millis_timestamp(timestamp, "test").unwrap();
1303 assert!(result.as_u64() > 0);
1304 }
1305
1306 #[rstest]
1307 #[case(1, BarAggregation::Minute, 1)]
1308 #[case(5, BarAggregation::Minute, 5)]
1309 #[case(15, BarAggregation::Minute, 15)]
1310 #[case(1, BarAggregation::Hour, 60)]
1311 #[case(4, BarAggregation::Hour, 240)]
1312 #[case(1, BarAggregation::Day, 1440)]
1313 fn test_bar_type_to_spot_interval(
1314 #[case] step: usize,
1315 #[case] aggregation: BarAggregation,
1316 #[case] expected: u32,
1317 ) {
1318 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
1319 let bar_type = BarType::new(
1320 instrument_id,
1321 BarSpecification::new(step, aggregation, PriceType::Last),
1322 AggregationSource::External,
1323 );
1324
1325 let result = bar_type_to_spot_interval(bar_type).unwrap();
1326 assert_eq!(result, expected);
1327 }
1328
1329 #[rstest]
1330 fn test_bar_type_to_spot_interval_unsupported() {
1331 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
1332 let bar_type = BarType::new(
1333 instrument_id,
1334 BarSpecification::new(1, BarAggregation::Second, PriceType::Last),
1335 AggregationSource::External,
1336 );
1337
1338 let result = bar_type_to_spot_interval(bar_type);
1339 assert!(result.is_err());
1340 assert!(result.unwrap_err().to_string().contains("Unsupported"));
1341 }
1342
1343 #[rstest]
1344 #[case(1, BarAggregation::Minute, "1m")]
1345 #[case(5, BarAggregation::Minute, "5m")]
1346 #[case(15, BarAggregation::Minute, "15m")]
1347 #[case(1, BarAggregation::Hour, "1h")]
1348 #[case(4, BarAggregation::Hour, "4h")]
1349 #[case(12, BarAggregation::Hour, "12h")]
1350 #[case(1, BarAggregation::Day, "1d")]
1351 #[case(1, BarAggregation::Week, "1w")]
1352 fn test_bar_type_to_futures_resolution(
1353 #[case] step: usize,
1354 #[case] aggregation: BarAggregation,
1355 #[case] expected: &str,
1356 ) {
1357 let instrument_id = InstrumentId::new(Symbol::new("PI_XBTUSD"), *KRAKEN_VENUE);
1358 let bar_type = BarType::new(
1359 instrument_id,
1360 BarSpecification::new(step, aggregation, PriceType::Last),
1361 AggregationSource::External,
1362 );
1363
1364 let result = bar_type_to_futures_resolution(bar_type).unwrap();
1365 assert_eq!(result, expected);
1366 }
1367
1368 #[rstest]
1369 #[case(30, BarAggregation::Minute)] #[case(2, BarAggregation::Hour)] #[case(2, BarAggregation::Day)] #[case(1, BarAggregation::Second)] fn test_bar_type_to_futures_resolution_unsupported(
1374 #[case] step: usize,
1375 #[case] aggregation: BarAggregation,
1376 ) {
1377 let instrument_id = InstrumentId::new(Symbol::new("PI_XBTUSD"), *KRAKEN_VENUE);
1378 let bar_type = BarType::new(
1379 instrument_id,
1380 BarSpecification::new(step, aggregation, PriceType::Last),
1381 AggregationSource::External,
1382 );
1383
1384 let result = bar_type_to_futures_resolution(bar_type);
1385 assert!(result.is_err());
1386 assert!(result.unwrap_err().to_string().contains("Unsupported"));
1387 }
1388
1389 #[rstest]
1390 fn test_parse_order_status_report() {
1391 let json = load_test_json("http_open_orders.json");
1392 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
1393 let result = wrapper.get("result").unwrap();
1394 let open_map = result.get("open").unwrap();
1395 let orders: IndexMap<String, SpotOrder> = serde_json::from_value(open_map.clone()).unwrap();
1396
1397 let account_id = AccountId::new("KRAKEN-001");
1398 let instrument_id = InstrumentId::new(Symbol::new("BTC/USDT"), *KRAKEN_VENUE);
1399 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
1400 instrument_id,
1401 Symbol::new("XBTUSDT"),
1402 Currency::BTC(),
1403 Currency::USDT(),
1404 2,
1405 8,
1406 Price::from("0.01"),
1407 Quantity::from("0.00000001"),
1408 None,
1409 None,
1410 None,
1411 None,
1412 None,
1413 None,
1414 None,
1415 None,
1416 None,
1417 None,
1418 None,
1419 None,
1420 TS,
1421 TS,
1422 ));
1423
1424 let (order_id, order) = orders.iter().next().unwrap();
1425
1426 let report =
1427 parse_order_status_report(order_id, order, &instrument, account_id, TS).unwrap();
1428
1429 assert_eq!(report.account_id, account_id);
1430 assert_eq!(report.instrument_id, instrument_id);
1431 assert_eq!(report.venue_order_id.as_str(), order_id);
1432 assert_eq!(report.order_status, OrderStatus::Accepted);
1433 assert!(report.quantity.as_f64() > 0.0);
1434 }
1435
1436 #[rstest]
1437 fn test_parse_fill_report() {
1438 let json = load_test_json("http_trades_history.json");
1439 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
1440 let result = wrapper.get("result").unwrap();
1441 let trades_map = result.get("trades").unwrap();
1442 let trades: IndexMap<String, SpotTrade> =
1443 serde_json::from_value(trades_map.clone()).unwrap();
1444
1445 let account_id = AccountId::new("KRAKEN-001");
1446 let instrument_id = InstrumentId::new(Symbol::new("BTC/USDT"), *KRAKEN_VENUE);
1447 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
1448 instrument_id,
1449 Symbol::new("XBTUSDT"),
1450 Currency::BTC(),
1451 Currency::USDT(),
1452 2,
1453 8,
1454 Price::from("0.01"),
1455 Quantity::from("0.00000001"),
1456 None,
1457 None,
1458 None,
1459 None,
1460 None,
1461 None,
1462 None,
1463 None,
1464 None,
1465 None,
1466 None,
1467 None,
1468 TS,
1469 TS,
1470 ));
1471
1472 let (trade_id, trade) = trades.iter().next().unwrap();
1473
1474 let report = parse_fill_report(trade_id, trade, &instrument, account_id, TS).unwrap();
1475
1476 assert_eq!(report.account_id, account_id);
1477 assert_eq!(report.instrument_id, instrument_id);
1478 assert_eq!(report.trade_id.to_string(), *trade_id);
1479 assert!(report.last_qty.as_f64() > 0.0);
1480 assert!(report.last_px.as_f64() > 0.0);
1481 assert!(report.commission.as_f64() > 0.0);
1482 }
1483
1484 #[rstest]
1485 #[case("XXBT", "XBT")]
1486 #[case("XETH", "ETH")]
1487 #[case("ZUSD", "USD")]
1488 #[case("ZEUR", "EUR")]
1489 #[case("BTC", "BTC")]
1490 #[case("ETH", "ETH")]
1491 #[case("USDT", "USDT")]
1492 #[case("SOL", "SOL")]
1493 fn test_normalize_currency_code(#[case] input: &str, #[case] expected: &str) {
1494 assert_eq!(normalize_currency_code(input), expected);
1495 }
1496}