Skip to main content

nautilus_kraken/http/spot/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! HTTP client for the Kraken Spot REST API.
17
18use std::{
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroU32,
22    sync::{
23        Arc, RwLock,
24        atomic::{AtomicBool, Ordering},
25    },
26};
27
28use chrono::{DateTime, Utc};
29use dashmap::DashMap;
30use indexmap::IndexMap;
31use nautilus_core::{
32    AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, datetime::NANOSECONDS_IN_SECOND,
33    nanos::UnixNanos, time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36    data::{Bar, BarType, TradeTick},
37    enums::{AccountType, CurrencyType, OrderSide, OrderType, PositionSideSpecified, TimeInForce},
38    events::AccountState,
39    identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
40    instruments::{Instrument, InstrumentAny},
41    reports::{FillReport, OrderStatusReport, PositionStatusReport},
42    types::{AccountBalance, Currency, Money, Price, Quantity},
43};
44use nautilus_network::{
45    http::{HttpClient, Method, USER_AGENT},
46    ratelimiter::quota::Quota,
47    retry::{RetryConfig, RetryManager},
48};
49use serde::de::DeserializeOwned;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use super::{models::*, query::*};
54use crate::{
55    common::{
56        consts::NAUTILUS_KRAKEN_BROKER_ID,
57        credential::KrakenCredential,
58        enums::{KrakenEnvironment, KrakenOrderSide, KrakenOrderType, KrakenProductType},
59        parse::{
60            bar_type_to_spot_interval, normalize_currency_code, parse_bar, parse_fill_report,
61            parse_order_status_report, parse_spot_instrument, parse_trade_tick_from_array,
62        },
63        urls::get_kraken_http_base_url,
64    },
65    http::error::KrakenHttpError,
66};
67
/// Default Kraken Spot REST API rate limit (requests per second).
///
/// Applied when the client constructors receive no `max_requests_per_second`, and used as
/// the fallback when a supplied rate of zero cannot form a quota.
pub const KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 5;

/// Rate limiter key attached to every Kraken Spot request, regardless of endpoint.
const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:spot:global";

/// Maximum orders per batch cancel request for Kraken Spot API.
const BATCH_CANCEL_LIMIT: usize = 50;
75
76/// Computes the time-in-force and expiration time parameters for Kraken Spot orders.
77///
78/// Returns a tuple of (timeinforce, expiretm) for use in order requests.
79/// For limit orders, handles GTC, IOC, and GTD. Market orders return (None, None).
80fn compute_time_in_force(
81    is_limit_order: bool,
82    time_in_force: TimeInForce,
83    expire_time: Option<UnixNanos>,
84) -> anyhow::Result<(Option<String>, Option<String>)> {
85    if is_limit_order {
86        match time_in_force {
87            TimeInForce::Gtc => Ok((None, None)), // Default, no parameter needed
88            TimeInForce::Ioc => Ok((Some("IOC".to_string()), None)),
89            TimeInForce::Fok => {
90                anyhow::bail!("FOK time in force not supported by Kraken Spot API")
91            }
92            TimeInForce::Gtd => {
93                let expire = expire_time.ok_or_else(|| {
94                    anyhow::anyhow!("GTD time in force requires expire_time parameter")
95                })?;
96                // Convert nanoseconds to seconds for Kraken API
97                let expire_secs = expire.as_u64() / NANOSECONDS_IN_SECOND;
98                Ok((Some("GTD".to_string()), Some(expire_secs.to_string())))
99            }
100            _ => anyhow::bail!("Unsupported time in force: {time_in_force:?}"),
101        }
102    } else {
103        // Market orders are inherently immediate, timeinforce not applicable
104        Ok((None, None))
105    }
106}
107
/// Raw HTTP client for low-level Kraken Spot API operations.
///
/// This client handles request/response operations with the Kraken Spot API,
/// returning venue-specific response types. It does not parse to Nautilus domain types.
pub struct KrakenSpotRawHttpClient {
    /// Base URL that endpoint paths are appended to when building request URLs.
    base_url: String,
    /// Underlying HTTP client used to issue every request.
    client: HttpClient,
    /// API credentials used to sign private requests; `None` for a public-only client.
    credential: Option<KrakenCredential>,
    /// Retry executor used by `send_request`.
    retry_manager: RetryManager<KrakenHttpError>,
    /// Token cancelled by `cancel_all_requests` and handed to the retry executor.
    cancellation_token: CancellationToken,
    /// Clock whose nanosecond timestamp supplies request nonces.
    clock: &'static AtomicTime,
    /// Mutex to serialize authenticated requests, ensuring nonces arrive at Kraken in order
    auth_mutex: tokio::sync::Mutex<()>,
}
122
123impl Default for KrakenSpotRawHttpClient {
124    fn default() -> Self {
125        Self::new(
126            KrakenEnvironment::Mainnet,
127            None,
128            Some(60),
129            None,
130            None,
131            None,
132            None,
133            None,
134        )
135        .expect("Failed to create default KrakenSpotRawHttpClient")
136    }
137}
138
139impl Debug for KrakenSpotRawHttpClient {
140    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141        f.debug_struct(stringify!(KrakenSpotRawHttpClient))
142            .field("base_url", &self.base_url)
143            .field("has_credentials", &self.credential.is_some())
144            .finish()
145    }
146}
147
148impl KrakenSpotRawHttpClient {
149    /// Creates a new [`KrakenSpotRawHttpClient`].
150    #[allow(clippy::too_many_arguments)]
151    pub fn new(
152        environment: KrakenEnvironment,
153        base_url_override: Option<String>,
154        timeout_secs: Option<u64>,
155        max_retries: Option<u32>,
156        retry_delay_ms: Option<u64>,
157        retry_delay_max_ms: Option<u64>,
158        proxy_url: Option<String>,
159        max_requests_per_second: Option<u32>,
160    ) -> anyhow::Result<Self> {
161        let retry_config = RetryConfig {
162            max_retries: max_retries.unwrap_or(3),
163            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
164            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
165            backoff_factor: 2.0,
166            jitter_ms: 1000,
167            operation_timeout_ms: Some(60_000),
168            immediate_first: false,
169            max_elapsed_ms: Some(180_000),
170        };
171
172        let retry_manager = RetryManager::new(retry_config);
173        let base_url = base_url_override.unwrap_or_else(|| {
174            get_kraken_http_base_url(KrakenProductType::Spot, environment).to_string()
175        });
176
177        let rate_limit =
178            max_requests_per_second.unwrap_or(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND);
179
180        Ok(Self {
181            base_url,
182            client: HttpClient::new(
183                Self::default_headers(),
184                vec![],
185                Self::rate_limiter_quotas(rate_limit),
186                Some(Self::default_quota(rate_limit)),
187                timeout_secs,
188                proxy_url,
189            )
190            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
191            credential: None,
192            retry_manager,
193            cancellation_token: CancellationToken::new(),
194            clock: get_atomic_clock_realtime(),
195            auth_mutex: tokio::sync::Mutex::new(()),
196        })
197    }
198
199    /// Creates a new [`KrakenSpotRawHttpClient`] with credentials.
200    #[allow(clippy::too_many_arguments)]
201    pub fn with_credentials(
202        api_key: String,
203        api_secret: String,
204        environment: KrakenEnvironment,
205        base_url_override: Option<String>,
206        timeout_secs: Option<u64>,
207        max_retries: Option<u32>,
208        retry_delay_ms: Option<u64>,
209        retry_delay_max_ms: Option<u64>,
210        proxy_url: Option<String>,
211        max_requests_per_second: Option<u32>,
212    ) -> anyhow::Result<Self> {
213        let retry_config = RetryConfig {
214            max_retries: max_retries.unwrap_or(3),
215            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
216            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
217            backoff_factor: 2.0,
218            jitter_ms: 1000,
219            operation_timeout_ms: Some(60_000),
220            immediate_first: false,
221            max_elapsed_ms: Some(180_000),
222        };
223
224        let retry_manager = RetryManager::new(retry_config);
225        let base_url = base_url_override.unwrap_or_else(|| {
226            get_kraken_http_base_url(KrakenProductType::Spot, environment).to_string()
227        });
228
229        let rate_limit =
230            max_requests_per_second.unwrap_or(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND);
231
232        Ok(Self {
233            base_url,
234            client: HttpClient::new(
235                Self::default_headers(),
236                vec![],
237                Self::rate_limiter_quotas(rate_limit),
238                Some(Self::default_quota(rate_limit)),
239                timeout_secs,
240                proxy_url,
241            )
242            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
243            credential: Some(KrakenCredential::new(api_key, api_secret)),
244            retry_manager,
245            cancellation_token: CancellationToken::new(),
246            clock: get_atomic_clock_realtime(),
247            auth_mutex: tokio::sync::Mutex::new(()),
248        })
249    }
250
251    /// Generates a unique nonce for Kraken Spot API requests.
252    ///
253    /// Uses `AtomicTime` for strict monotonicity. The nanosecond timestamp
254    /// guarantees uniqueness even for rapid consecutive calls.
255    fn generate_nonce(&self) -> u64 {
256        self.clock.get_time_ns().as_u64()
257    }
258
259    /// Returns the base URL for this client.
260    pub fn base_url(&self) -> &str {
261        &self.base_url
262    }
263
264    /// Returns the credential for this client, if set.
265    pub fn credential(&self) -> Option<&KrakenCredential> {
266        self.credential.as_ref()
267    }
268
269    /// Cancels all pending HTTP requests.
270    pub fn cancel_all_requests(&self) {
271        self.cancellation_token.cancel();
272    }
273
274    /// Returns the cancellation token for this client.
275    pub fn cancellation_token(&self) -> &CancellationToken {
276        &self.cancellation_token
277    }
278
279    fn default_headers() -> HashMap<String, String> {
280        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
281    }
282
283    fn default_quota(max_requests_per_second: u32) -> Quota {
284        Quota::per_second(
285            NonZeroU32::new(max_requests_per_second).unwrap_or_else(|| {
286                NonZeroU32::new(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()
287            }),
288        )
289    }
290
291    fn rate_limiter_quotas(max_requests_per_second: u32) -> Vec<(String, Quota)> {
292        vec![(
293            KRAKEN_GLOBAL_RATE_KEY.to_string(),
294            Self::default_quota(max_requests_per_second),
295        )]
296    }
297
298    fn rate_limit_keys(endpoint: &str) -> Vec<String> {
299        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
300        let route = format!("kraken:spot:{normalized}");
301        vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
302    }
303
304    fn sign_spot(
305        &self,
306        path: &str,
307        nonce: u64,
308        params: &HashMap<String, String>,
309    ) -> anyhow::Result<(HashMap<String, String>, String)> {
310        let credential = self
311            .credential
312            .as_ref()
313            .ok_or_else(|| anyhow::anyhow!("Missing credentials"))?;
314
315        let (signature, post_data) = credential.sign_spot(path, nonce, params)?;
316
317        let mut headers = HashMap::new();
318        headers.insert("API-Key".to_string(), credential.api_key().to_string());
319        headers.insert("API-Sign".to_string(), signature);
320
321        Ok((headers, post_data))
322    }
323
324    async fn send_request<T: DeserializeOwned>(
325        &self,
326        method: Method,
327        endpoint: &str,
328        body: Option<Vec<u8>>,
329        authenticate: bool,
330    ) -> anyhow::Result<KrakenResponse<T>, KrakenHttpError> {
331        // Serialize authenticated requests to ensure nonces arrive at Kraken in order.
332        // Without this, concurrent requests can race through the network and arrive
333        // out-of-order, causing "Invalid nonce" errors.
334        let _guard = if authenticate {
335            Some(self.auth_mutex.lock().await)
336        } else {
337            None
338        };
339
340        let endpoint = endpoint.to_string();
341        let url = format!("{}{endpoint}", self.base_url);
342        let method_clone = method.clone();
343        let body_clone = body.clone();
344
345        let operation = || {
346            let url = url.clone();
347            let method = method_clone.clone();
348            let body = body_clone.clone();
349            let endpoint = endpoint.clone();
350
351            async move {
352                let mut headers = Self::default_headers();
353
354                let final_body = if authenticate {
355                    let nonce = self.generate_nonce();
356                    log::debug!("Generated nonce {nonce} for {endpoint}");
357
358                    let params: HashMap<String, String> = if let Some(ref body_bytes) = body {
359                        let body_str = std::str::from_utf8(body_bytes).map_err(|e| {
360                            KrakenHttpError::ParseError(format!(
361                                "Invalid UTF-8 in request body: {e}"
362                            ))
363                        })?;
364                        serde_urlencoded::from_str(body_str).map_err(|e| {
365                            KrakenHttpError::ParseError(format!(
366                                "Failed to parse request params: {e}"
367                            ))
368                        })?
369                    } else {
370                        HashMap::new()
371                    };
372
373                    let (auth_headers, post_data) = self
374                        .sign_spot(&endpoint, nonce, &params)
375                        .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
376                    headers.extend(auth_headers);
377                    Some(post_data.into_bytes())
378                } else {
379                    body
380                };
381
382                if method == Method::POST {
383                    headers.insert(
384                        "Content-Type".to_string(),
385                        "application/x-www-form-urlencoded".to_string(),
386                    );
387                }
388
389                let rate_limit_keys = Self::rate_limit_keys(&endpoint);
390
391                let response = self
392                    .client
393                    .request(
394                        method,
395                        url,
396                        None,
397                        Some(headers),
398                        final_body,
399                        None,
400                        Some(rate_limit_keys),
401                    )
402                    .await
403                    .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
404
405                let status = response.status.as_u16();
406                if status >= 400 {
407                    let body = String::from_utf8_lossy(&response.body).to_string();
408                    // Don't retry authentication errors
409                    if status == 401 || status == 403 {
410                        return Err(KrakenHttpError::AuthenticationError(format!(
411                            "HTTP error {status}: {body}"
412                        )));
413                    }
414                    return Err(KrakenHttpError::NetworkError(format!(
415                        "HTTP error {status}: {body}"
416                    )));
417                }
418
419                let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
420                    KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
421                })?;
422
423                let kraken_response: KrakenResponse<T> = serde_json::from_str(&response_text)
424                    .map_err(|e| {
425                        KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
426                    })?;
427
428                if !kraken_response.error.is_empty() {
429                    return Err(KrakenHttpError::ApiError(kraken_response.error));
430                }
431
432                Ok(kraken_response)
433            }
434        };
435
436        let should_retry =
437            |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
438        let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
439
440        self.retry_manager
441            .execute_with_retry_with_cancel(
442                &endpoint,
443                operation,
444                should_retry,
445                create_error,
446                &self.cancellation_token,
447            )
448            .await
449    }
450
451    /// Requests the server time from Kraken.
452    pub async fn get_server_time(&self) -> anyhow::Result<ServerTime, KrakenHttpError> {
453        let response: KrakenResponse<ServerTime> = self
454            .send_request(Method::GET, "/0/public/Time", None, false)
455            .await?;
456
457        response.result.ok_or_else(|| {
458            KrakenHttpError::ParseError("Missing result in server time response".to_string())
459        })
460    }
461
462    /// Requests the system status from Kraken.
463    pub async fn get_system_status(&self) -> anyhow::Result<SystemStatus, KrakenHttpError> {
464        let response: KrakenResponse<SystemStatus> = self
465            .send_request(Method::GET, "/0/public/SystemStatus", None, false)
466            .await?;
467
468        response.result.ok_or_else(|| {
469            KrakenHttpError::ParseError("Missing result in system status response".to_string())
470        })
471    }
472
473    /// Requests tradable asset pairs from Kraken.
474    pub async fn get_asset_pairs(
475        &self,
476        pairs: Option<Vec<String>>,
477    ) -> anyhow::Result<AssetPairsResponse, KrakenHttpError> {
478        let endpoint = if let Some(pairs) = pairs {
479            format!("/0/public/AssetPairs?pair={}", pairs.join(","))
480        } else {
481            "/0/public/AssetPairs".to_string()
482        };
483
484        let response: KrakenResponse<AssetPairsResponse> = self
485            .send_request(Method::GET, &endpoint, None, false)
486            .await?;
487
488        response.result.ok_or_else(|| {
489            KrakenHttpError::ParseError("Missing result in asset pairs response".to_string())
490        })
491    }
492
493    /// Requests ticker information for asset pairs.
494    pub async fn get_ticker(
495        &self,
496        pairs: Vec<String>,
497    ) -> anyhow::Result<TickerResponse, KrakenHttpError> {
498        let endpoint = format!("/0/public/Ticker?pair={}", pairs.join(","));
499
500        let response: KrakenResponse<TickerResponse> = self
501            .send_request(Method::GET, &endpoint, None, false)
502            .await?;
503
504        response.result.ok_or_else(|| {
505            KrakenHttpError::ParseError("Missing result in ticker response".to_string())
506        })
507    }
508
509    /// Requests OHLC candlestick data for an asset pair.
510    pub async fn get_ohlc(
511        &self,
512        pair: &str,
513        interval: Option<u32>,
514        since: Option<i64>,
515    ) -> anyhow::Result<OhlcResponse, KrakenHttpError> {
516        let mut endpoint = format!("/0/public/OHLC?pair={pair}");
517
518        if let Some(interval) = interval {
519            endpoint.push_str(&format!("&interval={interval}"));
520        }
521        if let Some(since) = since {
522            endpoint.push_str(&format!("&since={since}"));
523        }
524
525        let response: KrakenResponse<OhlcResponse> = self
526            .send_request(Method::GET, &endpoint, None, false)
527            .await?;
528
529        response.result.ok_or_else(|| {
530            KrakenHttpError::ParseError("Missing result in OHLC response".to_string())
531        })
532    }
533
534    /// Requests order book depth for an asset pair.
535    pub async fn get_book_depth(
536        &self,
537        pair: &str,
538        count: Option<u32>,
539    ) -> anyhow::Result<OrderBookResponse, KrakenHttpError> {
540        let mut endpoint = format!("/0/public/Depth?pair={pair}");
541
542        if let Some(count) = count {
543            endpoint.push_str(&format!("&count={count}"));
544        }
545
546        let response: KrakenResponse<OrderBookResponse> = self
547            .send_request(Method::GET, &endpoint, None, false)
548            .await?;
549
550        response.result.ok_or_else(|| {
551            KrakenHttpError::ParseError("Missing result in book depth response".to_string())
552        })
553    }
554
555    /// Requests recent trades for an asset pair.
556    pub async fn get_trades(
557        &self,
558        pair: &str,
559        since: Option<String>,
560    ) -> anyhow::Result<TradesResponse, KrakenHttpError> {
561        let mut endpoint = format!("/0/public/Trades?pair={pair}");
562
563        if let Some(since) = since {
564            endpoint.push_str(&format!("&since={since}"));
565        }
566
567        let response: KrakenResponse<TradesResponse> = self
568            .send_request(Method::GET, &endpoint, None, false)
569            .await?;
570
571        response.result.ok_or_else(|| {
572            KrakenHttpError::ParseError("Missing result in trades response".to_string())
573        })
574    }
575
576    /// Requests an authentication token for WebSocket connections.
577    pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
578        if self.credential.is_none() {
579            return Err(KrakenHttpError::AuthenticationError(
580                "API credentials required for GetWebSocketsToken".to_string(),
581            ));
582        }
583
584        let response: KrakenResponse<WebSocketToken> = self
585            .send_request(Method::POST, "/0/private/GetWebSocketsToken", None, true)
586            .await?;
587
588        response.result.ok_or_else(|| {
589            KrakenHttpError::ParseError("Missing result in websockets token response".to_string())
590        })
591    }
592
593    /// Requests all open orders (requires authentication).
594    pub async fn get_open_orders(
595        &self,
596        trades: Option<bool>,
597        userref: Option<i64>,
598    ) -> anyhow::Result<IndexMap<String, SpotOrder>, KrakenHttpError> {
599        if self.credential.is_none() {
600            return Err(KrakenHttpError::AuthenticationError(
601                "API credentials required for OpenOrders".to_string(),
602            ));
603        }
604
605        let mut params = vec![];
606        if let Some(trades_flag) = trades {
607            params.push(format!("trades={trades_flag}"));
608        }
609        if let Some(userref_val) = userref {
610            params.push(format!("userref={userref_val}"));
611        }
612
613        let body = if params.is_empty() {
614            None
615        } else {
616            Some(params.join("&").into_bytes())
617        };
618
619        let response: KrakenResponse<SpotOpenOrdersResult> = self
620            .send_request(Method::POST, "/0/private/OpenOrders", body, true)
621            .await?;
622
623        let result = response.result.ok_or_else(|| {
624            KrakenHttpError::ParseError("Missing result in open orders response".to_string())
625        })?;
626
627        Ok(result.open)
628    }
629
630    /// Requests closed orders history (requires authentication).
631    pub async fn get_closed_orders(
632        &self,
633        trades: Option<bool>,
634        userref: Option<i64>,
635        start: Option<i64>,
636        end: Option<i64>,
637        ofs: Option<i32>,
638        closetime: Option<String>,
639    ) -> anyhow::Result<IndexMap<String, SpotOrder>, KrakenHttpError> {
640        if self.credential.is_none() {
641            return Err(KrakenHttpError::AuthenticationError(
642                "API credentials required for ClosedOrders".to_string(),
643            ));
644        }
645
646        let mut params = vec![];
647        if let Some(trades_flag) = trades {
648            params.push(format!("trades={trades_flag}"));
649        }
650        if let Some(userref_val) = userref {
651            params.push(format!("userref={userref_val}"));
652        }
653        if let Some(start_val) = start {
654            params.push(format!("start={start_val}"));
655        }
656        if let Some(end_val) = end {
657            params.push(format!("end={end_val}"));
658        }
659        if let Some(ofs_val) = ofs {
660            params.push(format!("ofs={ofs_val}"));
661        }
662        if let Some(closetime_val) = closetime {
663            params.push(format!("closetime={closetime_val}"));
664        }
665
666        let body = if params.is_empty() {
667            None
668        } else {
669            Some(params.join("&").into_bytes())
670        };
671
672        let response: KrakenResponse<SpotClosedOrdersResult> = self
673            .send_request(Method::POST, "/0/private/ClosedOrders", body, true)
674            .await?;
675
676        let result = response.result.ok_or_else(|| {
677            KrakenHttpError::ParseError("Missing result in closed orders response".to_string())
678        })?;
679
680        Ok(result.closed)
681    }
682
683    /// Requests trades history (requires authentication).
684    pub async fn get_trades_history(
685        &self,
686        trade_type: Option<String>,
687        trades: Option<bool>,
688        start: Option<i64>,
689        end: Option<i64>,
690        ofs: Option<i32>,
691    ) -> anyhow::Result<IndexMap<String, SpotTrade>, KrakenHttpError> {
692        if self.credential.is_none() {
693            return Err(KrakenHttpError::AuthenticationError(
694                "API credentials required for TradesHistory".to_string(),
695            ));
696        }
697
698        let mut params = vec![];
699        if let Some(type_val) = trade_type {
700            params.push(format!("type={type_val}"));
701        }
702        if let Some(trades_flag) = trades {
703            params.push(format!("trades={trades_flag}"));
704        }
705        if let Some(start_val) = start {
706            params.push(format!("start={start_val}"));
707        }
708        if let Some(end_val) = end {
709            params.push(format!("end={end_val}"));
710        }
711        if let Some(ofs_val) = ofs {
712            params.push(format!("ofs={ofs_val}"));
713        }
714
715        let body = if params.is_empty() {
716            None
717        } else {
718            Some(params.join("&").into_bytes())
719        };
720
721        let response: KrakenResponse<SpotTradesHistoryResult> = self
722            .send_request(Method::POST, "/0/private/TradesHistory", body, true)
723            .await?;
724
725        let result = response.result.ok_or_else(|| {
726            KrakenHttpError::ParseError("Missing result in trades history response".to_string())
727        })?;
728
729        Ok(result.trades)
730    }
731
732    /// Submits a new order (requires authentication).
733    pub async fn add_order(
734        &self,
735        params: &KrakenSpotAddOrderParams,
736    ) -> anyhow::Result<SpotAddOrderResponse, KrakenHttpError> {
737        if self.credential.is_none() {
738            return Err(KrakenHttpError::AuthenticationError(
739                "API credentials required for adding orders".to_string(),
740            ));
741        }
742
743        let param_string = serde_urlencoded::to_string(params)
744            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
745        let body = Some(param_string.into_bytes());
746
747        let response: KrakenResponse<SpotAddOrderResponse> = self
748            .send_request(Method::POST, "/0/private/AddOrder", body, true)
749            .await?;
750
751        response
752            .result
753            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
754    }
755
756    /// Cancels an open order (requires authentication).
757    pub async fn cancel_order(
758        &self,
759        params: &KrakenSpotCancelOrderParams,
760    ) -> anyhow::Result<SpotCancelOrderResponse, KrakenHttpError> {
761        if self.credential.is_none() {
762            return Err(KrakenHttpError::AuthenticationError(
763                "API credentials required for canceling orders".to_string(),
764            ));
765        }
766
767        let param_string = serde_urlencoded::to_string(params)
768            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
769
770        let body = Some(param_string.into_bytes());
771
772        let response: KrakenResponse<SpotCancelOrderResponse> = self
773            .send_request(Method::POST, "/0/private/CancelOrder", body, true)
774            .await?;
775
776        response
777            .result
778            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
779    }
780
781    /// Cancels multiple orders in a single batch request (max 50 orders).
782    pub async fn cancel_order_batch(
783        &self,
784        params: &KrakenSpotCancelOrderBatchParams,
785    ) -> anyhow::Result<SpotCancelOrderBatchResponse, KrakenHttpError> {
786        let credential = self.credential.as_ref().ok_or_else(|| {
787            KrakenHttpError::AuthenticationError(
788                "API credentials required for canceling orders".to_string(),
789            )
790        })?;
791
792        // Serialize authenticated requests to ensure nonces arrive at Kraken in order
793        let _guard = self.auth_mutex.lock().await;
794
795        let endpoint = "/0/private/CancelOrderBatch";
796        let nonce = self.generate_nonce();
797
798        // CancelOrderBatch uses JSON body with nonce included
799        let json_body = serde_json::json!({
800            "nonce": nonce.to_string(),
801            "orders": params.orders
802        });
803        let json_str = serde_json::to_string(&json_body)
804            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize: {e}")))?;
805
806        let signature = credential
807            .sign_spot_json(endpoint, nonce, &json_str)
808            .map_err(|e| KrakenHttpError::AuthenticationError(format!("Failed to sign: {e}")))?;
809
810        let mut headers = Self::default_headers();
811        headers.insert("API-Key".to_string(), credential.api_key().to_string());
812        headers.insert("API-Sign".to_string(), signature);
813        headers.insert("Content-Type".to_string(), "application/json".to_string());
814
815        let url = format!("{}{endpoint}", self.base_url);
816        let rate_limit_keys = Self::rate_limit_keys(endpoint);
817
818        let response = self
819            .client
820            .request(
821                Method::POST,
822                url,
823                None,
824                Some(headers),
825                Some(json_str.into_bytes()),
826                None,
827                Some(rate_limit_keys),
828            )
829            .await
830            .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
831
832        if response.status.as_u16() >= 400 {
833            let status = response.status.as_u16();
834            let body = String::from_utf8_lossy(&response.body).to_string();
835            if status == 401 || status == 403 {
836                return Err(KrakenHttpError::AuthenticationError(format!(
837                    "HTTP error {status}: {body}"
838                )));
839            }
840            return Err(KrakenHttpError::NetworkError(format!(
841                "HTTP error {status}: {body}"
842            )));
843        }
844
845        let response_text = String::from_utf8(response.body.to_vec())
846            .map_err(|e| KrakenHttpError::ParseError(format!("Invalid UTF-8: {e}")))?;
847
848        let kraken_response: KrakenResponse<SpotCancelOrderBatchResponse> =
849            serde_json::from_str(&response_text).map_err(|e| {
850                KrakenHttpError::ParseError(format!("Failed to parse response: {e}"))
851            })?;
852
853        if !kraken_response.error.is_empty() {
854            return Err(KrakenHttpError::ApiError(kraken_response.error));
855        }
856
857        kraken_response
858            .result
859            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
860    }
861
862    /// Cancels all open orders (requires authentication).
863    pub async fn cancel_all_orders(
864        &self,
865    ) -> anyhow::Result<SpotCancelOrderResponse, KrakenHttpError> {
866        if self.credential.is_none() {
867            return Err(KrakenHttpError::AuthenticationError(
868                "API credentials required for canceling orders".to_string(),
869            ));
870        }
871
872        let response: KrakenResponse<SpotCancelOrderResponse> = self
873            .send_request(Method::POST, "/0/private/CancelAll", None, true)
874            .await?;
875
876        response
877            .result
878            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
879    }
880
881    /// Edits an existing order (cancel and replace).
882    pub async fn edit_order(
883        &self,
884        params: &KrakenSpotEditOrderParams,
885    ) -> anyhow::Result<SpotEditOrderResponse, KrakenHttpError> {
886        if self.credential.is_none() {
887            return Err(KrakenHttpError::AuthenticationError(
888                "API credentials required for editing orders".to_string(),
889            ));
890        }
891
892        let param_string = serde_urlencoded::to_string(params)
893            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
894
895        let body = Some(param_string.into_bytes());
896
897        let response: KrakenResponse<SpotEditOrderResponse> = self
898            .send_request(Method::POST, "/0/private/EditOrder", body, true)
899            .await?;
900
901        response
902            .result
903            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
904    }
905
906    /// Amends an existing order in-place (no cancel/replace).
907    pub async fn amend_order(
908        &self,
909        params: &KrakenSpotAmendOrderParams,
910    ) -> anyhow::Result<SpotAmendOrderResponse, KrakenHttpError> {
911        if self.credential.is_none() {
912            return Err(KrakenHttpError::AuthenticationError(
913                "API credentials required for amending orders".to_string(),
914            ));
915        }
916
917        let param_string = serde_urlencoded::to_string(params)
918            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
919
920        let body = Some(param_string.into_bytes());
921
922        let response: KrakenResponse<SpotAmendOrderResponse> = self
923            .send_request(Method::POST, "/0/private/AmendOrder", body, true)
924            .await?;
925
926        response
927            .result
928            .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
929    }
930
931    /// Requests account balances (requires authentication).
932    pub async fn get_balance(&self) -> anyhow::Result<BalanceResponse, KrakenHttpError> {
933        if self.credential.is_none() {
934            return Err(KrakenHttpError::AuthenticationError(
935                "API credentials required for Balance".to_string(),
936            ));
937        }
938
939        let response: KrakenResponse<BalanceResponse> = self
940            .send_request(Method::POST, "/0/private/Balance", None, true)
941            .await?;
942
943        response.result.ok_or_else(|| {
944            KrakenHttpError::ParseError("Missing result in balance response".to_string())
945        })
946    }
947}
948
949/// High-level HTTP client for the Kraken Spot REST API.
950///
951/// This client wraps the raw client and provides Nautilus domain types.
952/// It maintains an instrument cache and uses it to parse venue responses
953/// into Nautilus domain objects.
954#[cfg_attr(
955    feature = "python",
956    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
957)]
958pub struct KrakenSpotHttpClient {
959    pub(crate) inner: Arc<KrakenSpotRawHttpClient>,
960    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
961    cache_initialized: Arc<AtomicBool>,
962    use_spot_position_reports: Arc<AtomicBool>,
963    spot_positions_quote_currency: Arc<RwLock<Ustr>>,
964}
965
966impl Clone for KrakenSpotHttpClient {
967    fn clone(&self) -> Self {
968        Self {
969            inner: self.inner.clone(),
970            instruments_cache: self.instruments_cache.clone(),
971            cache_initialized: self.cache_initialized.clone(),
972            use_spot_position_reports: self.use_spot_position_reports.clone(),
973            spot_positions_quote_currency: self.spot_positions_quote_currency.clone(),
974        }
975    }
976}
977
978impl Default for KrakenSpotHttpClient {
979    fn default() -> Self {
980        Self::new(
981            KrakenEnvironment::Mainnet,
982            None,
983            Some(60),
984            None,
985            None,
986            None,
987            None,
988            None,
989        )
990        .expect("Failed to create default KrakenSpotHttpClient")
991    }
992}
993
994impl Debug for KrakenSpotHttpClient {
995    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
996        f.debug_struct(stringify!(KrakenSpotHttpClient))
997            .field("inner", &self.inner)
998            .finish()
999    }
1000}
1001
1002impl KrakenSpotHttpClient {
1003    /// Creates a new [`KrakenSpotHttpClient`].
1004    #[allow(clippy::too_many_arguments)]
1005    pub fn new(
1006        environment: KrakenEnvironment,
1007        base_url_override: Option<String>,
1008        timeout_secs: Option<u64>,
1009        max_retries: Option<u32>,
1010        retry_delay_ms: Option<u64>,
1011        retry_delay_max_ms: Option<u64>,
1012        proxy_url: Option<String>,
1013        max_requests_per_second: Option<u32>,
1014    ) -> anyhow::Result<Self> {
1015        Ok(Self {
1016            inner: Arc::new(KrakenSpotRawHttpClient::new(
1017                environment,
1018                base_url_override,
1019                timeout_secs,
1020                max_retries,
1021                retry_delay_ms,
1022                retry_delay_max_ms,
1023                proxy_url,
1024                max_requests_per_second,
1025            )?),
1026            instruments_cache: Arc::new(DashMap::new()),
1027            cache_initialized: Arc::new(AtomicBool::new(false)),
1028            use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1029            spot_positions_quote_currency: Arc::new(RwLock::new(Ustr::from("USDT"))),
1030        })
1031    }
1032
1033    /// Creates a new [`KrakenSpotHttpClient`] with credentials.
1034    #[allow(clippy::too_many_arguments)]
1035    pub fn with_credentials(
1036        api_key: String,
1037        api_secret: String,
1038        environment: KrakenEnvironment,
1039        base_url_override: Option<String>,
1040        timeout_secs: Option<u64>,
1041        max_retries: Option<u32>,
1042        retry_delay_ms: Option<u64>,
1043        retry_delay_max_ms: Option<u64>,
1044        proxy_url: Option<String>,
1045        max_requests_per_second: Option<u32>,
1046    ) -> anyhow::Result<Self> {
1047        Ok(Self {
1048            inner: Arc::new(KrakenSpotRawHttpClient::with_credentials(
1049                api_key,
1050                api_secret,
1051                environment,
1052                base_url_override,
1053                timeout_secs,
1054                max_retries,
1055                retry_delay_ms,
1056                retry_delay_max_ms,
1057                proxy_url,
1058                max_requests_per_second,
1059            )?),
1060            instruments_cache: Arc::new(DashMap::new()),
1061            cache_initialized: Arc::new(AtomicBool::new(false)),
1062            use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1063            spot_positions_quote_currency: Arc::new(RwLock::new(Ustr::from("USDT"))),
1064        })
1065    }
1066
1067    /// Creates a new [`KrakenSpotHttpClient`] loading credentials from environment variables.
1068    ///
1069    /// Looks for `KRAKEN_SPOT_API_KEY` and `KRAKEN_SPOT_API_SECRET`.
1070    ///
1071    /// Note: Kraken Spot does not have a testnet/demo environment.
1072    ///
1073    /// Falls back to unauthenticated client if credentials are not set.
1074    #[allow(clippy::too_many_arguments)]
1075    pub fn from_env(
1076        environment: KrakenEnvironment,
1077        base_url_override: Option<String>,
1078        timeout_secs: Option<u64>,
1079        max_retries: Option<u32>,
1080        retry_delay_ms: Option<u64>,
1081        retry_delay_max_ms: Option<u64>,
1082        proxy_url: Option<String>,
1083        max_requests_per_second: Option<u32>,
1084    ) -> anyhow::Result<Self> {
1085        if let Some(credential) = KrakenCredential::from_env_spot() {
1086            let (api_key, api_secret) = credential.into_parts();
1087            Self::with_credentials(
1088                api_key,
1089                api_secret,
1090                environment,
1091                base_url_override,
1092                timeout_secs,
1093                max_retries,
1094                retry_delay_ms,
1095                retry_delay_max_ms,
1096                proxy_url,
1097                max_requests_per_second,
1098            )
1099        } else {
1100            Self::new(
1101                environment,
1102                base_url_override,
1103                timeout_secs,
1104                max_retries,
1105                retry_delay_ms,
1106                retry_delay_max_ms,
1107                proxy_url,
1108                max_requests_per_second,
1109            )
1110        }
1111    }
1112
1113    /// Cancels all pending HTTP requests.
1114    pub fn cancel_all_requests(&self) {
1115        self.inner.cancel_all_requests();
1116    }
1117
1118    /// Returns the cancellation token for this client.
1119    pub fn cancellation_token(&self) -> &CancellationToken {
1120        self.inner.cancellation_token()
1121    }
1122
1123    /// Caches an instrument for symbol lookup.
1124    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1125        self.instruments_cache
1126            .insert(instrument.symbol().inner(), instrument);
1127        self.cache_initialized.store(true, Ordering::Release);
1128    }
1129
1130    /// Caches multiple instruments for symbol lookup.
1131    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1132        for instrument in instruments {
1133            self.instruments_cache
1134                .insert(instrument.symbol().inner(), instrument);
1135        }
1136        self.cache_initialized.store(true, Ordering::Release);
1137    }
1138
1139    /// Gets an instrument from the cache by symbol.
1140    pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1141        self.instruments_cache
1142            .get(symbol)
1143            .map(|entry| entry.value().clone())
1144    }
1145
1146    fn get_instrument_by_raw_symbol(&self, raw_symbol: &str) -> Option<InstrumentAny> {
1147        self.instruments_cache
1148            .iter()
1149            .find(|entry| entry.value().raw_symbol().as_str() == raw_symbol)
1150            .map(|entry| entry.value().clone())
1151    }
1152
1153    fn generate_ts_init(&self) -> UnixNanos {
1154        get_atomic_clock_realtime().get_time_ns()
1155    }
1156
1157    /// Sets whether to generate position reports from wallet balances for SPOT instruments.
1158    pub fn set_use_spot_position_reports(&self, value: bool) {
1159        self.use_spot_position_reports
1160            .store(value, Ordering::Relaxed);
1161    }
1162
1163    /// Sets the quote currency filter for spot position reports.
1164    pub fn set_spot_positions_quote_currency(&self, currency: &str) {
1165        let mut guard = self.spot_positions_quote_currency.write().expect("lock");
1166        *guard = Ustr::from(currency);
1167    }
1168
1169    /// Requests an authentication token for WebSocket connections.
1170    pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
1171        self.inner.get_websockets_token().await
1172    }
1173
1174    /// Requests tradable instruments from Kraken.
1175    pub async fn request_instruments(
1176        &self,
1177        pairs: Option<Vec<String>>,
1178    ) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
1179        let ts_init = self.generate_ts_init();
1180        let asset_pairs = self.inner.get_asset_pairs(pairs).await?;
1181
1182        let instruments: Vec<InstrumentAny> = asset_pairs
1183            .iter()
1184            .filter_map(|(pair_name, definition)| {
1185                match parse_spot_instrument(pair_name, definition, ts_init, ts_init) {
1186                    Ok(instrument) => Some(instrument),
1187                    Err(e) => {
1188                        log::warn!("Failed to parse instrument {pair_name}: {e}");
1189                        None
1190                    }
1191                }
1192            })
1193            .collect();
1194
1195        Ok(instruments)
1196    }
1197
1198    /// Requests historical trades for an instrument.
1199    pub async fn request_trades(
1200        &self,
1201        instrument_id: InstrumentId,
1202        start: Option<DateTime<Utc>>,
1203        end: Option<DateTime<Utc>>,
1204        limit: Option<u64>,
1205    ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
1206        let instrument = self
1207            .get_cached_instrument(&instrument_id.symbol.inner())
1208            .ok_or_else(|| {
1209                KrakenHttpError::ParseError(format!(
1210                    "Instrument not found in cache: {instrument_id}",
1211                ))
1212            })?;
1213
1214        let raw_symbol = instrument.raw_symbol().to_string();
1215        let ts_init = self.generate_ts_init();
1216
1217        // Kraken trades API expects nanoseconds since epoch as string
1218        let since = start.map(|dt| (dt.timestamp_nanos_opt().unwrap_or(0) as u64).to_string());
1219        let response = self.inner.get_trades(&raw_symbol, since).await?;
1220
1221        let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1222        let mut trades = Vec::new();
1223
1224        for (_pair_name, trade_arrays) in &response.data {
1225            for trade_array in trade_arrays {
1226                match parse_trade_tick_from_array(trade_array, &instrument, ts_init) {
1227                    Ok(trade_tick) => {
1228                        if let Some(end_nanos) = end_ns
1229                            && trade_tick.ts_event.as_u64() > end_nanos
1230                        {
1231                            continue;
1232                        }
1233                        trades.push(trade_tick);
1234
1235                        if let Some(limit_count) = limit
1236                            && trades.len() >= limit_count as usize
1237                        {
1238                            return Ok(trades);
1239                        }
1240                    }
1241                    Err(e) => {
1242                        log::warn!("Failed to parse trade tick: {e}");
1243                    }
1244                }
1245            }
1246        }
1247
1248        Ok(trades)
1249    }
1250
1251    /// Requests historical bars/OHLC data for an instrument.
1252    pub async fn request_bars(
1253        &self,
1254        bar_type: BarType,
1255        start: Option<DateTime<Utc>>,
1256        end: Option<DateTime<Utc>>,
1257        limit: Option<u64>,
1258    ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
1259        let instrument_id = bar_type.instrument_id();
1260        let instrument = self
1261            .get_cached_instrument(&instrument_id.symbol.inner())
1262            .ok_or_else(|| {
1263                KrakenHttpError::ParseError(format!(
1264                    "Instrument not found in cache: {instrument_id}"
1265                ))
1266            })?;
1267
1268        let raw_symbol = instrument.raw_symbol().to_string();
1269        let ts_init = self.generate_ts_init();
1270
1271        let interval = Some(
1272            bar_type_to_spot_interval(bar_type)
1273                .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?,
1274        );
1275
1276        // Kraken OHLC API expects Unix timestamp in seconds
1277        let since = start.map(|dt| dt.timestamp());
1278        let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1279        let response = self.inner.get_ohlc(&raw_symbol, interval, since).await?;
1280
1281        let mut bars = Vec::new();
1282
1283        for (_pair_name, ohlc_arrays) in &response.data {
1284            for ohlc_array in ohlc_arrays {
1285                if ohlc_array.len() < 8 {
1286                    let len = ohlc_array.len();
1287                    log::warn!("OHLC array too short: {len}");
1288                    continue;
1289                }
1290
1291                let ohlc = OhlcData {
1292                    time: ohlc_array[0].as_i64().unwrap_or(0),
1293                    open: ohlc_array[1].as_str().unwrap_or("0").to_string(),
1294                    high: ohlc_array[2].as_str().unwrap_or("0").to_string(),
1295                    low: ohlc_array[3].as_str().unwrap_or("0").to_string(),
1296                    close: ohlc_array[4].as_str().unwrap_or("0").to_string(),
1297                    vwap: ohlc_array[5].as_str().unwrap_or("0").to_string(),
1298                    volume: ohlc_array[6].as_str().unwrap_or("0").to_string(),
1299                    count: ohlc_array[7].as_i64().unwrap_or(0),
1300                };
1301
1302                match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1303                    Ok(bar) => {
1304                        if let Some(end_nanos) = end_ns
1305                            && bar.ts_event.as_u64() > end_nanos
1306                        {
1307                            continue;
1308                        }
1309                        bars.push(bar);
1310
1311                        if let Some(limit_count) = limit
1312                            && bars.len() >= limit_count as usize
1313                        {
1314                            return Ok(bars);
1315                        }
1316                    }
1317                    Err(e) => {
1318                        log::warn!("Failed to parse bar: {e}");
1319                    }
1320                }
1321            }
1322        }
1323
1324        Ok(bars)
1325    }
1326
1327    /// Requests account state (balances) from Kraken.
1328    ///
1329    /// Returns an `AccountState` containing all currency balances.
1330    pub async fn request_account_state(
1331        &self,
1332        account_id: AccountId,
1333    ) -> anyhow::Result<AccountState> {
1334        let balances_raw = self.inner.get_balance().await?;
1335        let ts_init = self.generate_ts_init();
1336
1337        let balances: Vec<AccountBalance> = balances_raw
1338            .iter()
1339            .filter_map(|(currency_code, amount_str)| {
1340                let amount = amount_str.parse::<f64>().ok()?;
1341                if amount == 0.0 {
1342                    return None;
1343                }
1344
1345                // Kraken uses X-prefixed names for some currencies (e.g., XXBT for BTC)
1346                let normalized_code = currency_code
1347                    .strip_prefix("X")
1348                    .or_else(|| currency_code.strip_prefix("Z"))
1349                    .unwrap_or(currency_code);
1350
1351                let currency = Currency::new(
1352                    normalized_code,
1353                    8, // Default precision
1354                    0,
1355                    "0",
1356                    CurrencyType::Crypto,
1357                );
1358
1359                let total = Money::new(amount, currency);
1360                let locked = Money::new(0.0, currency);
1361
1362                // Balance endpoint returns total only, so free = total (no locked info)
1363                Some(AccountBalance::new(total, locked, total))
1364            })
1365            .collect();
1366
1367        Ok(AccountState::new(
1368            account_id,
1369            AccountType::Cash,
1370            balances,
1371            vec![], // No margins for spot
1372            true,   // reported
1373            UUID4::new(),
1374            ts_init,
1375            ts_init,
1376            None,
1377        ))
1378    }
1379
1380    /// Requests order status reports from Kraken.
1381    pub async fn request_order_status_reports(
1382        &self,
1383        account_id: AccountId,
1384        instrument_id: Option<InstrumentId>,
1385        start: Option<DateTime<Utc>>,
1386        end: Option<DateTime<Utc>>,
1387        open_only: bool,
1388    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1389        const PAGE_SIZE: i32 = 50;
1390
1391        let ts_init = self.generate_ts_init();
1392        let mut all_reports = Vec::new();
1393
1394        let open_orders = self.inner.get_open_orders(Some(true), None).await?;
1395
1396        for (order_id, order) in &open_orders {
1397            if let Some(ref target_id) = instrument_id {
1398                let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1399                if let Some(inst) = instrument
1400                    && inst.raw_symbol().as_str() != order.descr.pair
1401                {
1402                    continue;
1403                }
1404            }
1405
1406            if let Some(instrument) = self.get_instrument_by_raw_symbol(order.descr.pair.as_str()) {
1407                match parse_order_status_report(order_id, order, &instrument, account_id, ts_init) {
1408                    Ok(report) => all_reports.push(report),
1409                    Err(e) => {
1410                        log::warn!("Failed to parse order {order_id}: {e}");
1411                    }
1412                }
1413            }
1414        }
1415
1416        if open_only {
1417            return Ok(all_reports);
1418        }
1419
1420        // Kraken API expects Unix timestamps in seconds
1421        let start_ts = start.map(|dt| dt.timestamp());
1422        let end_ts = end.map(|dt| dt.timestamp());
1423
1424        let mut offset = 0;
1425
1426        loop {
1427            let closed_orders = self
1428                .inner
1429                .get_closed_orders(Some(true), None, start_ts, end_ts, Some(offset), None)
1430                .await?;
1431
1432            if closed_orders.is_empty() {
1433                break;
1434            }
1435
1436            for (order_id, order) in &closed_orders {
1437                if let Some(ref target_id) = instrument_id {
1438                    let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1439                    if let Some(inst) = instrument
1440                        && inst.raw_symbol().as_str() != order.descr.pair
1441                    {
1442                        continue;
1443                    }
1444                }
1445
1446                if let Some(instrument) =
1447                    self.get_instrument_by_raw_symbol(order.descr.pair.as_str())
1448                {
1449                    match parse_order_status_report(
1450                        order_id,
1451                        order,
1452                        &instrument,
1453                        account_id,
1454                        ts_init,
1455                    ) {
1456                        Ok(report) => all_reports.push(report),
1457                        Err(e) => {
1458                            log::warn!("Failed to parse order {order_id}: {e}");
1459                        }
1460                    }
1461                }
1462            }
1463
1464            offset += PAGE_SIZE;
1465        }
1466
1467        Ok(all_reports)
1468    }
1469
1470    /// Requests fill/trade reports from Kraken.
1471    pub async fn request_fill_reports(
1472        &self,
1473        account_id: AccountId,
1474        instrument_id: Option<InstrumentId>,
1475        start: Option<DateTime<Utc>>,
1476        end: Option<DateTime<Utc>>,
1477    ) -> anyhow::Result<Vec<FillReport>> {
1478        const PAGE_SIZE: i32 = 50;
1479
1480        let ts_init = self.generate_ts_init();
1481        let mut all_reports = Vec::new();
1482
1483        // Kraken API expects Unix timestamps in seconds
1484        let start_ts = start.map(|dt| dt.timestamp());
1485        let end_ts = end.map(|dt| dt.timestamp());
1486
1487        let mut offset = 0;
1488
1489        loop {
1490            let trades = self
1491                .inner
1492                .get_trades_history(None, Some(true), start_ts, end_ts, Some(offset))
1493                .await?;
1494
1495            if trades.is_empty() {
1496                break;
1497            }
1498
1499            for (trade_id, trade) in &trades {
1500                if let Some(ref target_id) = instrument_id {
1501                    let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1502                    if let Some(inst) = instrument
1503                        && inst.raw_symbol().as_str() != trade.pair
1504                    {
1505                        continue;
1506                    }
1507                }
1508
1509                if let Some(instrument) = self.get_instrument_by_raw_symbol(trade.pair.as_str()) {
1510                    match parse_fill_report(trade_id, trade, &instrument, account_id, ts_init) {
1511                        Ok(report) => all_reports.push(report),
1512                        Err(e) => {
1513                            log::warn!("Failed to parse trade {trade_id}: {e}");
1514                        }
1515                    }
1516                }
1517            }
1518
1519            offset += PAGE_SIZE;
1520        }
1521
1522        Ok(all_reports)
1523    }
1524
1525    /// Requests position status reports for SPOT instruments.
1526    ///
1527    /// Returns wallet balances as position reports if `use_spot_position_reports` is enabled.
1528    /// Otherwise returns an empty vector (spot traditionally has no "positions").
1529    pub async fn request_position_status_reports(
1530        &self,
1531        account_id: AccountId,
1532        instrument_id: Option<InstrumentId>,
1533    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1534        if self.use_spot_position_reports.load(Ordering::Relaxed) {
1535            self.generate_spot_position_reports_from_wallet(account_id, instrument_id)
1536                .await
1537        } else {
1538            Ok(Vec::new())
1539        }
1540    }
1541
1542    /// Generates SPOT position reports from wallet balances.
1543    ///
1544    /// Kraken spot balances are simple totals (no borrowing concept).
1545    /// Positive balances are reported as LONG positions.
1546    /// Zero balances are reported as FLAT.
1547    async fn generate_spot_position_reports_from_wallet(
1548        &self,
1549        account_id: AccountId,
1550        instrument_id: Option<InstrumentId>,
1551    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1552        let balances_raw = self.inner.get_balance().await?;
1553        let ts_init = self.generate_ts_init();
1554        let mut wallet_by_coin: HashMap<Ustr, f64> = HashMap::new();
1555
1556        for (currency_code, amount_str) in &balances_raw {
1557            let balance = match amount_str.parse::<f64>() {
1558                Ok(b) => b,
1559                Err(_) => continue,
1560            };
1561
1562            if balance == 0.0 {
1563                continue;
1564            }
1565
1566            wallet_by_coin.insert(Ustr::from(normalize_currency_code(currency_code)), balance);
1567        }
1568
1569        let mut reports = Vec::new();
1570
1571        if let Some(instrument_id) = instrument_id {
1572            if let Some(instrument) = self.get_cached_instrument(&instrument_id.symbol.inner()) {
1573                let base_currency = match instrument.base_currency() {
1574                    Some(currency) => currency,
1575                    None => return Ok(reports),
1576                };
1577
1578                let coin = Ustr::from(normalize_currency_code(base_currency.code.as_str()));
1579                let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(0.0);
1580
1581                let side = if wallet_balance > 0.0 {
1582                    PositionSideSpecified::Long
1583                } else {
1584                    PositionSideSpecified::Flat
1585                };
1586
1587                let abs_balance = wallet_balance.abs();
1588                let quantity = Quantity::new(abs_balance, instrument.size_precision());
1589
1590                let report = PositionStatusReport::new(
1591                    account_id,
1592                    instrument_id,
1593                    side,
1594                    quantity,
1595                    ts_init,
1596                    ts_init,
1597                    None,
1598                    None,
1599                    None,
1600                );
1601
1602                reports.push(report);
1603            }
1604        } else {
1605            let quote_filter = *self.spot_positions_quote_currency.read().expect("lock");
1606
1607            for entry in self.instruments_cache.iter() {
1608                let instrument = entry.value();
1609
1610                let quote_currency = match instrument.quote_currency() {
1611                    currency if currency.code == quote_filter => currency,
1612                    _ => continue,
1613                };
1614
1615                let base_currency = match instrument.base_currency() {
1616                    Some(currency) => currency,
1617                    None => continue,
1618                };
1619
1620                let coin = Ustr::from(normalize_currency_code(base_currency.code.as_str()));
1621                let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(0.0);
1622
1623                if wallet_balance == 0.0 {
1624                    continue;
1625                }
1626
1627                let side = PositionSideSpecified::Long;
1628                let quantity = Quantity::new(wallet_balance, instrument.size_precision());
1629
1630                if quantity.is_zero() {
1631                    continue;
1632                }
1633
1634                log::debug!(
1635                    "Spot position: {} {} (quote: {})",
1636                    quantity,
1637                    base_currency.code,
1638                    quote_currency.code
1639                );
1640
1641                let report = PositionStatusReport::new(
1642                    account_id,
1643                    instrument.id(),
1644                    side,
1645                    quantity,
1646                    ts_init,
1647                    ts_init,
1648                    None,
1649                    None,
1650                    None,
1651                );
1652
1653                reports.push(report);
1654            }
1655        }
1656
1657        Ok(reports)
1658    }
1659
1660    /// Submits a new order to the Kraken Spot exchange.
1661    ///
1662    /// Returns the venue order ID on success. WebSocket handles all execution events.
1663    ///
1664    /// # Errors
1665    ///
1666    /// Returns an error if:
1667    /// - Credentials are missing.
1668    /// - The instrument is not found in cache.
1669    /// - The order type or time in force is not supported.
1670    /// - The request fails.
1671    /// - The order is rejected.
1672    #[allow(clippy::too_many_arguments)]
1673    pub async fn submit_order(
1674        &self,
1675        _account_id: AccountId,
1676        instrument_id: InstrumentId,
1677        client_order_id: ClientOrderId,
1678        order_side: OrderSide,
1679        order_type: OrderType,
1680        quantity: Quantity,
1681        time_in_force: TimeInForce,
1682        expire_time: Option<UnixNanos>,
1683        price: Option<Price>,
1684        trigger_price: Option<Price>,
1685        reduce_only: bool,
1686        post_only: bool,
1687    ) -> anyhow::Result<VenueOrderId> {
1688        let instrument = self
1689            .get_cached_instrument(&instrument_id.symbol.inner())
1690            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1691
1692        let raw_symbol = instrument.raw_symbol().inner();
1693
1694        let kraken_side = match order_side {
1695            OrderSide::Buy => KrakenOrderSide::Buy,
1696            OrderSide::Sell => KrakenOrderSide::Sell,
1697            _ => anyhow::bail!("Invalid order side: {order_side:?}"),
1698        };
1699
1700        let kraken_order_type = match order_type {
1701            OrderType::Market => KrakenOrderType::Market,
1702            OrderType::Limit => KrakenOrderType::Limit,
1703            OrderType::StopMarket => KrakenOrderType::StopLoss,
1704            OrderType::StopLimit => KrakenOrderType::StopLossLimit,
1705            OrderType::MarketIfTouched => KrakenOrderType::TakeProfit,
1706            OrderType::LimitIfTouched => KrakenOrderType::TakeProfitLimit,
1707            _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
1708        };
1709
1710        // Note: timeinforce is only valid for limit-type orders, not market orders
1711        let mut oflags = Vec::new();
1712        let is_limit_order = matches!(
1713            order_type,
1714            OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
1715        );
1716
1717        let (timeinforce, expiretm) =
1718            compute_time_in_force(is_limit_order, time_in_force, expire_time)?;
1719
1720        if post_only {
1721            oflags.push("post");
1722        }
1723
1724        if reduce_only {
1725            log::warn!("reduce_only is not supported by Kraken Spot API, ignoring");
1726        }
1727
1728        let mut builder = KrakenSpotAddOrderParamsBuilder::default();
1729        builder
1730            .cl_ord_id(client_order_id.to_string())
1731            .broker(NAUTILUS_KRAKEN_BROKER_ID)
1732            .pair(raw_symbol)
1733            .side(kraken_side)
1734            .volume(quantity.to_string())
1735            .order_type(kraken_order_type);
1736
1737        // For stop/conditional orders:
1738        // - price = trigger price (when the order activates)
1739        // - price2 = limit price (for stop-limit and take-profit-limit)
1740        // For regular limit orders:
1741        // - price = limit price
1742        let is_conditional = matches!(
1743            order_type,
1744            OrderType::StopMarket
1745                | OrderType::StopLimit
1746                | OrderType::MarketIfTouched
1747                | OrderType::LimitIfTouched
1748        );
1749
1750        if is_conditional {
1751            if let Some(trigger) = trigger_price {
1752                builder.price(trigger.to_string());
1753            }
1754            if let Some(limit) = price {
1755                builder.price2(limit.to_string());
1756            }
1757        } else if let Some(limit) = price {
1758            builder.price(limit.to_string());
1759        }
1760
1761        if !oflags.is_empty() {
1762            builder.oflags(oflags.join(","));
1763        }
1764
1765        if let Some(tif) = timeinforce {
1766            builder.timeinforce(tif);
1767        }
1768
1769        if let Some(expire) = expiretm {
1770            builder.expiretm(expire);
1771        }
1772
1773        let params = builder
1774            .build()
1775            .map_err(|e| anyhow::anyhow!("Failed to build order params: {e}"))?;
1776
1777        let response = self.inner.add_order(&params).await?;
1778
1779        let venue_order_id = response
1780            .txid
1781            .first()
1782            .ok_or_else(|| anyhow::anyhow!("No transaction ID in order response"))?;
1783
1784        Ok(VenueOrderId::new(venue_order_id))
1785    }
1786
1787    /// Modifies an existing order on the Kraken Spot exchange using atomic amend.
1788    ///
1789    /// Uses the AmendOrder endpoint which modifies the order in-place,
1790    /// keeping the same order ID and queue position.
1791    ///
1792    /// # Errors
1793    ///
1794    /// Returns an error if:
1795    /// - Neither `client_order_id` nor `venue_order_id` is provided.
1796    /// - The instrument is not found in cache.
1797    /// - The request fails.
1798    pub async fn modify_order(
1799        &self,
1800        instrument_id: InstrumentId,
1801        client_order_id: Option<ClientOrderId>,
1802        venue_order_id: Option<VenueOrderId>,
1803        quantity: Option<Quantity>,
1804        price: Option<Price>,
1805        trigger_price: Option<Price>,
1806    ) -> anyhow::Result<VenueOrderId> {
1807        let _ = self
1808            .get_cached_instrument(&instrument_id.symbol.inner())
1809            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1810
1811        let txid = venue_order_id.as_ref().map(|id| id.to_string());
1812        let cl_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1813
1814        if txid.is_none() && cl_ord_id.is_none() {
1815            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1816        }
1817
1818        let mut builder = KrakenSpotAmendOrderParamsBuilder::default();
1819
1820        // Prefer txid (venue_order_id) over cl_ord_id
1821        if let Some(ref id) = txid {
1822            builder.txid(id.clone());
1823        } else if let Some(ref id) = cl_ord_id {
1824            builder.cl_ord_id(id.clone());
1825        }
1826
1827        if let Some(qty) = quantity {
1828            builder.order_qty(qty.to_string());
1829        }
1830        if let Some(p) = price {
1831            builder.limit_price(p.to_string());
1832        }
1833        if let Some(tp) = trigger_price {
1834            builder.trigger_price(tp.to_string());
1835        }
1836
1837        let params = builder
1838            .build()
1839            .map_err(|e| anyhow::anyhow!("Failed to build amend order params: {e}"))?;
1840
1841        let _response = self.inner.amend_order(&params).await?;
1842
1843        // AmendOrder modifies in-place, so the order keeps its original ID
1844        let order_id = venue_order_id
1845            .ok_or_else(|| anyhow::anyhow!("venue_order_id required for amend response"))?;
1846
1847        Ok(order_id)
1848    }
1849
1850    /// Cancels an order on the Kraken Spot exchange.
1851    ///
1852    /// # Errors
1853    ///
1854    /// Returns an error if:
1855    /// - Credentials are missing.
1856    /// - Neither client_order_id nor venue_order_id is provided.
1857    /// - The request fails.
1858    /// - The order cancellation is rejected.
1859    pub async fn cancel_order(
1860        &self,
1861        _account_id: AccountId,
1862        instrument_id: InstrumentId,
1863        client_order_id: Option<ClientOrderId>,
1864        venue_order_id: Option<VenueOrderId>,
1865    ) -> anyhow::Result<()> {
1866        let _ = self
1867            .get_cached_instrument(&instrument_id.symbol.inner())
1868            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1869
1870        let txid = venue_order_id.as_ref().map(|id| id.to_string());
1871        let cl_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1872
1873        if txid.is_none() && cl_ord_id.is_none() {
1874            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1875        }
1876
1877        // Prefer txid (venue identifier) since Kraken always knows it.
1878        // cl_ord_id may not be known to Kraken for reconciled orders.
1879        let mut builder = KrakenSpotCancelOrderParamsBuilder::default();
1880        if let Some(ref id) = txid {
1881            builder.txid(id.clone());
1882        } else if let Some(ref id) = cl_ord_id {
1883            builder.cl_ord_id(id.clone());
1884        }
1885        let params = builder
1886            .build()
1887            .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
1888
1889        self.inner.cancel_order(&params).await?;
1890
1891        Ok(())
1892    }
1893
1894    /// Cancels multiple orders on the Kraken Spot exchange (batched, max 50 per request).
1895    pub async fn cancel_orders_batch(
1896        &self,
1897        venue_order_ids: Vec<VenueOrderId>,
1898    ) -> anyhow::Result<i32> {
1899        if venue_order_ids.is_empty() {
1900            return Ok(0);
1901        }
1902
1903        let mut total_cancelled = 0;
1904
1905        for chunk in venue_order_ids.chunks(BATCH_CANCEL_LIMIT) {
1906            let orders: Vec<String> = chunk.iter().map(|id| id.to_string()).collect();
1907            let params = KrakenSpotCancelOrderBatchParams { orders };
1908
1909            let response = self.inner.cancel_order_batch(&params).await?;
1910            total_cancelled += response.count;
1911        }
1912
1913        Ok(total_cancelled)
1914    }
1915}
1916
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// A default raw client carries no credential.
    #[rstest]
    fn test_raw_client_creation() {
        let raw = KrakenSpotRawHttpClient::default();
        assert!(raw.credential.is_none());
    }

    /// Constructing a raw client with key/secret stores a credential.
    #[rstest]
    fn test_raw_client_with_credentials() {
        let raw = KrakenSpotRawHttpClient::with_credentials(
            "test_key".to_string(),
            "test_secret".to_string(),
            KrakenEnvironment::Mainnet,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .expect("raw client construction should succeed");

        assert!(raw.credential.is_some());
    }

    /// A default client starts with an empty instruments cache.
    #[rstest]
    fn test_client_creation() {
        let http = KrakenSpotHttpClient::default();
        assert!(http.instruments_cache.is_empty());
    }

    /// A client built with credentials also starts with an empty instruments cache.
    #[rstest]
    fn test_client_with_credentials() {
        let http = KrakenSpotHttpClient::with_credentials(
            "test_key".to_string(),
            "test_secret".to_string(),
            KrakenEnvironment::Mainnet,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .expect("client construction should succeed");

        assert!(http.instruments_cache.is_empty());
    }

    /// Three consecutive nonces must be strictly increasing.
    #[rstest]
    fn test_nonce_generation_strictly_increasing() {
        let raw = KrakenSpotRawHttpClient::default();

        // Array elements are evaluated left to right, so this is generation order.
        let [nonce1, nonce2, nonce3] = [
            raw.generate_nonce(),
            raw.generate_nonce(),
            raw.generate_nonce(),
        ];

        assert!(
            nonce2 > nonce1,
            "nonce2 ({nonce2}) should be > nonce1 ({nonce1})"
        );
        assert!(
            nonce3 > nonce2,
            "nonce3 ({nonce3}) should be > nonce2 ({nonce2})"
        );
    }

    /// The nonce magnitude must be consistent with a nanosecond UNIX timestamp.
    #[rstest]
    fn test_nonce_is_nanosecond_timestamp() {
        let raw = KrakenSpotRawHttpClient::default();
        let nonce = raw.generate_nonce();

        // Nonce should be a nanosecond timestamp (roughly 1.7e18 for Dec 2025)
        // Verify it's in a reasonable range (> 1.5e18, which is ~2017)
        assert!(
            nonce > 1_500_000_000_000_000_000,
            "Nonce should be nanosecond timestamp"
        );
    }

    /// Successful combinations of limit flag, time in force and expiry.
    #[rstest]
    #[case::gtc_limit(true, TimeInForce::Gtc, None, None, None)]
    #[case::ioc_limit(true, TimeInForce::Ioc, None, Some("IOC"), None)]
    #[case::gtd_limit_with_expire(
        true,
        TimeInForce::Gtd,
        Some(1_704_067_200_000_000_000u64),
        Some("GTD"),
        Some("1704067200")
    )]
    #[case::gtc_market(false, TimeInForce::Gtc, None, None, None)]
    #[case::ioc_market(false, TimeInForce::Ioc, None, None, None)]
    fn test_compute_time_in_force_success(
        #[case] is_limit: bool,
        #[case] tif: TimeInForce,
        #[case] expire_nanos: Option<u64>,
        #[case] expected_tif: Option<&str>,
        #[case] expected_expire: Option<&str>,
    ) {
        let (actual_tif, actual_expire) =
            compute_time_in_force(is_limit, tif, expire_nanos.map(UnixNanos::from)).unwrap();

        assert_eq!(actual_tif.as_deref(), expected_tif);
        assert_eq!(actual_expire.as_deref(), expected_expire);
    }

    /// Unsupported or incomplete time-in-force inputs yield a descriptive error.
    #[rstest]
    #[case::fok_not_supported(TimeInForce::Fok, None, "FOK")]
    #[case::gtd_missing_expire(TimeInForce::Gtd, None, "expire_time")]
    fn test_compute_time_in_force_errors(
        #[case] tif: TimeInForce,
        #[case] expire_nanos: Option<u64>,
        #[case] expected_error: &str,
    ) {
        let outcome = compute_time_in_force(true, tif, expire_nanos.map(UnixNanos::from));

        let error = outcome.expect_err("compute_time_in_force should fail for this input");
        assert!(error.to_string().contains(expected_error));
    }
}