1use std::{
19 collections::HashMap,
20 fmt::{Debug, Formatter},
21 num::NonZeroU32,
22 sync::{
23 Arc, LazyLock,
24 atomic::{AtomicBool, Ordering},
25 },
26};
27
28use dashmap::DashMap;
29use nautilus_core::{
30 consts::NAUTILUS_USER_AGENT, nanos::UnixNanos, time::get_atomic_clock_realtime,
31};
32use nautilus_model::{
33 data::{Bar, BarType, TradeTick},
34 identifiers::InstrumentId,
35 instruments::{Instrument, InstrumentAny},
36};
37use nautilus_network::{
38 http::HttpClient,
39 ratelimiter::quota::Quota,
40 retry::{RetryConfig, RetryManager},
41};
42use reqwest::{Method, header::USER_AGENT};
43use serde::de::DeserializeOwned;
44use tokio_util::sync::CancellationToken;
45use ustr::Ustr;
46
47use super::{
48 error::KrakenHttpError,
49 models::{FuturesCandlesResponse, FuturesInstrumentsResponse, FuturesTickersResponse, *},
50};
51use crate::common::{
52 credential::KrakenCredential,
53 enums::{KrakenEnvironment, KrakenProductType},
54 parse::{
55 bar_type_to_futures_resolution, bar_type_to_spot_interval, parse_bar,
56 parse_futures_instrument, parse_spot_instrument, parse_trade_tick_from_array,
57 },
58 urls::get_http_base_url,
59};
60
61pub static KRAKEN_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
62 Quota::per_second(NonZeroU32::new(5).expect("Should be a valid non-zero u32"))
63});
64
/// Rate-limiter key attached to every request, in addition to the per-route key.
const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:global";
66
/// Response envelope deserialized by `send_request`: a list of error strings plus an
/// optional payload of type `T`.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct KrakenResponse<T> {
    /// Error strings reported by the API; `send_request` treats a non-empty list as a failure.
    pub error: Vec<String>,
    /// Endpoint-specific payload; may be absent, in which case the `get_*` callers
    /// report a parse error.
    pub result: Option<T>,
}
72
/// Low-level Kraken REST client: issues HTTP requests (optionally signed) with
/// rate limiting, retries and cancellation support.
pub struct KrakenRawHttpClient {
    /// Base URL that every endpoint path is appended to.
    base_url: String,
    /// Underlying HTTP client, constructed with the Kraken rate-limit quotas.
    client: HttpClient,
    /// API credentials used to sign authenticated requests; `None` for public-only use.
    credential: Option<KrakenCredential>,
    /// Retry policy wrapper used by the request helpers.
    retry_manager: RetryManager<KrakenHttpError>,
    /// Token passed to the retry manager; cancelled via `cancel_all_requests`.
    cancellation_token: CancellationToken,
}
80
81impl Default for KrakenRawHttpClient {
82 fn default() -> Self {
83 Self::new(None, Some(60), None, None, None, None)
84 .expect("Failed to create default KrakenRawHttpClient")
85 }
86}
87
88impl Debug for KrakenRawHttpClient {
89 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
90 f.debug_struct("KrakenRawHttpClient")
91 .field("base_url", &self.base_url)
92 .field("has_credentials", &self.credential.is_some())
93 .finish()
94 }
95}
96
97impl KrakenRawHttpClient {
98 #[allow(clippy::too_many_arguments)]
99 pub fn new(
100 base_url: Option<String>,
101 timeout_secs: Option<u64>,
102 max_retries: Option<u32>,
103 retry_delay_ms: Option<u64>,
104 retry_delay_max_ms: Option<u64>,
105 proxy_url: Option<String>,
106 ) -> anyhow::Result<Self> {
107 let retry_config = RetryConfig {
108 max_retries: max_retries.unwrap_or(3),
109 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
110 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
111 backoff_factor: 2.0,
112 jitter_ms: 1000,
113 operation_timeout_ms: Some(60_000),
114 immediate_first: false,
115 max_elapsed_ms: Some(180_000),
116 };
117
118 let retry_manager = RetryManager::new(retry_config);
119
120 Ok(Self {
121 base_url: base_url.unwrap_or_else(|| {
122 get_http_base_url(KrakenProductType::Spot, KrakenEnvironment::Mainnet).to_string()
123 }),
124 client: HttpClient::new(
125 Self::default_headers(),
126 vec![],
127 Self::rate_limiter_quotas(),
128 Some(*KRAKEN_REST_QUOTA),
129 timeout_secs,
130 proxy_url,
131 )
132 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
133 credential: None,
134 retry_manager,
135 cancellation_token: CancellationToken::new(),
136 })
137 }
138
139 #[allow(clippy::too_many_arguments)]
140 pub fn with_credentials(
141 api_key: String,
142 api_secret: String,
143 base_url: Option<String>,
144 timeout_secs: Option<u64>,
145 max_retries: Option<u32>,
146 retry_delay_ms: Option<u64>,
147 retry_delay_max_ms: Option<u64>,
148 proxy_url: Option<String>,
149 ) -> anyhow::Result<Self> {
150 let retry_config = RetryConfig {
151 max_retries: max_retries.unwrap_or(3),
152 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
153 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
154 backoff_factor: 2.0,
155 jitter_ms: 1000,
156 operation_timeout_ms: Some(60_000),
157 immediate_first: false,
158 max_elapsed_ms: Some(180_000),
159 };
160
161 let retry_manager = RetryManager::new(retry_config);
162
163 Ok(Self {
164 base_url: base_url.unwrap_or_else(|| {
165 get_http_base_url(KrakenProductType::Spot, KrakenEnvironment::Mainnet).to_string()
166 }),
167 client: HttpClient::new(
168 Self::default_headers(),
169 vec![],
170 Self::rate_limiter_quotas(),
171 Some(*KRAKEN_REST_QUOTA),
172 timeout_secs,
173 proxy_url,
174 )
175 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
176 credential: Some(KrakenCredential::new(api_key, api_secret)),
177 retry_manager,
178 cancellation_token: CancellationToken::new(),
179 })
180 }
181
/// Returns the base URL that endpoint paths are appended to.
pub fn base_url(&self) -> &str {
    &self.base_url
}
185
/// Returns the configured API credential, or `None` when the client is unauthenticated.
pub fn credential(&self) -> Option<&KrakenCredential> {
    self.credential.as_ref()
}
189
/// Cancels the client's cancellation token, which is the token handed to the retry
/// manager for every request.
pub fn cancel_all_requests(&self) {
    self.cancellation_token.cancel();
}
193
/// Returns a reference to the cancellation token used for this client's requests.
pub fn cancellation_token(&self) -> &CancellationToken {
    &self.cancellation_token
}
197
198 fn default_headers() -> HashMap<String, String> {
199 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
200 }
201
202 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
203 vec![(KRAKEN_GLOBAL_RATE_KEY.to_string(), *KRAKEN_REST_QUOTA)]
204 }
205
206 fn rate_limit_keys(endpoint: &str) -> Vec<String> {
207 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
208 let route = format!("kraken:{normalized}");
209 vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
210 }
211
212 fn sign_request(
213 &self,
214 path: &str,
215 nonce: u64,
216 params: &HashMap<String, String>,
217 ) -> anyhow::Result<(HashMap<String, String>, String)> {
218 let credential = self
219 .credential
220 .as_ref()
221 .ok_or_else(|| anyhow::anyhow!("Missing credentials"))?;
222
223 let (signature, post_data) = credential.sign_request(path, nonce, params)?;
224
225 let mut headers = HashMap::new();
226 headers.insert("API-Key".to_string(), credential.api_key().to_string());
227 headers.insert("API-Sign".to_string(), signature);
228
229 Ok((headers, post_data))
230 }
231
232 async fn send_request<T: DeserializeOwned>(
233 &self,
234 method: Method,
235 endpoint: &str,
236 body: Option<Vec<u8>>,
237 authenticate: bool,
238 ) -> anyhow::Result<KrakenResponse<T>, KrakenHttpError> {
239 let endpoint = endpoint.to_string();
240 let url = format!("{}{endpoint}", self.base_url);
241 let method_clone = method.clone();
242 let body_clone = body.clone();
243
244 let operation = || {
245 let url = url.clone();
246 let method = method_clone.clone();
247 let body = body_clone.clone();
248 let endpoint = endpoint.clone();
249
250 async move {
251 let mut headers = Self::default_headers();
252
253 let final_body = if authenticate {
254 let nonce = std::time::SystemTime::now()
255 .duration_since(std::time::UNIX_EPOCH)
256 .expect("Time went backwards")
257 .as_millis() as u64;
258
259 let params: HashMap<String, String> = if let Some(ref body_bytes) = body {
260 let body_str = std::str::from_utf8(body_bytes).map_err(|e| {
261 KrakenHttpError::ParseError(format!(
262 "Invalid UTF-8 in request body: {e}"
263 ))
264 })?;
265 serde_urlencoded::from_str(body_str).map_err(|e| {
266 KrakenHttpError::ParseError(format!(
267 "Failed to parse request params: {e}"
268 ))
269 })?
270 } else {
271 HashMap::new()
272 };
273
274 let (auth_headers, post_data) = self
275 .sign_request(&endpoint, nonce, ¶ms)
276 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
277 headers.extend(auth_headers);
278
279 Some(post_data.into_bytes())
281 } else {
282 body
283 };
284
285 if method == Method::POST {
286 headers.insert(
287 "Content-Type".to_string(),
288 "application/x-www-form-urlencoded".to_string(),
289 );
290 }
291
292 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
293
294 let response = self
295 .client
296 .request(
297 method,
298 url,
299 None,
300 Some(headers),
301 final_body,
302 None,
303 Some(rate_limit_keys),
304 )
305 .await
306 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
307
308 if response.status.as_u16() >= 400 {
309 let body = String::from_utf8_lossy(&response.body).to_string();
310 return Err(KrakenHttpError::NetworkError(format!(
311 "HTTP error {}: {body}",
312 response.status.as_u16()
313 )));
314 }
315
316 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
317 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
318 })?;
319
320 let kraken_response: KrakenResponse<T> = serde_json::from_str(&response_text)
321 .map_err(|e| {
322 KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
323 })?;
324
325 if !kraken_response.error.is_empty() {
326 return Err(KrakenHttpError::ApiError(kraken_response.error.clone()));
327 }
328
329 Ok(kraken_response)
330 }
331 };
332
333 let should_retry =
334 |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
335
336 let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
337
338 self.retry_manager
339 .execute_with_retry_with_cancel(
340 &endpoint,
341 operation,
342 should_retry,
343 create_error,
344 &self.cancellation_token,
345 )
346 .await
347 }
348
349 async fn send_futures_request<T: DeserializeOwned>(
350 &self,
351 method: Method,
352 endpoint: &str,
353 url: String,
354 ) -> anyhow::Result<T, KrakenHttpError> {
355 let endpoint = endpoint.to_string();
356 let method_clone = method.clone();
357 let url_clone = url.clone();
358
359 let operation = || {
360 let url = url_clone.clone();
361 let method = method_clone.clone();
362 let endpoint = endpoint.clone();
363
364 async move {
365 let headers = Self::default_headers();
366 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
367
368 let response = self
369 .client
370 .request(
371 method,
372 url,
373 None,
374 Some(headers),
375 None,
376 None,
377 Some(rate_limit_keys),
378 )
379 .await
380 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
381
382 if response.status.as_u16() >= 400 {
383 let body = String::from_utf8_lossy(&response.body).to_string();
384 return Err(KrakenHttpError::NetworkError(format!(
385 "HTTP error {}: {body}",
386 response.status.as_u16()
387 )));
388 }
389
390 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
391 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
392 })?;
393
394 serde_json::from_str(&response_text).map_err(|e| {
395 KrakenHttpError::ParseError(format!(
396 "Failed to deserialize futures response: {e}"
397 ))
398 })
399 }
400 };
401
402 let should_retry =
403 |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
404
405 let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
406
407 self.retry_manager
408 .execute_with_retry_with_cancel(
409 &endpoint,
410 operation,
411 should_retry,
412 create_error,
413 &self.cancellation_token,
414 )
415 .await
416 }
417
/// Returns the current timestamp read from the atomic realtime clock
/// (`get_time_ns`), used as `ts_init` when parsing responses.
fn generate_ts_init(&self) -> UnixNanos {
    get_atomic_clock_realtime().get_time_ns()
}
421
422 pub async fn get_server_time(&self) -> anyhow::Result<ServerTime, KrakenHttpError> {
423 let response: KrakenResponse<ServerTime> = self
424 .send_request(Method::GET, "/0/public/Time", None, false)
425 .await?;
426
427 response.result.ok_or_else(|| {
428 KrakenHttpError::ParseError("Missing result in server time response".to_string())
429 })
430 }
431
432 pub async fn get_system_status(&self) -> anyhow::Result<SystemStatus, KrakenHttpError> {
433 let response: KrakenResponse<SystemStatus> = self
434 .send_request(Method::GET, "/0/public/SystemStatus", None, false)
435 .await?;
436
437 response.result.ok_or_else(|| {
438 KrakenHttpError::ParseError("Missing result in system status response".to_string())
439 })
440 }
441
442 pub async fn get_asset_pairs(
443 &self,
444 pairs: Option<Vec<String>>,
445 ) -> anyhow::Result<AssetPairsResponse, KrakenHttpError> {
446 let endpoint = if let Some(pairs) = pairs {
447 format!("/0/public/AssetPairs?pair={}", pairs.join(","))
448 } else {
449 "/0/public/AssetPairs".to_string()
450 };
451
452 let response: KrakenResponse<AssetPairsResponse> = self
453 .send_request(Method::GET, &endpoint, None, false)
454 .await?;
455
456 response.result.ok_or_else(|| {
457 KrakenHttpError::ParseError("Missing result in asset pairs response".to_string())
458 })
459 }
460
461 pub async fn get_ticker(
462 &self,
463 pairs: Vec<String>,
464 ) -> anyhow::Result<TickerResponse, KrakenHttpError> {
465 let endpoint = format!("/0/public/Ticker?pair={}", pairs.join(","));
466
467 let response: KrakenResponse<TickerResponse> = self
468 .send_request(Method::GET, &endpoint, None, false)
469 .await?;
470
471 response.result.ok_or_else(|| {
472 KrakenHttpError::ParseError("Missing result in ticker response".to_string())
473 })
474 }
475
476 pub async fn get_ohlc(
477 &self,
478 pair: &str,
479 interval: Option<u32>,
480 since: Option<i64>,
481 ) -> anyhow::Result<OhlcResponse, KrakenHttpError> {
482 let mut endpoint = format!("/0/public/OHLC?pair={pair}");
483
484 if let Some(interval) = interval {
485 endpoint.push_str(&format!("&interval={interval}"));
486 }
487 if let Some(since) = since {
488 endpoint.push_str(&format!("&since={since}"));
489 }
490
491 let response: KrakenResponse<OhlcResponse> = self
492 .send_request(Method::GET, &endpoint, None, false)
493 .await?;
494
495 response.result.ok_or_else(|| {
496 KrakenHttpError::ParseError("Missing result in OHLC response".to_string())
497 })
498 }
499
500 pub async fn get_book_depth(
501 &self,
502 pair: &str,
503 count: Option<u32>,
504 ) -> anyhow::Result<OrderBookResponse, KrakenHttpError> {
505 let mut endpoint = format!("/0/public/Depth?pair={pair}");
506
507 if let Some(count) = count {
508 endpoint.push_str(&format!("&count={count}"));
509 }
510
511 let response: KrakenResponse<OrderBookResponse> = self
512 .send_request(Method::GET, &endpoint, None, false)
513 .await?;
514
515 response.result.ok_or_else(|| {
516 KrakenHttpError::ParseError("Missing result in book depth response".to_string())
517 })
518 }
519
520 pub async fn get_trades(
521 &self,
522 pair: &str,
523 since: Option<String>,
524 ) -> anyhow::Result<TradesResponse, KrakenHttpError> {
525 let mut endpoint = format!("/0/public/Trades?pair={pair}");
526
527 if let Some(since) = since {
528 endpoint.push_str(&format!("&since={since}"));
529 }
530
531 let response: KrakenResponse<TradesResponse> = self
532 .send_request(Method::GET, &endpoint, None, false)
533 .await?;
534
535 response.result.ok_or_else(|| {
536 KrakenHttpError::ParseError("Missing result in trades response".to_string())
537 })
538 }
539
540 pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
541 if self.credential.is_none() {
542 return Err(KrakenHttpError::AuthenticationError(
543 "API credentials required for GetWebSocketsToken".to_string(),
544 ));
545 }
546
547 let response: KrakenResponse<WebSocketToken> = self
548 .send_request(Method::POST, "/0/private/GetWebSocketsToken", None, true)
549 .await?;
550
551 response.result.ok_or_else(|| {
552 KrakenHttpError::ParseError("Missing result in websockets token response".to_string())
553 })
554 }
555
556 pub async fn get_instruments_futures(
557 &self,
558 ) -> anyhow::Result<FuturesInstrumentsResponse, KrakenHttpError> {
559 let endpoint = "/derivatives/api/v3/instruments";
560 let url = format!("{}{endpoint}", self.base_url);
561
562 self.send_futures_request(Method::GET, endpoint, url).await
563 }
564
565 pub async fn get_tickers_futures(
566 &self,
567 ) -> anyhow::Result<FuturesTickersResponse, KrakenHttpError> {
568 let endpoint = "/derivatives/api/v3/tickers";
569 let url = format!("{}{endpoint}", self.base_url);
570
571 self.send_futures_request(Method::GET, endpoint, url).await
572 }
573
574 pub async fn get_ohlc_futures(
575 &self,
576 tick_type: &str,
577 symbol: &str,
578 resolution: &str,
579 from: Option<i64>,
580 to: Option<i64>,
581 ) -> anyhow::Result<FuturesCandlesResponse, KrakenHttpError> {
582 let endpoint = format!("/api/charts/v1/{tick_type}/{symbol}/{resolution}");
583
584 let mut url = format!("{}{endpoint}", self.base_url);
585
586 let mut query_params = Vec::new();
587 if let Some(from_ts) = from {
588 query_params.push(format!("from={from_ts}"));
589 }
590 if let Some(to_ts) = to {
591 query_params.push(format!("to={to_ts}"));
592 }
593
594 if !query_params.is_empty() {
595 url.push('?');
596 url.push_str(&query_params.join("&"));
597 }
598
599 self.send_futures_request(Method::GET, &endpoint, url).await
600 }
601}
602
/// Higher-level Kraken HTTP client: wraps a shared [`KrakenRawHttpClient`] and an
/// instrument cache used to turn raw responses into domain objects.
///
/// Exposed as a Python class when the `python` feature is enabled.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct KrakenHttpClient {
    /// Shared low-level client that performs the actual HTTP requests.
    pub(crate) inner: Arc<KrakenRawHttpClient>,
    /// Instruments keyed by the inner value of their symbol.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set to `true` once at least one instrument has been cached.
    cache_initialized: Arc<AtomicBool>,
}
612
613impl Clone for KrakenHttpClient {
614 fn clone(&self) -> Self {
615 Self {
616 inner: self.inner.clone(),
617 instruments_cache: self.instruments_cache.clone(),
618 cache_initialized: self.cache_initialized.clone(),
619 }
620 }
621}
622
623impl Default for KrakenHttpClient {
624 fn default() -> Self {
625 Self::new(None, Some(60), None, None, None, None)
626 .expect("Failed to create default KrakenHttpClient")
627 }
628}
629
630impl Debug for KrakenHttpClient {
631 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
632 f.debug_struct("KrakenHttpClient")
633 .field("inner", &self.inner)
634 .finish()
635 }
636}
637
638impl KrakenHttpClient {
639 #[allow(clippy::too_many_arguments)]
640 pub fn new(
641 base_url: Option<String>,
642 timeout_secs: Option<u64>,
643 max_retries: Option<u32>,
644 retry_delay_ms: Option<u64>,
645 retry_delay_max_ms: Option<u64>,
646 proxy_url: Option<String>,
647 ) -> anyhow::Result<Self> {
648 Ok(Self {
649 inner: Arc::new(KrakenRawHttpClient::new(
650 base_url,
651 timeout_secs,
652 max_retries,
653 retry_delay_ms,
654 retry_delay_max_ms,
655 proxy_url,
656 )?),
657 instruments_cache: Arc::new(DashMap::new()),
658 cache_initialized: Arc::new(AtomicBool::new(false)),
659 })
660 }
661
662 #[allow(clippy::too_many_arguments)]
663 pub fn with_credentials(
664 api_key: String,
665 api_secret: String,
666 base_url: Option<String>,
667 timeout_secs: Option<u64>,
668 max_retries: Option<u32>,
669 retry_delay_ms: Option<u64>,
670 retry_delay_max_ms: Option<u64>,
671 proxy_url: Option<String>,
672 ) -> anyhow::Result<Self> {
673 Ok(Self {
674 inner: Arc::new(KrakenRawHttpClient::with_credentials(
675 api_key,
676 api_secret,
677 base_url,
678 timeout_secs,
679 max_retries,
680 retry_delay_ms,
681 retry_delay_max_ms,
682 proxy_url,
683 )?),
684 instruments_cache: Arc::new(DashMap::new()),
685 cache_initialized: Arc::new(AtomicBool::new(false)),
686 })
687 }
688
/// Returns `true` when the base URL contains the substring `"futures"`; this is
/// the only signal used to choose between the spot and futures code paths.
fn is_futures(&self) -> bool {
    self.inner.base_url().contains("futures")
}
692
/// Cancels requests by delegating to the inner raw client.
pub fn cancel_all_requests(&self) {
    self.inner.cancel_all_requests();
}
696
/// Returns the inner raw client's cancellation token.
pub fn cancellation_token(&self) -> &CancellationToken {
    self.inner.cancellation_token()
}
700
701 pub fn cache_instrument(&self, instrument: InstrumentAny) {
702 self.instruments_cache
703 .insert(instrument.symbol().inner(), instrument);
704 self.cache_initialized.store(true, Ordering::Release);
705 }
706
707 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
708 for instrument in instruments {
709 self.instruments_cache
710 .insert(instrument.symbol().inner(), instrument);
711 }
712 self.cache_initialized.store(true, Ordering::Release);
713 }
714
715 pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
716 self.instruments_cache
717 .get(symbol)
718 .map(|entry| entry.value().clone())
719 }
720
/// Requests a WebSocket authentication token by delegating to the inner raw client.
///
/// # Errors
///
/// Propagates whatever error the raw client returns.
pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
    self.inner.get_websockets_token().await
}
724
725 pub async fn request_instruments(
726 &self,
727 pairs: Option<Vec<String>>,
728 ) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
729 let ts_init = self.inner.generate_ts_init();
730
731 if self.is_futures() {
732 let response = self.inner.get_instruments_futures().await?;
733
734 let instruments: Vec<InstrumentAny> = response
735 .instruments
736 .iter()
737 .filter_map(|fut_instrument| {
738 match parse_futures_instrument(fut_instrument, ts_init, ts_init) {
739 Ok(instrument) => Some(instrument),
740 Err(e) => {
741 tracing::warn!(
742 "Failed to parse futures instrument {}: {e}",
743 fut_instrument.symbol
744 );
745 None
746 }
747 }
748 })
749 .collect();
750
751 return Ok(instruments);
752 }
753
754 let asset_pairs = self.inner.get_asset_pairs(pairs).await?;
755
756 let instruments: Vec<InstrumentAny> = asset_pairs
757 .iter()
758 .filter_map(|(pair_name, definition)| {
759 match parse_spot_instrument(pair_name, definition, ts_init, ts_init) {
760 Ok(instrument) => Some(instrument),
761 Err(e) => {
762 tracing::warn!("Failed to parse instrument {pair_name}: {e}");
763 None
764 }
765 }
766 })
767 .collect();
768
769 Ok(instruments)
770 }
771
772 pub async fn request_mark_price(
773 &self,
774 instrument_id: InstrumentId,
775 ) -> anyhow::Result<f64, KrakenHttpError> {
776 if !self.is_futures() {
777 return Err(KrakenHttpError::ParseError(
778 "Mark price is only available for futures instruments. Use a futures client (base URL must contain 'futures')".to_string(),
779 ));
780 }
781
782 let instrument = self
783 .get_cached_instrument(&instrument_id.symbol.inner())
784 .ok_or_else(|| {
785 KrakenHttpError::ParseError(format!(
786 "Instrument not found in cache: {}",
787 instrument_id
788 ))
789 })?;
790
791 let raw_symbol = instrument.raw_symbol().to_string();
792 let tickers = self.inner.get_tickers_futures().await?;
793
794 tickers
795 .tickers
796 .iter()
797 .find(|t| t.symbol == raw_symbol)
798 .map(|t| t.mark_price)
799 .ok_or_else(|| {
800 KrakenHttpError::ParseError(format!("Symbol {} not found in tickers", raw_symbol))
801 })
802 }
803
804 pub async fn request_index_price(
805 &self,
806 instrument_id: InstrumentId,
807 ) -> anyhow::Result<f64, KrakenHttpError> {
808 if !self.is_futures() {
809 return Err(KrakenHttpError::ParseError(
810 "Index price is only available for futures instruments. Use a futures client (base URL must contain 'futures')".to_string(),
811 ));
812 }
813
814 let instrument = self
815 .get_cached_instrument(&instrument_id.symbol.inner())
816 .ok_or_else(|| {
817 KrakenHttpError::ParseError(format!(
818 "Instrument not found in cache: {}",
819 instrument_id
820 ))
821 })?;
822
823 let raw_symbol = instrument.raw_symbol().to_string();
824 let tickers = self.inner.get_tickers_futures().await?;
825
826 tickers
827 .tickers
828 .iter()
829 .find(|t| t.symbol == raw_symbol)
830 .map(|t| t.index_price)
831 .ok_or_else(|| {
832 KrakenHttpError::ParseError(format!("Symbol {} not found in tickers", raw_symbol))
833 })
834 }
835
836 pub async fn request_trades(
837 &self,
838 instrument_id: InstrumentId,
839 start: Option<u64>,
840 end: Option<u64>,
841 limit: Option<u64>,
842 ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
843 if self.is_futures() {
844 return Err(KrakenHttpError::ParseError(
845 "Trade history is not yet implemented for futures instruments. Use a spot client instead.".to_string(),
846 ));
847 }
848
849 let instrument = self
850 .get_cached_instrument(&instrument_id.symbol.inner())
851 .ok_or_else(|| {
852 KrakenHttpError::ParseError(format!(
853 "Instrument not found in cache: {}",
854 instrument_id
855 ))
856 })?;
857
858 let raw_symbol = instrument.raw_symbol().to_string();
859 let since = start.map(|s| s.to_string());
860
861 let ts_init = self.inner.generate_ts_init();
862 let response = self.inner.get_trades(&raw_symbol, since).await?;
863
864 let mut trades = Vec::new();
865
866 for (_pair_name, trade_arrays) in &response.data {
868 for trade_array in trade_arrays {
869 match parse_trade_tick_from_array(trade_array, &instrument, ts_init) {
870 Ok(trade_tick) => {
871 if let Some(end_ns) = end
873 && trade_tick.ts_event.as_u64() > end_ns
874 {
875 continue;
876 }
877 trades.push(trade_tick);
878
879 if let Some(limit_count) = limit
881 && trades.len() >= limit_count as usize
882 {
883 return Ok(trades);
884 }
885 }
886 Err(e) => {
887 tracing::warn!("Failed to parse trade tick: {e}");
888 }
889 }
890 }
891 }
892
893 Ok(trades)
894 }
895
/// Requests bars for `bar_type`, delegating to `request_bars_with_tick_type` with
/// no explicit tick type.
///
/// # Errors
///
/// Propagates whatever error the delegated call returns.
pub async fn request_bars(
    &self,
    bar_type: BarType,
    start: Option<u64>,
    end: Option<u64>,
    limit: Option<u64>,
) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
    self.request_bars_with_tick_type(bar_type, start, end, limit, None)
        .await
}
906
907 pub async fn request_bars_with_tick_type(
908 &self,
909 bar_type: BarType,
910 start: Option<u64>,
911 end: Option<u64>,
912 limit: Option<u64>,
913 tick_type: Option<&str>,
914 ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
915 let instrument_id = bar_type.instrument_id();
916 let instrument = self
917 .get_cached_instrument(&instrument_id.symbol.inner())
918 .ok_or_else(|| {
919 KrakenHttpError::ParseError(format!(
920 "Instrument not found in cache: {}",
921 instrument_id
922 ))
923 })?;
924
925 let raw_symbol = instrument.raw_symbol().to_string();
926 let ts_init = self.inner.generate_ts_init();
927
928 if self.is_futures() {
929 let tick_type = tick_type.unwrap_or("trade");
930 let resolution = bar_type_to_futures_resolution(bar_type)
931 .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?;
932
933 let from = start.map(|s| (s / 1_000_000) as i64);
935 let to = end.map(|e| (e / 1_000_000) as i64);
936
937 let response = self
938 .inner
939 .get_ohlc_futures(tick_type, &raw_symbol, resolution, from, to)
940 .await?;
941
942 let mut bars = Vec::new();
943 for candle in response.candles {
944 let ohlc = OhlcData {
945 time: candle.time / 1000,
946 open: candle.open,
947 high: candle.high,
948 low: candle.low,
949 close: candle.close,
950 vwap: "0".to_string(),
951 volume: candle.volume,
952 count: 0,
953 };
954
955 match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
956 Ok(bar) => {
957 if let Some(end_ns) = end
959 && bar.ts_event.as_u64() > end_ns
960 {
961 continue;
962 }
963 bars.push(bar);
964
965 if let Some(limit_count) = limit
967 && bars.len() >= limit_count as usize
968 {
969 return Ok(bars);
970 }
971 }
972 Err(e) => {
973 tracing::warn!("Failed to parse futures bar: {e}");
974 }
975 }
976 }
977
978 return Ok(bars);
979 }
980
981 let interval = Some(
982 bar_type_to_spot_interval(bar_type)
983 .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?,
984 );
985
986 let since = start.map(|s| (s / 1_000_000_000) as i64);
988
989 let response = self.inner.get_ohlc(&raw_symbol, interval, since).await?;
990
991 let mut bars = Vec::new();
992
993 for (_pair_name, ohlc_arrays) in &response.data {
995 for ohlc_array in ohlc_arrays {
996 if ohlc_array.len() < 8 {
998 tracing::warn!("OHLC array too short: {}", ohlc_array.len());
999 continue;
1000 }
1001
1002 let ohlc = OhlcData {
1003 time: ohlc_array[0].as_i64().unwrap_or(0),
1004 open: ohlc_array[1].as_str().unwrap_or("0").to_string(),
1005 high: ohlc_array[2].as_str().unwrap_or("0").to_string(),
1006 low: ohlc_array[3].as_str().unwrap_or("0").to_string(),
1007 close: ohlc_array[4].as_str().unwrap_or("0").to_string(),
1008 vwap: ohlc_array[5].as_str().unwrap_or("0").to_string(),
1009 volume: ohlc_array[6].as_str().unwrap_or("0").to_string(),
1010 count: ohlc_array[7].as_i64().unwrap_or(0),
1011 };
1012
1013 match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1014 Ok(bar) => {
1015 if let Some(end_ns) = end
1017 && bar.ts_event.as_u64() > end_ns
1018 {
1019 continue;
1020 }
1021 bars.push(bar);
1022
1023 if let Some(limit_count) = limit
1025 && bars.len() >= limit_count as usize
1026 {
1027 return Ok(bars);
1028 }
1029 }
1030 Err(e) => {
1031 tracing::warn!("Failed to parse bar: {e}");
1032 }
1033 }
1034 }
1035 }
1036
1037 Ok(bars)
1038 }
1039}
1040
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// A default raw client carries no credential.
    #[rstest]
    fn test_raw_client_creation() {
        let raw = KrakenRawHttpClient::default();
        assert!(raw.credential.is_none());
    }

    /// A raw client built with an API key and secret stores a credential.
    #[rstest]
    fn test_raw_client_with_credentials() {
        let api_key = "test_key".to_string();
        let api_secret = "test_secret".to_string();

        let raw =
            KrakenRawHttpClient::with_credentials(api_key, api_secret, None, None, None, None, None, None)
                .unwrap();

        assert!(raw.credential.is_some());
    }

    /// A default high-level client starts with an empty instrument cache.
    #[rstest]
    fn test_client_creation() {
        let http = KrakenHttpClient::default();
        assert!(http.instruments_cache.is_empty());
    }

    /// A credentialed high-level client also starts with an empty instrument cache.
    #[rstest]
    fn test_client_with_credentials() {
        let api_key = "test_key".to_string();
        let api_secret = "test_secret".to_string();

        let http =
            KrakenHttpClient::with_credentials(api_key, api_secret, None, None, None, None, None, None)
                .unwrap();

        assert!(http.instruments_cache.is_empty());
    }
}