nautilus_kraken/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! HTTP client for the Kraken REST API v2.
17
18use std::{
19    collections::HashMap,
20    fmt::{Debug, Formatter},
21    num::NonZeroU32,
22    sync::{
23        Arc, LazyLock,
24        atomic::{AtomicBool, Ordering},
25    },
26};
27
28use dashmap::DashMap;
29use nautilus_core::{
30    consts::NAUTILUS_USER_AGENT, nanos::UnixNanos, time::get_atomic_clock_realtime,
31};
32use nautilus_model::{
33    data::{Bar, BarType, TradeTick},
34    identifiers::InstrumentId,
35    instruments::{Instrument, InstrumentAny},
36};
37use nautilus_network::{
38    http::HttpClient,
39    ratelimiter::quota::Quota,
40    retry::{RetryConfig, RetryManager},
41};
42use reqwest::{Method, header::USER_AGENT};
43use serde::de::DeserializeOwned;
44use tokio_util::sync::CancellationToken;
45use ustr::Ustr;
46
47use super::{
48    error::KrakenHttpError,
49    models::{FuturesCandlesResponse, FuturesInstrumentsResponse, FuturesTickersResponse, *},
50};
51use crate::common::{
52    credential::KrakenCredential,
53    enums::{KrakenEnvironment, KrakenProductType},
54    parse::{
55        bar_type_to_futures_resolution, bar_type_to_spot_interval, parse_bar,
56        parse_futures_instrument, parse_spot_instrument, parse_trade_tick_from_array,
57    },
58    urls::get_http_base_url,
59};
60
61pub static KRAKEN_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
62    Quota::per_second(NonZeroU32::new(5).expect("Should be a valid non-zero u32"))
63});
64
65const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:global";
66
67#[derive(Debug, Clone, serde::Deserialize)]
68pub struct KrakenResponse<T> {
69    pub error: Vec<String>,
70    pub result: Option<T>,
71}
72
73pub struct KrakenRawHttpClient {
74    base_url: String,
75    client: HttpClient,
76    credential: Option<KrakenCredential>,
77    retry_manager: RetryManager<KrakenHttpError>,
78    cancellation_token: CancellationToken,
79}
80
81impl Default for KrakenRawHttpClient {
82    fn default() -> Self {
83        Self::new(None, Some(60), None, None, None, None)
84            .expect("Failed to create default KrakenRawHttpClient")
85    }
86}
87
88impl Debug for KrakenRawHttpClient {
89    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
90        f.debug_struct("KrakenRawHttpClient")
91            .field("base_url", &self.base_url)
92            .field("has_credentials", &self.credential.is_some())
93            .finish()
94    }
95}
96
97impl KrakenRawHttpClient {
98    #[allow(clippy::too_many_arguments)]
99    pub fn new(
100        base_url: Option<String>,
101        timeout_secs: Option<u64>,
102        max_retries: Option<u32>,
103        retry_delay_ms: Option<u64>,
104        retry_delay_max_ms: Option<u64>,
105        proxy_url: Option<String>,
106    ) -> anyhow::Result<Self> {
107        let retry_config = RetryConfig {
108            max_retries: max_retries.unwrap_or(3),
109            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
110            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
111            backoff_factor: 2.0,
112            jitter_ms: 1000,
113            operation_timeout_ms: Some(60_000),
114            immediate_first: false,
115            max_elapsed_ms: Some(180_000),
116        };
117
118        let retry_manager = RetryManager::new(retry_config);
119
120        Ok(Self {
121            base_url: base_url.unwrap_or_else(|| {
122                get_http_base_url(KrakenProductType::Spot, KrakenEnvironment::Mainnet).to_string()
123            }),
124            client: HttpClient::new(
125                Self::default_headers(),
126                vec![],
127                Self::rate_limiter_quotas(),
128                Some(*KRAKEN_REST_QUOTA),
129                timeout_secs,
130                proxy_url,
131            )
132            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
133            credential: None,
134            retry_manager,
135            cancellation_token: CancellationToken::new(),
136        })
137    }
138
139    #[allow(clippy::too_many_arguments)]
140    pub fn with_credentials(
141        api_key: String,
142        api_secret: String,
143        base_url: Option<String>,
144        timeout_secs: Option<u64>,
145        max_retries: Option<u32>,
146        retry_delay_ms: Option<u64>,
147        retry_delay_max_ms: Option<u64>,
148        proxy_url: Option<String>,
149    ) -> anyhow::Result<Self> {
150        let retry_config = RetryConfig {
151            max_retries: max_retries.unwrap_or(3),
152            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
153            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
154            backoff_factor: 2.0,
155            jitter_ms: 1000,
156            operation_timeout_ms: Some(60_000),
157            immediate_first: false,
158            max_elapsed_ms: Some(180_000),
159        };
160
161        let retry_manager = RetryManager::new(retry_config);
162
163        Ok(Self {
164            base_url: base_url.unwrap_or_else(|| {
165                get_http_base_url(KrakenProductType::Spot, KrakenEnvironment::Mainnet).to_string()
166            }),
167            client: HttpClient::new(
168                Self::default_headers(),
169                vec![],
170                Self::rate_limiter_quotas(),
171                Some(*KRAKEN_REST_QUOTA),
172                timeout_secs,
173                proxy_url,
174            )
175            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
176            credential: Some(KrakenCredential::new(api_key, api_secret)),
177            retry_manager,
178            cancellation_token: CancellationToken::new(),
179        })
180    }
181
182    pub fn base_url(&self) -> &str {
183        &self.base_url
184    }
185
186    pub fn credential(&self) -> Option<&KrakenCredential> {
187        self.credential.as_ref()
188    }
189
190    pub fn cancel_all_requests(&self) {
191        self.cancellation_token.cancel();
192    }
193
194    pub fn cancellation_token(&self) -> &CancellationToken {
195        &self.cancellation_token
196    }
197
198    fn default_headers() -> HashMap<String, String> {
199        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
200    }
201
202    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
203        vec![(KRAKEN_GLOBAL_RATE_KEY.to_string(), *KRAKEN_REST_QUOTA)]
204    }
205
206    fn rate_limit_keys(endpoint: &str) -> Vec<String> {
207        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
208        let route = format!("kraken:{normalized}");
209        vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
210    }
211
212    fn sign_request(
213        &self,
214        path: &str,
215        nonce: u64,
216        params: &HashMap<String, String>,
217    ) -> anyhow::Result<(HashMap<String, String>, String)> {
218        let credential = self
219            .credential
220            .as_ref()
221            .ok_or_else(|| anyhow::anyhow!("Missing credentials"))?;
222
223        let (signature, post_data) = credential.sign_request(path, nonce, params)?;
224
225        let mut headers = HashMap::new();
226        headers.insert("API-Key".to_string(), credential.api_key().to_string());
227        headers.insert("API-Sign".to_string(), signature);
228
229        Ok((headers, post_data))
230    }
231
232    async fn send_request<T: DeserializeOwned>(
233        &self,
234        method: Method,
235        endpoint: &str,
236        body: Option<Vec<u8>>,
237        authenticate: bool,
238    ) -> anyhow::Result<KrakenResponse<T>, KrakenHttpError> {
239        let endpoint = endpoint.to_string();
240        let url = format!("{}{endpoint}", self.base_url);
241        let method_clone = method.clone();
242        let body_clone = body.clone();
243
244        let operation = || {
245            let url = url.clone();
246            let method = method_clone.clone();
247            let body = body_clone.clone();
248            let endpoint = endpoint.clone();
249
250            async move {
251                let mut headers = Self::default_headers();
252
253                let final_body = if authenticate {
254                    let nonce = std::time::SystemTime::now()
255                        .duration_since(std::time::UNIX_EPOCH)
256                        .expect("Time went backwards")
257                        .as_millis() as u64;
258
259                    let params: HashMap<String, String> = if let Some(ref body_bytes) = body {
260                        let body_str = std::str::from_utf8(body_bytes).map_err(|e| {
261                            KrakenHttpError::ParseError(format!(
262                                "Invalid UTF-8 in request body: {e}"
263                            ))
264                        })?;
265                        serde_urlencoded::from_str(body_str).map_err(|e| {
266                            KrakenHttpError::ParseError(format!(
267                                "Failed to parse request params: {e}"
268                            ))
269                        })?
270                    } else {
271                        HashMap::new()
272                    };
273
274                    let (auth_headers, post_data) = self
275                        .sign_request(&endpoint, nonce, &params)
276                        .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
277                    headers.extend(auth_headers);
278
279                    // Use the exact post_data that was signed
280                    Some(post_data.into_bytes())
281                } else {
282                    body
283                };
284
285                if method == Method::POST {
286                    headers.insert(
287                        "Content-Type".to_string(),
288                        "application/x-www-form-urlencoded".to_string(),
289                    );
290                }
291
292                let rate_limit_keys = Self::rate_limit_keys(&endpoint);
293
294                let response = self
295                    .client
296                    .request(
297                        method,
298                        url,
299                        None,
300                        Some(headers),
301                        final_body,
302                        None,
303                        Some(rate_limit_keys),
304                    )
305                    .await
306                    .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
307
308                if response.status.as_u16() >= 400 {
309                    let body = String::from_utf8_lossy(&response.body).to_string();
310                    return Err(KrakenHttpError::NetworkError(format!(
311                        "HTTP error {}: {body}",
312                        response.status.as_u16()
313                    )));
314                }
315
316                let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
317                    KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
318                })?;
319
320                let kraken_response: KrakenResponse<T> = serde_json::from_str(&response_text)
321                    .map_err(|e| {
322                        KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
323                    })?;
324
325                if !kraken_response.error.is_empty() {
326                    return Err(KrakenHttpError::ApiError(kraken_response.error.clone()));
327                }
328
329                Ok(kraken_response)
330            }
331        };
332
333        let should_retry =
334            |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
335
336        let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
337
338        self.retry_manager
339            .execute_with_retry_with_cancel(
340                &endpoint,
341                operation,
342                should_retry,
343                create_error,
344                &self.cancellation_token,
345            )
346            .await
347    }
348
349    async fn send_futures_request<T: DeserializeOwned>(
350        &self,
351        method: Method,
352        endpoint: &str,
353        url: String,
354    ) -> anyhow::Result<T, KrakenHttpError> {
355        let endpoint = endpoint.to_string();
356        let method_clone = method.clone();
357        let url_clone = url.clone();
358
359        let operation = || {
360            let url = url_clone.clone();
361            let method = method_clone.clone();
362            let endpoint = endpoint.clone();
363
364            async move {
365                let headers = Self::default_headers();
366                let rate_limit_keys = Self::rate_limit_keys(&endpoint);
367
368                let response = self
369                    .client
370                    .request(
371                        method,
372                        url,
373                        None,
374                        Some(headers),
375                        None,
376                        None,
377                        Some(rate_limit_keys),
378                    )
379                    .await
380                    .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
381
382                if response.status.as_u16() >= 400 {
383                    let body = String::from_utf8_lossy(&response.body).to_string();
384                    return Err(KrakenHttpError::NetworkError(format!(
385                        "HTTP error {}: {body}",
386                        response.status.as_u16()
387                    )));
388                }
389
390                let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
391                    KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
392                })?;
393
394                serde_json::from_str(&response_text).map_err(|e| {
395                    KrakenHttpError::ParseError(format!(
396                        "Failed to deserialize futures response: {e}"
397                    ))
398                })
399            }
400        };
401
402        let should_retry =
403            |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
404
405        let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
406
407        self.retry_manager
408            .execute_with_retry_with_cancel(
409                &endpoint,
410                operation,
411                should_retry,
412                create_error,
413                &self.cancellation_token,
414            )
415            .await
416    }
417
418    fn generate_ts_init(&self) -> UnixNanos {
419        get_atomic_clock_realtime().get_time_ns()
420    }
421
422    pub async fn get_server_time(&self) -> anyhow::Result<ServerTime, KrakenHttpError> {
423        let response: KrakenResponse<ServerTime> = self
424            .send_request(Method::GET, "/0/public/Time", None, false)
425            .await?;
426
427        response.result.ok_or_else(|| {
428            KrakenHttpError::ParseError("Missing result in server time response".to_string())
429        })
430    }
431
432    pub async fn get_system_status(&self) -> anyhow::Result<SystemStatus, KrakenHttpError> {
433        let response: KrakenResponse<SystemStatus> = self
434            .send_request(Method::GET, "/0/public/SystemStatus", None, false)
435            .await?;
436
437        response.result.ok_or_else(|| {
438            KrakenHttpError::ParseError("Missing result in system status response".to_string())
439        })
440    }
441
442    pub async fn get_asset_pairs(
443        &self,
444        pairs: Option<Vec<String>>,
445    ) -> anyhow::Result<AssetPairsResponse, KrakenHttpError> {
446        let endpoint = if let Some(pairs) = pairs {
447            format!("/0/public/AssetPairs?pair={}", pairs.join(","))
448        } else {
449            "/0/public/AssetPairs".to_string()
450        };
451
452        let response: KrakenResponse<AssetPairsResponse> = self
453            .send_request(Method::GET, &endpoint, None, false)
454            .await?;
455
456        response.result.ok_or_else(|| {
457            KrakenHttpError::ParseError("Missing result in asset pairs response".to_string())
458        })
459    }
460
461    pub async fn get_ticker(
462        &self,
463        pairs: Vec<String>,
464    ) -> anyhow::Result<TickerResponse, KrakenHttpError> {
465        let endpoint = format!("/0/public/Ticker?pair={}", pairs.join(","));
466
467        let response: KrakenResponse<TickerResponse> = self
468            .send_request(Method::GET, &endpoint, None, false)
469            .await?;
470
471        response.result.ok_or_else(|| {
472            KrakenHttpError::ParseError("Missing result in ticker response".to_string())
473        })
474    }
475
476    pub async fn get_ohlc(
477        &self,
478        pair: &str,
479        interval: Option<u32>,
480        since: Option<i64>,
481    ) -> anyhow::Result<OhlcResponse, KrakenHttpError> {
482        let mut endpoint = format!("/0/public/OHLC?pair={pair}");
483
484        if let Some(interval) = interval {
485            endpoint.push_str(&format!("&interval={interval}"));
486        }
487        if let Some(since) = since {
488            endpoint.push_str(&format!("&since={since}"));
489        }
490
491        let response: KrakenResponse<OhlcResponse> = self
492            .send_request(Method::GET, &endpoint, None, false)
493            .await?;
494
495        response.result.ok_or_else(|| {
496            KrakenHttpError::ParseError("Missing result in OHLC response".to_string())
497        })
498    }
499
500    pub async fn get_book_depth(
501        &self,
502        pair: &str,
503        count: Option<u32>,
504    ) -> anyhow::Result<OrderBookResponse, KrakenHttpError> {
505        let mut endpoint = format!("/0/public/Depth?pair={pair}");
506
507        if let Some(count) = count {
508            endpoint.push_str(&format!("&count={count}"));
509        }
510
511        let response: KrakenResponse<OrderBookResponse> = self
512            .send_request(Method::GET, &endpoint, None, false)
513            .await?;
514
515        response.result.ok_or_else(|| {
516            KrakenHttpError::ParseError("Missing result in book depth response".to_string())
517        })
518    }
519
520    pub async fn get_trades(
521        &self,
522        pair: &str,
523        since: Option<String>,
524    ) -> anyhow::Result<TradesResponse, KrakenHttpError> {
525        let mut endpoint = format!("/0/public/Trades?pair={pair}");
526
527        if let Some(since) = since {
528            endpoint.push_str(&format!("&since={since}"));
529        }
530
531        let response: KrakenResponse<TradesResponse> = self
532            .send_request(Method::GET, &endpoint, None, false)
533            .await?;
534
535        response.result.ok_or_else(|| {
536            KrakenHttpError::ParseError("Missing result in trades response".to_string())
537        })
538    }
539
540    pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
541        if self.credential.is_none() {
542            return Err(KrakenHttpError::AuthenticationError(
543                "API credentials required for GetWebSocketsToken".to_string(),
544            ));
545        }
546
547        let response: KrakenResponse<WebSocketToken> = self
548            .send_request(Method::POST, "/0/private/GetWebSocketsToken", None, true)
549            .await?;
550
551        response.result.ok_or_else(|| {
552            KrakenHttpError::ParseError("Missing result in websockets token response".to_string())
553        })
554    }
555
556    pub async fn get_instruments_futures(
557        &self,
558    ) -> anyhow::Result<FuturesInstrumentsResponse, KrakenHttpError> {
559        let endpoint = "/derivatives/api/v3/instruments";
560        let url = format!("{}{endpoint}", self.base_url);
561
562        self.send_futures_request(Method::GET, endpoint, url).await
563    }
564
565    pub async fn get_tickers_futures(
566        &self,
567    ) -> anyhow::Result<FuturesTickersResponse, KrakenHttpError> {
568        let endpoint = "/derivatives/api/v3/tickers";
569        let url = format!("{}{endpoint}", self.base_url);
570
571        self.send_futures_request(Method::GET, endpoint, url).await
572    }
573
574    pub async fn get_ohlc_futures(
575        &self,
576        tick_type: &str,
577        symbol: &str,
578        resolution: &str,
579        from: Option<i64>,
580        to: Option<i64>,
581    ) -> anyhow::Result<FuturesCandlesResponse, KrakenHttpError> {
582        let endpoint = format!("/api/charts/v1/{tick_type}/{symbol}/{resolution}");
583
584        let mut url = format!("{}{endpoint}", self.base_url);
585
586        let mut query_params = Vec::new();
587        if let Some(from_ts) = from {
588            query_params.push(format!("from={from_ts}"));
589        }
590        if let Some(to_ts) = to {
591            query_params.push(format!("to={to_ts}"));
592        }
593
594        if !query_params.is_empty() {
595            url.push('?');
596            url.push_str(&query_params.join("&"));
597        }
598
599        self.send_futures_request(Method::GET, &endpoint, url).await
600    }
601}
602
603#[cfg_attr(
604    feature = "python",
605    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
606)]
607pub struct KrakenHttpClient {
608    pub(crate) inner: Arc<KrakenRawHttpClient>,
609    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
610    cache_initialized: Arc<AtomicBool>,
611}
612
613impl Clone for KrakenHttpClient {
614    fn clone(&self) -> Self {
615        Self {
616            inner: self.inner.clone(),
617            instruments_cache: self.instruments_cache.clone(),
618            cache_initialized: self.cache_initialized.clone(),
619        }
620    }
621}
622
623impl Default for KrakenHttpClient {
624    fn default() -> Self {
625        Self::new(None, Some(60), None, None, None, None)
626            .expect("Failed to create default KrakenHttpClient")
627    }
628}
629
630impl Debug for KrakenHttpClient {
631    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
632        f.debug_struct("KrakenHttpClient")
633            .field("inner", &self.inner)
634            .finish()
635    }
636}
637
638impl KrakenHttpClient {
639    #[allow(clippy::too_many_arguments)]
640    pub fn new(
641        base_url: Option<String>,
642        timeout_secs: Option<u64>,
643        max_retries: Option<u32>,
644        retry_delay_ms: Option<u64>,
645        retry_delay_max_ms: Option<u64>,
646        proxy_url: Option<String>,
647    ) -> anyhow::Result<Self> {
648        Ok(Self {
649            inner: Arc::new(KrakenRawHttpClient::new(
650                base_url,
651                timeout_secs,
652                max_retries,
653                retry_delay_ms,
654                retry_delay_max_ms,
655                proxy_url,
656            )?),
657            instruments_cache: Arc::new(DashMap::new()),
658            cache_initialized: Arc::new(AtomicBool::new(false)),
659        })
660    }
661
662    #[allow(clippy::too_many_arguments)]
663    pub fn with_credentials(
664        api_key: String,
665        api_secret: String,
666        base_url: Option<String>,
667        timeout_secs: Option<u64>,
668        max_retries: Option<u32>,
669        retry_delay_ms: Option<u64>,
670        retry_delay_max_ms: Option<u64>,
671        proxy_url: Option<String>,
672    ) -> anyhow::Result<Self> {
673        Ok(Self {
674            inner: Arc::new(KrakenRawHttpClient::with_credentials(
675                api_key,
676                api_secret,
677                base_url,
678                timeout_secs,
679                max_retries,
680                retry_delay_ms,
681                retry_delay_max_ms,
682                proxy_url,
683            )?),
684            instruments_cache: Arc::new(DashMap::new()),
685            cache_initialized: Arc::new(AtomicBool::new(false)),
686        })
687    }
688
689    fn is_futures(&self) -> bool {
690        self.inner.base_url().contains("futures")
691    }
692
693    pub fn cancel_all_requests(&self) {
694        self.inner.cancel_all_requests();
695    }
696
697    pub fn cancellation_token(&self) -> &CancellationToken {
698        self.inner.cancellation_token()
699    }
700
701    pub fn cache_instrument(&self, instrument: InstrumentAny) {
702        self.instruments_cache
703            .insert(instrument.symbol().inner(), instrument);
704        self.cache_initialized.store(true, Ordering::Release);
705    }
706
707    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
708        for instrument in instruments {
709            self.instruments_cache
710                .insert(instrument.symbol().inner(), instrument);
711        }
712        self.cache_initialized.store(true, Ordering::Release);
713    }
714
715    pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
716        self.instruments_cache
717            .get(symbol)
718            .map(|entry| entry.value().clone())
719    }
720
721    pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
722        self.inner.get_websockets_token().await
723    }
724
725    pub async fn request_instruments(
726        &self,
727        pairs: Option<Vec<String>>,
728    ) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
729        let ts_init = self.inner.generate_ts_init();
730
731        if self.is_futures() {
732            let response = self.inner.get_instruments_futures().await?;
733
734            let instruments: Vec<InstrumentAny> = response
735                .instruments
736                .iter()
737                .filter_map(|fut_instrument| {
738                    match parse_futures_instrument(fut_instrument, ts_init, ts_init) {
739                        Ok(instrument) => Some(instrument),
740                        Err(e) => {
741                            tracing::warn!(
742                                "Failed to parse futures instrument {}: {e}",
743                                fut_instrument.symbol
744                            );
745                            None
746                        }
747                    }
748                })
749                .collect();
750
751            return Ok(instruments);
752        }
753
754        let asset_pairs = self.inner.get_asset_pairs(pairs).await?;
755
756        let instruments: Vec<InstrumentAny> = asset_pairs
757            .iter()
758            .filter_map(|(pair_name, definition)| {
759                match parse_spot_instrument(pair_name, definition, ts_init, ts_init) {
760                    Ok(instrument) => Some(instrument),
761                    Err(e) => {
762                        tracing::warn!("Failed to parse instrument {pair_name}: {e}");
763                        None
764                    }
765                }
766            })
767            .collect();
768
769        Ok(instruments)
770    }
771
772    pub async fn request_mark_price(
773        &self,
774        instrument_id: InstrumentId,
775    ) -> anyhow::Result<f64, KrakenHttpError> {
776        if !self.is_futures() {
777            return Err(KrakenHttpError::ParseError(
778                "Mark price is only available for futures instruments. Use a futures client (base URL must contain 'futures')".to_string(),
779            ));
780        }
781
782        let instrument = self
783            .get_cached_instrument(&instrument_id.symbol.inner())
784            .ok_or_else(|| {
785                KrakenHttpError::ParseError(format!(
786                    "Instrument not found in cache: {}",
787                    instrument_id
788                ))
789            })?;
790
791        let raw_symbol = instrument.raw_symbol().to_string();
792        let tickers = self.inner.get_tickers_futures().await?;
793
794        tickers
795            .tickers
796            .iter()
797            .find(|t| t.symbol == raw_symbol)
798            .map(|t| t.mark_price)
799            .ok_or_else(|| {
800                KrakenHttpError::ParseError(format!("Symbol {} not found in tickers", raw_symbol))
801            })
802    }
803
804    pub async fn request_index_price(
805        &self,
806        instrument_id: InstrumentId,
807    ) -> anyhow::Result<f64, KrakenHttpError> {
808        if !self.is_futures() {
809            return Err(KrakenHttpError::ParseError(
810                "Index price is only available for futures instruments. Use a futures client (base URL must contain 'futures')".to_string(),
811            ));
812        }
813
814        let instrument = self
815            .get_cached_instrument(&instrument_id.symbol.inner())
816            .ok_or_else(|| {
817                KrakenHttpError::ParseError(format!(
818                    "Instrument not found in cache: {}",
819                    instrument_id
820                ))
821            })?;
822
823        let raw_symbol = instrument.raw_symbol().to_string();
824        let tickers = self.inner.get_tickers_futures().await?;
825
826        tickers
827            .tickers
828            .iter()
829            .find(|t| t.symbol == raw_symbol)
830            .map(|t| t.index_price)
831            .ok_or_else(|| {
832                KrakenHttpError::ParseError(format!("Symbol {} not found in tickers", raw_symbol))
833            })
834    }
835
836    pub async fn request_trades(
837        &self,
838        instrument_id: InstrumentId,
839        start: Option<u64>,
840        end: Option<u64>,
841        limit: Option<u64>,
842    ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
843        if self.is_futures() {
844            return Err(KrakenHttpError::ParseError(
845                "Trade history is not yet implemented for futures instruments. Use a spot client instead.".to_string(),
846            ));
847        }
848
849        let instrument = self
850            .get_cached_instrument(&instrument_id.symbol.inner())
851            .ok_or_else(|| {
852                KrakenHttpError::ParseError(format!(
853                    "Instrument not found in cache: {}",
854                    instrument_id
855                ))
856            })?;
857
858        let raw_symbol = instrument.raw_symbol().to_string();
859        let since = start.map(|s| s.to_string());
860
861        let ts_init = self.inner.generate_ts_init();
862        let response = self.inner.get_trades(&raw_symbol, since).await?;
863
864        let mut trades = Vec::new();
865
866        // Get the first (and typically only) pair's trade data
867        for (_pair_name, trade_arrays) in &response.data {
868            for trade_array in trade_arrays {
869                match parse_trade_tick_from_array(trade_array, &instrument, ts_init) {
870                    Ok(trade_tick) => {
871                        // Filter by end time if specified
872                        if let Some(end_ns) = end
873                            && trade_tick.ts_event.as_u64() > end_ns
874                        {
875                            continue;
876                        }
877                        trades.push(trade_tick);
878
879                        // Check limit
880                        if let Some(limit_count) = limit
881                            && trades.len() >= limit_count as usize
882                        {
883                            return Ok(trades);
884                        }
885                    }
886                    Err(e) => {
887                        tracing::warn!("Failed to parse trade tick: {e}");
888                    }
889                }
890            }
891        }
892
893        Ok(trades)
894    }
895
896    pub async fn request_bars(
897        &self,
898        bar_type: BarType,
899        start: Option<u64>,
900        end: Option<u64>,
901        limit: Option<u64>,
902    ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
903        self.request_bars_with_tick_type(bar_type, start, end, limit, None)
904            .await
905    }
906
907    pub async fn request_bars_with_tick_type(
908        &self,
909        bar_type: BarType,
910        start: Option<u64>,
911        end: Option<u64>,
912        limit: Option<u64>,
913        tick_type: Option<&str>,
914    ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
915        let instrument_id = bar_type.instrument_id();
916        let instrument = self
917            .get_cached_instrument(&instrument_id.symbol.inner())
918            .ok_or_else(|| {
919                KrakenHttpError::ParseError(format!(
920                    "Instrument not found in cache: {}",
921                    instrument_id
922                ))
923            })?;
924
925        let raw_symbol = instrument.raw_symbol().to_string();
926        let ts_init = self.inner.generate_ts_init();
927
928        if self.is_futures() {
929            let tick_type = tick_type.unwrap_or("trade");
930            let resolution = bar_type_to_futures_resolution(bar_type)
931                .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?;
932
933            // Kraken Futures API expects millisecond timestamps
934            let from = start.map(|s| (s / 1_000_000) as i64);
935            let to = end.map(|e| (e / 1_000_000) as i64);
936
937            let response = self
938                .inner
939                .get_ohlc_futures(tick_type, &raw_symbol, resolution, from, to)
940                .await?;
941
942            let mut bars = Vec::new();
943            for candle in response.candles {
944                let ohlc = OhlcData {
945                    time: candle.time / 1000,
946                    open: candle.open,
947                    high: candle.high,
948                    low: candle.low,
949                    close: candle.close,
950                    vwap: "0".to_string(),
951                    volume: candle.volume,
952                    count: 0,
953                };
954
955                match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
956                    Ok(bar) => {
957                        // Filter by end time if specified
958                        if let Some(end_ns) = end
959                            && bar.ts_event.as_u64() > end_ns
960                        {
961                            continue;
962                        }
963                        bars.push(bar);
964
965                        // Check limit
966                        if let Some(limit_count) = limit
967                            && bars.len() >= limit_count as usize
968                        {
969                            return Ok(bars);
970                        }
971                    }
972                    Err(e) => {
973                        tracing::warn!("Failed to parse futures bar: {e}");
974                    }
975                }
976            }
977
978            return Ok(bars);
979        }
980
981        let interval = Some(
982            bar_type_to_spot_interval(bar_type)
983                .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?,
984        );
985
986        // Convert start time from nanoseconds to seconds
987        let since = start.map(|s| (s / 1_000_000_000) as i64);
988
989        let response = self.inner.get_ohlc(&raw_symbol, interval, since).await?;
990
991        let mut bars = Vec::new();
992
993        // Get the first (and typically only) pair's OHLC data
994        for (_pair_name, ohlc_arrays) in &response.data {
995            for ohlc_array in ohlc_arrays {
996                // Convert array to OhlcData
997                if ohlc_array.len() < 8 {
998                    tracing::warn!("OHLC array too short: {}", ohlc_array.len());
999                    continue;
1000                }
1001
1002                let ohlc = OhlcData {
1003                    time: ohlc_array[0].as_i64().unwrap_or(0),
1004                    open: ohlc_array[1].as_str().unwrap_or("0").to_string(),
1005                    high: ohlc_array[2].as_str().unwrap_or("0").to_string(),
1006                    low: ohlc_array[3].as_str().unwrap_or("0").to_string(),
1007                    close: ohlc_array[4].as_str().unwrap_or("0").to_string(),
1008                    vwap: ohlc_array[5].as_str().unwrap_or("0").to_string(),
1009                    volume: ohlc_array[6].as_str().unwrap_or("0").to_string(),
1010                    count: ohlc_array[7].as_i64().unwrap_or(0),
1011                };
1012
1013                match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1014                    Ok(bar) => {
1015                        // Filter by end time if specified
1016                        if let Some(end_ns) = end
1017                            && bar.ts_event.as_u64() > end_ns
1018                        {
1019                            continue;
1020                        }
1021                        bars.push(bar);
1022
1023                        // Check limit
1024                        if let Some(limit_count) = limit
1025                            && bars.len() >= limit_count as usize
1026                        {
1027                            return Ok(bars);
1028                        }
1029                    }
1030                    Err(e) => {
1031                        tracing::warn!("Failed to parse bar: {e}");
1032                    }
1033                }
1034            }
1035        }
1036
1037        Ok(bars)
1038    }
1039}
1040
1041////////////////////////////////////////////////////////////////////////////////
1042// Tests
1043////////////////////////////////////////////////////////////////////////////////
1044
1045#[cfg(test)]
1046mod tests {
1047    use rstest::rstest;
1048
1049    use super::*;
1050
1051    #[rstest]
1052    fn test_raw_client_creation() {
1053        let client = KrakenRawHttpClient::default();
1054        assert!(client.credential.is_none());
1055    }
1056
1057    #[rstest]
1058    fn test_raw_client_with_credentials() {
1059        let client = KrakenRawHttpClient::with_credentials(
1060            "test_key".to_string(),
1061            "test_secret".to_string(),
1062            None,
1063            None,
1064            None,
1065            None,
1066            None,
1067            None,
1068        )
1069        .unwrap();
1070        assert!(client.credential.is_some());
1071    }
1072
1073    #[rstest]
1074    fn test_client_creation() {
1075        let client = KrakenHttpClient::default();
1076        assert!(client.instruments_cache.is_empty());
1077    }
1078
1079    #[rstest]
1080    fn test_client_with_credentials() {
1081        let client = KrakenHttpClient::with_credentials(
1082            "test_key".to_string(),
1083            "test_secret".to_string(),
1084            None,
1085            None,
1086            None,
1087            None,
1088            None,
1089            None,
1090        )
1091        .unwrap();
1092        assert!(client.instruments_cache.is_empty());
1093    }
1094}