1use std::str::FromStr;
19
20use anyhow::Context;
21use nautilus_core::{datetime::NANOSECONDS_IN_MILLISECOND, nanos::UnixNanos};
22use nautilus_model::{
23 data::{Bar, BarType, TradeTick},
24 enums::AggressorSide,
25 identifiers::{InstrumentId, Symbol, TradeId},
26 instruments::{
27 Instrument, any::InstrumentAny, crypto_perpetual::CryptoPerpetual,
28 currency_pair::CurrencyPair,
29 },
30 types::{Currency, Price, Quantity},
31};
32use rust_decimal::Decimal;
33use rust_decimal_macros::dec;
34
35use crate::{
36 common::consts::KRAKEN_VENUE,
37 http::models::{AssetPairInfo, FuturesInstrument, OhlcData},
38};
39
40pub fn parse_decimal(value: &str) -> anyhow::Result<Decimal> {
42 if value.is_empty() || value == "0" {
43 return Ok(dec!(0));
44 }
45 value
46 .parse::<Decimal>()
47 .map_err(|e| anyhow::anyhow!("Failed to parse decimal '{value}': {e}"))
48}
49
50pub fn parse_decimal_opt(value: Option<&str>) -> anyhow::Result<Option<Decimal>> {
52 match value {
53 Some(s) if !s.is_empty() && s != "0" => Ok(Some(parse_decimal(s)?)),
54 _ => Ok(None),
55 }
56}
57
58pub fn parse_spot_instrument(
67 pair_name: &str,
68 definition: &AssetPairInfo,
69 ts_event: UnixNanos,
70 ts_init: UnixNanos,
71) -> anyhow::Result<InstrumentAny> {
72 let symbol_str = definition.wsname.as_ref().unwrap_or(&definition.altname);
73 let instrument_id = InstrumentId::new(Symbol::new(symbol_str.as_str()), *KRAKEN_VENUE);
74 let raw_symbol = Symbol::new(pair_name);
75
76 let base_currency = get_currency(definition.base.as_str());
77 let quote_currency = get_currency(definition.quote.as_str());
78
79 let price_increment = parse_price(
80 definition
81 .tick_size
82 .as_ref()
83 .context("tick_size is required")?,
84 "tick_size",
85 )?;
86
87 let size_precision = definition.lot_decimals;
89 let size_increment = Quantity::new(10.0_f64.powi(-(size_precision as i32)), size_precision);
90
91 let min_quantity = definition
92 .ordermin
93 .as_ref()
94 .map(|s| parse_quantity(s, "ordermin"))
95 .transpose()?;
96
97 let taker_fee = definition
99 .fees
100 .first()
101 .map(|(_, fee)| Decimal::try_from(*fee))
102 .transpose()
103 .context("Failed to parse taker fee")?
104 .map(|f| f / dec!(100));
105
106 let maker_fee = definition
107 .fees_maker
108 .first()
109 .map(|(_, fee)| Decimal::try_from(*fee))
110 .transpose()
111 .context("Failed to parse maker fee")?
112 .map(|f| f / dec!(100));
113
114 let instrument = CurrencyPair::new(
115 instrument_id,
116 raw_symbol,
117 base_currency,
118 quote_currency,
119 price_increment.precision,
120 size_increment.precision,
121 price_increment,
122 size_increment,
123 None,
124 None,
125 None,
126 min_quantity,
127 None,
128 None,
129 None,
130 None,
131 maker_fee,
132 taker_fee,
133 None,
134 None,
135 ts_event,
136 ts_init,
137 );
138
139 Ok(InstrumentAny::CurrencyPair(instrument))
140}
141
142pub fn parse_futures_instrument(
151 instrument: &FuturesInstrument,
152 ts_event: UnixNanos,
153 ts_init: UnixNanos,
154) -> anyhow::Result<InstrumentAny> {
155 let instrument_id = InstrumentId::new(Symbol::new(&instrument.symbol), *KRAKEN_VENUE);
156 let raw_symbol = Symbol::new(&instrument.symbol);
157
158 let base_currency = get_currency(&instrument.base);
159 let quote_currency = get_currency(&instrument.quote);
160
161 let is_inverse = instrument.instrument_type.contains("inverse");
162 let settlement_currency = if is_inverse {
163 base_currency
164 } else {
165 quote_currency
166 };
167
168 let price_increment = Price::from(instrument.tick_size.to_string());
169
170 let size_precision = if instrument.contract_size.fract() == 0.0 {
172 0
173 } else {
174 instrument
175 .contract_size
176 .to_string()
177 .split('.')
178 .nth(1)
179 .map_or(0, |s| s.len() as u8)
180 };
181 let size_increment = Quantity::new(instrument.contract_size, size_precision);
182
183 let multiplier = Some(Quantity::new(instrument.contract_size, size_precision));
184
185 let (margin_init, margin_maint) = instrument
187 .margin_levels
188 .first()
189 .and_then(|level| {
190 let init = Decimal::try_from(level.initial_margin).ok()?;
191 let maint = Decimal::try_from(level.maintenance_margin).ok()?;
192 Some((Some(init), Some(maint)))
193 })
194 .unwrap_or((None, None));
195
196 let instrument = CryptoPerpetual::new(
197 instrument_id,
198 raw_symbol,
199 base_currency,
200 quote_currency,
201 settlement_currency,
202 is_inverse,
203 price_increment.precision,
204 size_increment.precision,
205 price_increment,
206 size_increment,
207 multiplier,
208 None, None, None, None, None, None, None, margin_init,
216 margin_maint,
217 None, None, ts_event,
220 ts_init,
221 );
222
223 Ok(InstrumentAny::CryptoPerpetual(instrument))
224}
225
226fn parse_price(value: &str, field: &str) -> anyhow::Result<Price> {
227 Price::from_str(value)
228 .map_err(|err| anyhow::anyhow!("Failed to parse {field}='{value}': {err}"))
229}
230
231fn parse_quantity(value: &str, field: &str) -> anyhow::Result<Quantity> {
232 Quantity::from_str(value)
233 .map_err(|err| anyhow::anyhow!("Failed to parse {field}='{value}': {err}"))
234}
235
236pub fn get_currency(code: &str) -> Currency {
241 Currency::get_or_create_crypto(code)
242}
243
244pub fn parse_trade_tick_from_array(
255 trade_array: &[serde_json::Value],
256 instrument: &InstrumentAny,
257 ts_init: UnixNanos,
258) -> anyhow::Result<TradeTick> {
259 let price_str = trade_array
260 .first()
261 .and_then(|v| v.as_str())
262 .context("Missing or invalid price")?;
263 let price = parse_price_with_precision(price_str, instrument.price_precision(), "trade.price")?;
264
265 let size_str = trade_array
266 .get(1)
267 .and_then(|v| v.as_str())
268 .context("Missing or invalid volume")?;
269 let size = parse_quantity_with_precision(size_str, instrument.size_precision(), "trade.size")?;
270
271 let time = trade_array
272 .get(2)
273 .and_then(|v| v.as_f64())
274 .context("Missing or invalid timestamp")?;
275 let ts_event = parse_millis_timestamp(time, "trade.time")?;
276
277 let side_str = trade_array
278 .get(3)
279 .and_then(|v| v.as_str())
280 .context("Missing or invalid side")?;
281 let aggressor = match side_str {
282 "b" => AggressorSide::Buyer,
283 "s" => AggressorSide::Seller,
284 _ => AggressorSide::NoAggressor,
285 };
286
287 let trade_id_value = trade_array.get(6).context("Missing trade_id")?;
288 let trade_id = if let Some(id) = trade_id_value.as_i64() {
289 TradeId::new_checked(id.to_string())?
290 } else if let Some(id_str) = trade_id_value.as_str() {
291 TradeId::new_checked(id_str)?
292 } else {
293 anyhow::bail!("Invalid trade_id format");
294 };
295
296 TradeTick::new_checked(
297 instrument.id(),
298 price,
299 size,
300 aggressor,
301 trade_id,
302 ts_event,
303 ts_init,
304 )
305 .context("Failed to construct TradeTick from Kraken trade")
306}
307
308pub fn parse_bar(
316 ohlc: &OhlcData,
317 instrument: &InstrumentAny,
318 bar_type: BarType,
319 ts_init: UnixNanos,
320) -> anyhow::Result<Bar> {
321 let price_precision = instrument.price_precision();
322 let size_precision = instrument.size_precision();
323
324 let open = parse_price_with_precision(&ohlc.open, price_precision, "ohlc.open")?;
325 let high = parse_price_with_precision(&ohlc.high, price_precision, "ohlc.high")?;
326 let low = parse_price_with_precision(&ohlc.low, price_precision, "ohlc.low")?;
327 let close = parse_price_with_precision(&ohlc.close, price_precision, "ohlc.close")?;
328 let volume = parse_quantity_with_precision(&ohlc.volume, size_precision, "ohlc.volume")?;
329
330 let ts_event = UnixNanos::from((ohlc.time as u64) * 1_000_000_000);
331
332 Bar::new_checked(bar_type, open, high, low, close, volume, ts_event, ts_init)
333 .context("Failed to construct Bar from Kraken OHLC")
334}
335
336fn parse_price_with_precision(value: &str, precision: u8, field: &str) -> anyhow::Result<Price> {
337 let parsed = value
338 .parse::<f64>()
339 .with_context(|| format!("Failed to parse {field}='{value}' as f64"))?;
340 Price::new_checked(parsed, precision).with_context(|| {
341 format!("Failed to construct Price for {field} with precision {precision}")
342 })
343}
344
345fn parse_quantity_with_precision(
346 value: &str,
347 precision: u8,
348 field: &str,
349) -> anyhow::Result<Quantity> {
350 let parsed = value
351 .parse::<f64>()
352 .with_context(|| format!("Failed to parse {field}='{value}' as f64"))?;
353 Quantity::new_checked(parsed, precision).with_context(|| {
354 format!("Failed to construct Quantity for {field} with precision {precision}")
355 })
356}
357
358pub fn parse_millis_timestamp(value: f64, field: &str) -> anyhow::Result<UnixNanos> {
359 let millis = (value * 1000.0) as u64;
360 let nanos = millis
361 .checked_mul(NANOSECONDS_IN_MILLISECOND)
362 .with_context(|| format!("{field} timestamp overflowed when converting to nanoseconds"))?;
363 Ok(UnixNanos::from(nanos))
364}
365
366pub fn bar_type_to_spot_interval(bar_type: BarType) -> anyhow::Result<u32> {
374 let step = bar_type.spec().step.get() as u32;
375 let base_interval = match bar_type.spec().aggregation {
376 nautilus_model::enums::BarAggregation::Minute => 1,
377 nautilus_model::enums::BarAggregation::Hour => 60,
378 nautilus_model::enums::BarAggregation::Day => 1440,
379 other => {
380 anyhow::bail!("Unsupported bar aggregation for Kraken Spot: {other:?}");
381 }
382 };
383 Ok(base_interval * step)
384}
385
386pub fn bar_type_to_futures_resolution(bar_type: BarType) -> anyhow::Result<&'static str> {
396 let step = bar_type.spec().step.get() as u32;
397 match bar_type.spec().aggregation {
398 nautilus_model::enums::BarAggregation::Minute => match step {
399 1 => Ok("1m"),
400 5 => Ok("5m"),
401 15 => Ok("15m"),
402 _ => anyhow::bail!("Unsupported minute step for Kraken Futures: {step}"),
403 },
404 nautilus_model::enums::BarAggregation::Hour => match step {
405 1 => Ok("1h"),
406 4 => Ok("4h"),
407 12 => Ok("12h"),
408 _ => anyhow::bail!("Unsupported hour step for Kraken Futures: {step}"),
409 },
410 nautilus_model::enums::BarAggregation::Day => {
411 if step == 1 {
412 Ok("1d")
413 } else {
414 anyhow::bail!("Unsupported day step for Kraken Futures: {step}")
415 }
416 }
417 nautilus_model::enums::BarAggregation::Week => {
418 if step == 1 {
419 Ok("1w")
420 } else {
421 anyhow::bail!("Unsupported week step for Kraken Futures: {step}")
422 }
423 }
424 other => {
425 anyhow::bail!("Unsupported bar aggregation for Kraken Futures: {other:?}");
426 }
427 }
428}
429
430#[cfg(test)]
435mod tests {
436 use nautilus_model::{
437 data::BarSpecification,
438 enums::{AggregationSource, BarAggregation, PriceType},
439 };
440 use rstest::rstest;
441
442 use super::*;
443 use crate::http::models::AssetPairsResponse;
444
445 const TS: UnixNanos = UnixNanos::new(1_700_000_000_000_000_000);
446
447 fn load_test_json(filename: &str) -> String {
448 let path = format!("test_data/{filename}");
449 std::fs::read_to_string(&path)
450 .unwrap_or_else(|e| panic!("Failed to load test data from {path}: {e}"))
451 }
452
453 #[rstest]
454 fn test_parse_decimal() {
455 assert_eq!(parse_decimal("123.45").unwrap(), dec!(123.45));
456 assert_eq!(parse_decimal("0").unwrap(), dec!(0));
457 assert_eq!(parse_decimal("").unwrap(), dec!(0));
458 }
459
460 #[rstest]
461 fn test_parse_decimal_opt() {
462 assert_eq!(
463 parse_decimal_opt(Some("123.45")).unwrap(),
464 Some(dec!(123.45))
465 );
466 assert_eq!(parse_decimal_opt(Some("0")).unwrap(), None);
467 assert_eq!(parse_decimal_opt(Some("")).unwrap(), None);
468 assert_eq!(parse_decimal_opt(None).unwrap(), None);
469 }
470
471 #[rstest]
472 fn test_parse_spot_instrument() {
473 let json = load_test_json("http_asset_pairs.json");
474 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
475 let result = wrapper.get("result").unwrap();
476 let pairs: AssetPairsResponse = serde_json::from_value(result.clone()).unwrap();
477
478 let (pair_name, definition) = pairs.iter().next().unwrap();
479
480 let instrument = parse_spot_instrument(pair_name, definition, TS, TS).unwrap();
481
482 match instrument {
483 InstrumentAny::CurrencyPair(pair) => {
484 assert_eq!(pair.id.venue.as_str(), "KRAKEN");
485 assert_eq!(pair.base_currency.code.as_str(), "XXBT");
486 assert_eq!(pair.quote_currency.code.as_str(), "USDT");
487 assert!(pair.price_increment.as_f64() > 0.0);
488 assert!(pair.size_increment.as_f64() > 0.0);
489 assert!(pair.min_quantity.is_some());
490 }
491 _ => panic!("Expected CurrencyPair"),
492 }
493 }
494
495 #[rstest]
496 fn test_parse_futures_instrument() {
497 let json = load_test_json("http_futures_instruments.json");
498 let response: crate::http::models::FuturesInstrumentsResponse =
499 serde_json::from_str(&json).unwrap();
500
501 let fut_instrument = &response.instruments[0];
502
503 let instrument = parse_futures_instrument(fut_instrument, TS, TS).unwrap();
504
505 match instrument {
506 InstrumentAny::CryptoPerpetual(perp) => {
507 assert_eq!(perp.id.venue.as_str(), "KRAKEN");
508 assert_eq!(perp.id.symbol.as_str(), "PI_XBTUSD");
509 assert_eq!(perp.raw_symbol.as_str(), "PI_XBTUSD");
510 assert_eq!(perp.base_currency.code.as_str(), "BTC");
511 assert_eq!(perp.quote_currency.code.as_str(), "USD");
512 assert_eq!(perp.settlement_currency.code.as_str(), "BTC");
513 assert!(perp.is_inverse);
514 assert_eq!(perp.price_increment.as_f64(), 0.5);
515 assert_eq!(perp.size_increment.as_f64(), 1.0);
516 assert_eq!(perp.margin_init, dec!(0.02));
517 assert_eq!(perp.margin_maint, dec!(0.01));
518 }
519 _ => panic!("Expected CryptoPerpetual"),
520 }
521 }
522
523 #[rstest]
524 fn test_parse_trade_tick_from_array() {
525 let json = load_test_json("http_trades.json");
526 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
527 let result = wrapper.get("result").unwrap();
528 let trades_map = result.as_object().unwrap();
529
530 let (_pair, trades_value) = trades_map.iter().find(|(k, _)| *k != "last").unwrap();
532 let trades = trades_value.as_array().unwrap();
533 let trade_array = trades[0].as_array().unwrap();
534
535 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
537 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
538 instrument_id,
539 Symbol::new("XBTUSDT"),
540 Currency::BTC(),
541 Currency::USDT(),
542 1, 8, Price::from("0.1"),
545 Quantity::from("0.00000001"),
546 None,
547 None,
548 None,
549 None,
550 None,
551 None,
552 None,
553 None,
554 None,
555 None,
556 None,
557 None,
558 TS,
559 TS,
560 ));
561
562 let trade_tick = parse_trade_tick_from_array(trade_array, &instrument, TS).unwrap();
563
564 assert_eq!(trade_tick.instrument_id, instrument_id);
565 assert!(trade_tick.price.as_f64() > 0.0);
566 assert!(trade_tick.size.as_f64() > 0.0);
567 }
568
569 #[rstest]
570 fn test_parse_bar() {
571 let json = load_test_json("http_ohlc.json");
572 let wrapper: serde_json::Value = serde_json::from_str(&json).unwrap();
573 let result = wrapper.get("result").unwrap();
574 let ohlc_map = result.as_object().unwrap();
575
576 let (_pair, ohlc_value) = ohlc_map.iter().find(|(k, _)| *k != "last").unwrap();
578 let ohlcs = ohlc_value.as_array().unwrap();
579
580 let ohlc_array = ohlcs[0].as_array().unwrap();
582 let ohlc = OhlcData {
583 time: ohlc_array[0].as_i64().unwrap(),
584 open: ohlc_array[1].as_str().unwrap().to_string(),
585 high: ohlc_array[2].as_str().unwrap().to_string(),
586 low: ohlc_array[3].as_str().unwrap().to_string(),
587 close: ohlc_array[4].as_str().unwrap().to_string(),
588 vwap: ohlc_array[5].as_str().unwrap().to_string(),
589 volume: ohlc_array[6].as_str().unwrap().to_string(),
590 count: ohlc_array[7].as_i64().unwrap(),
591 };
592
593 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
595 let instrument = InstrumentAny::CurrencyPair(CurrencyPair::new(
596 instrument_id,
597 Symbol::new("XBTUSDT"),
598 Currency::BTC(),
599 Currency::USDT(),
600 1, 8, Price::from("0.1"),
603 Quantity::from("0.00000001"),
604 None,
605 None,
606 None,
607 None,
608 None,
609 None,
610 None,
611 None,
612 None,
613 None,
614 None,
615 None,
616 TS,
617 TS,
618 ));
619
620 let bar_type = BarType::new(
621 instrument_id,
622 BarSpecification::new(1, BarAggregation::Minute, PriceType::Last),
623 AggregationSource::External,
624 );
625
626 let bar = parse_bar(&ohlc, &instrument, bar_type, TS).unwrap();
627
628 assert_eq!(bar.bar_type, bar_type);
629 assert!(bar.open.as_f64() > 0.0);
630 assert!(bar.high.as_f64() > 0.0);
631 assert!(bar.low.as_f64() > 0.0);
632 assert!(bar.close.as_f64() > 0.0);
633 assert!(bar.volume.as_f64() >= 0.0);
634 }
635
636 #[rstest]
637 fn test_parse_millis_timestamp() {
638 let timestamp = 1762795433.9717445;
639 let result = parse_millis_timestamp(timestamp, "test").unwrap();
640 assert!(result.as_u64() > 0);
641 }
642
643 #[rstest]
644 #[case(1, BarAggregation::Minute, 1)]
645 #[case(5, BarAggregation::Minute, 5)]
646 #[case(15, BarAggregation::Minute, 15)]
647 #[case(1, BarAggregation::Hour, 60)]
648 #[case(4, BarAggregation::Hour, 240)]
649 #[case(1, BarAggregation::Day, 1440)]
650 fn test_bar_type_to_spot_interval(
651 #[case] step: usize,
652 #[case] aggregation: BarAggregation,
653 #[case] expected: u32,
654 ) {
655 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
656 let bar_type = BarType::new(
657 instrument_id,
658 BarSpecification::new(step, aggregation, PriceType::Last),
659 AggregationSource::External,
660 );
661
662 let result = bar_type_to_spot_interval(bar_type).unwrap();
663 assert_eq!(result, expected);
664 }
665
666 #[rstest]
667 fn test_bar_type_to_spot_interval_unsupported() {
668 let instrument_id = InstrumentId::new(Symbol::new("BTC/USD"), *KRAKEN_VENUE);
669 let bar_type = BarType::new(
670 instrument_id,
671 BarSpecification::new(1, BarAggregation::Second, PriceType::Last),
672 AggregationSource::External,
673 );
674
675 let result = bar_type_to_spot_interval(bar_type);
676 assert!(result.is_err());
677 assert!(result.unwrap_err().to_string().contains("Unsupported"));
678 }
679
680 #[rstest]
681 #[case(1, BarAggregation::Minute, "1m")]
682 #[case(5, BarAggregation::Minute, "5m")]
683 #[case(15, BarAggregation::Minute, "15m")]
684 #[case(1, BarAggregation::Hour, "1h")]
685 #[case(4, BarAggregation::Hour, "4h")]
686 #[case(12, BarAggregation::Hour, "12h")]
687 #[case(1, BarAggregation::Day, "1d")]
688 #[case(1, BarAggregation::Week, "1w")]
689 fn test_bar_type_to_futures_resolution(
690 #[case] step: usize,
691 #[case] aggregation: BarAggregation,
692 #[case] expected: &str,
693 ) {
694 let instrument_id = InstrumentId::new(Symbol::new("PI_XBTUSD"), *KRAKEN_VENUE);
695 let bar_type = BarType::new(
696 instrument_id,
697 BarSpecification::new(step, aggregation, PriceType::Last),
698 AggregationSource::External,
699 );
700
701 let result = bar_type_to_futures_resolution(bar_type).unwrap();
702 assert_eq!(result, expected);
703 }
704
705 #[rstest]
706 #[case(30, BarAggregation::Minute)] #[case(2, BarAggregation::Hour)] #[case(2, BarAggregation::Day)] #[case(1, BarAggregation::Second)] fn test_bar_type_to_futures_resolution_unsupported(
711 #[case] step: usize,
712 #[case] aggregation: BarAggregation,
713 ) {
714 let instrument_id = InstrumentId::new(Symbol::new("PI_XBTUSD"), *KRAKEN_VENUE);
715 let bar_type = BarType::new(
716 instrument_id,
717 BarSpecification::new(step, aggregation, PriceType::Last),
718 AggregationSource::External,
719 );
720
721 let result = bar_type_to_futures_resolution(bar_type);
722 assert!(result.is_err());
723 assert!(result.unwrap_err().to_string().contains("Unsupported"));
724 }
725}