nautilus_kraken/http/futures/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! HTTP client for the Kraken Futures REST API.
17
18use std::{
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroU32,
22    sync::{
23        Arc,
24        atomic::{AtomicBool, Ordering},
25    },
26};
27
28use chrono::{DateTime, Utc};
29use dashmap::DashMap;
30use nautilus_core::{
31    AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, nanos::UnixNanos,
32    time::get_atomic_clock_realtime,
33};
34use nautilus_model::{
35    data::{Bar, BarType, TradeTick},
36    enums::{AccountType, CurrencyType, OrderSide, OrderType, TimeInForce},
37    events::AccountState,
38    identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
39    instruments::{Instrument, InstrumentAny},
40    reports::{FillReport, OrderStatusReport, PositionStatusReport},
41    types::{AccountBalance, Currency, Money, Price, Quantity},
42};
43use nautilus_network::{
44    http::{HttpClient, Method, USER_AGENT},
45    ratelimiter::quota::Quota,
46    retry::{RetryConfig, RetryManager},
47};
48use serde::de::DeserializeOwned;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use super::{models::*, query::*};
53use crate::{
54    common::{
55        consts::NAUTILUS_KRAKEN_BROKER_ID,
56        credential::KrakenCredential,
57        enums::{
58            KrakenApiResult, KrakenEnvironment, KrakenFuturesOrderType, KrakenOrderSide,
59            KrakenProductType, KrakenSendStatus,
60        },
61        parse::{
62            bar_type_to_futures_resolution, parse_bar, parse_futures_fill_report,
63            parse_futures_instrument, parse_futures_order_event_status_report,
64            parse_futures_order_status_report, parse_futures_position_status_report,
65            parse_futures_public_execution,
66        },
67        urls::get_kraken_http_base_url,
68    },
69    http::{error::KrakenHttpError, models::OhlcData},
70};
71
72/// Default Kraken Futures REST API rate limit (requests per second).
73pub const KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 5;
74
75const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:futures:global";
76
77/// Maximum orders per batch cancel request for Kraken Futures API.
78const BATCH_CANCEL_LIMIT: usize = 50;
79
80/// Raw HTTP client for low-level Kraken Futures API operations.
81///
82/// This client handles request/response operations with the Kraken Futures API,
83/// returning venue-specific response types. It does not parse to Nautilus domain types.
84pub struct KrakenFuturesRawHttpClient {
85    base_url: String,
86    client: HttpClient,
87    credential: Option<KrakenCredential>,
88    retry_manager: RetryManager<KrakenHttpError>,
89    cancellation_token: CancellationToken,
90    clock: &'static AtomicTime,
91    /// Mutex to serialize authenticated requests, ensuring nonces arrive at Kraken in order
92    auth_mutex: tokio::sync::Mutex<()>,
93}
94
95impl Default for KrakenFuturesRawHttpClient {
96    fn default() -> Self {
97        Self::new(
98            KrakenEnvironment::Mainnet,
99            None,
100            Some(60),
101            None,
102            None,
103            None,
104            None,
105            None,
106        )
107        .expect("Failed to create default KrakenFuturesRawHttpClient")
108    }
109}
110
111impl Debug for KrakenFuturesRawHttpClient {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct(stringify!(KrakenFuturesRawHttpClient))
114            .field("base_url", &self.base_url)
115            .field("has_credentials", &self.credential.is_some())
116            .finish()
117    }
118}
119
120impl KrakenFuturesRawHttpClient {
121    /// Creates a new [`KrakenFuturesRawHttpClient`].
122    #[allow(clippy::too_many_arguments)]
123    pub fn new(
124        environment: KrakenEnvironment,
125        base_url_override: Option<String>,
126        timeout_secs: Option<u64>,
127        max_retries: Option<u32>,
128        retry_delay_ms: Option<u64>,
129        retry_delay_max_ms: Option<u64>,
130        proxy_url: Option<String>,
131        max_requests_per_second: Option<u32>,
132    ) -> anyhow::Result<Self> {
133        let retry_config = RetryConfig {
134            max_retries: max_retries.unwrap_or(3),
135            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
136            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
137            backoff_factor: 2.0,
138            jitter_ms: 1000,
139            operation_timeout_ms: Some(60_000),
140            immediate_first: false,
141            max_elapsed_ms: Some(180_000),
142        };
143
144        let retry_manager = RetryManager::new(retry_config);
145        let base_url = base_url_override.unwrap_or_else(|| {
146            get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
147        });
148
149        let rate_limit =
150            max_requests_per_second.unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND);
151
152        Ok(Self {
153            base_url,
154            client: HttpClient::new(
155                Self::default_headers(),
156                vec![],
157                Self::rate_limiter_quotas(rate_limit),
158                Some(Self::default_quota(rate_limit)),
159                timeout_secs,
160                proxy_url,
161            )
162            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
163            credential: None,
164            retry_manager,
165            cancellation_token: CancellationToken::new(),
166            clock: get_atomic_clock_realtime(),
167            auth_mutex: tokio::sync::Mutex::new(()),
168        })
169    }
170
171    /// Creates a new [`KrakenFuturesRawHttpClient`] with credentials.
172    #[allow(clippy::too_many_arguments)]
173    pub fn with_credentials(
174        api_key: String,
175        api_secret: String,
176        environment: KrakenEnvironment,
177        base_url_override: Option<String>,
178        timeout_secs: Option<u64>,
179        max_retries: Option<u32>,
180        retry_delay_ms: Option<u64>,
181        retry_delay_max_ms: Option<u64>,
182        proxy_url: Option<String>,
183        max_requests_per_second: Option<u32>,
184    ) -> anyhow::Result<Self> {
185        let retry_config = RetryConfig {
186            max_retries: max_retries.unwrap_or(3),
187            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
188            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
189            backoff_factor: 2.0,
190            jitter_ms: 1000,
191            operation_timeout_ms: Some(60_000),
192            immediate_first: false,
193            max_elapsed_ms: Some(180_000),
194        };
195
196        let retry_manager = RetryManager::new(retry_config);
197        let base_url = base_url_override.unwrap_or_else(|| {
198            get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
199        });
200
201        let rate_limit =
202            max_requests_per_second.unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND);
203
204        Ok(Self {
205            base_url,
206            client: HttpClient::new(
207                Self::default_headers(),
208                vec![],
209                Self::rate_limiter_quotas(rate_limit),
210                Some(Self::default_quota(rate_limit)),
211                timeout_secs,
212                proxy_url,
213            )
214            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
215            credential: Some(KrakenCredential::new(api_key, api_secret)),
216            retry_manager,
217            cancellation_token: CancellationToken::new(),
218            clock: get_atomic_clock_realtime(),
219            auth_mutex: tokio::sync::Mutex::new(()),
220        })
221    }
222
223    /// Generates a unique nonce for Kraken Futures API requests.
224    ///
225    /// Uses `AtomicTime` for strict monotonicity. The nanosecond timestamp
226    /// guarantees uniqueness even for rapid consecutive calls.
227    fn generate_nonce(&self) -> u64 {
228        self.clock.get_time_ns().as_u64()
229    }
230
231    /// Returns the base URL for this client.
232    pub fn base_url(&self) -> &str {
233        &self.base_url
234    }
235
236    /// Returns the credential for this client, if set.
237    pub fn credential(&self) -> Option<&KrakenCredential> {
238        self.credential.as_ref()
239    }
240
241    /// Cancels all pending HTTP requests.
242    pub fn cancel_all_requests(&self) {
243        self.cancellation_token.cancel();
244    }
245
246    /// Returns the cancellation token for this client.
247    pub fn cancellation_token(&self) -> &CancellationToken {
248        &self.cancellation_token
249    }
250
251    fn default_headers() -> HashMap<String, String> {
252        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
253    }
254
255    fn default_quota(max_requests_per_second: u32) -> Quota {
256        Quota::per_second(NonZeroU32::new(max_requests_per_second).unwrap_or_else(|| {
257            NonZeroU32::new(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()
258        }))
259    }
260
261    fn rate_limiter_quotas(max_requests_per_second: u32) -> Vec<(String, Quota)> {
262        vec![(
263            KRAKEN_GLOBAL_RATE_KEY.to_string(),
264            Self::default_quota(max_requests_per_second),
265        )]
266    }
267
268    fn rate_limit_keys(endpoint: &str) -> Vec<String> {
269        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
270        let route = format!("kraken:futures:{normalized}");
271        vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
272    }
273
274    async fn send_request<T: DeserializeOwned>(
275        &self,
276        method: Method,
277        endpoint: &str,
278        url: String,
279        authenticate: bool,
280    ) -> anyhow::Result<T, KrakenHttpError> {
281        // Serialize authenticated requests to ensure nonces arrive at Kraken in order.
282        // Without this, concurrent requests can race through the network and arrive
283        // out-of-order, causing "Invalid nonce" errors.
284        let _guard = if authenticate {
285            Some(self.auth_mutex.lock().await)
286        } else {
287            None
288        };
289
290        let endpoint = endpoint.to_string();
291        let method_clone = method.clone();
292        let url_clone = url.clone();
293        let credential = self.credential.clone();
294
295        let operation = || {
296            let url = url_clone.clone();
297            let method = method_clone.clone();
298            let endpoint = endpoint.clone();
299            let credential = credential.clone();
300
301            async move {
302                let mut headers = Self::default_headers();
303
304                if authenticate {
305                    let cred = credential.as_ref().ok_or_else(|| {
306                        KrakenHttpError::AuthenticationError(
307                            "Missing credentials for authenticated request".to_string(),
308                        )
309                    })?;
310
311                    let nonce = self.generate_nonce();
312
313                    let signature = cred.sign_futures(&endpoint, "", nonce).map_err(|e| {
314                        KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
315                    })?;
316
317                    let base_url = &self.base_url;
318                    tracing::debug!(
319                        "Kraken Futures auth: endpoint={endpoint}, nonce={nonce}, base_url={base_url}"
320                    );
321
322                    headers.insert("APIKey".to_string(), cred.api_key().to_string());
323                    headers.insert("Authent".to_string(), signature);
324                    headers.insert("Nonce".to_string(), nonce.to_string());
325                }
326
327                let rate_limit_keys = Self::rate_limit_keys(&endpoint);
328
329                let response = self
330                    .client
331                    .request(
332                        method,
333                        url,
334                        None,
335                        Some(headers),
336                        None,
337                        None,
338                        Some(rate_limit_keys),
339                    )
340                    .await
341                    .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
342
343                let status = response.status.as_u16();
344                if status >= 400 {
345                    let body = String::from_utf8_lossy(&response.body).to_string();
346                    // Don't retry authentication errors
347                    if status == 401 || status == 403 {
348                        return Err(KrakenHttpError::AuthenticationError(format!(
349                            "HTTP error {status}: {body}"
350                        )));
351                    }
352                    return Err(KrakenHttpError::NetworkError(format!(
353                        "HTTP error {status}: {body}"
354                    )));
355                }
356
357                let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
358                    KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
359                })?;
360
361                serde_json::from_str(&response_text).map_err(|e| {
362                    KrakenHttpError::ParseError(format!(
363                        "Failed to deserialize futures response: {e}"
364                    ))
365                })
366            }
367        };
368
369        let should_retry =
370            |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
371        let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
372
373        self.retry_manager
374            .execute_with_retry_with_cancel(
375                &endpoint,
376                operation,
377                should_retry,
378                create_error,
379                &self.cancellation_token,
380            )
381            .await
382    }
383
384    /// Sends authenticated GET request with query parameters included in signature.
385    ///
386    /// For Kraken Futures, GET requests with query params must include them in postData
387    /// for signing: message = postData + nonce + endpoint
388    async fn send_get_with_query<T: DeserializeOwned>(
389        &self,
390        endpoint: &str,
391        url: String,
392        query_string: &str,
393    ) -> anyhow::Result<T, KrakenHttpError> {
394        let _guard = self.auth_mutex.lock().await;
395
396        if self.cancellation_token.is_cancelled() {
397            return Err(KrakenHttpError::NetworkError(
398                "Request cancelled".to_string(),
399            ));
400        }
401
402        let credential = self.credential.as_ref().ok_or_else(|| {
403            KrakenHttpError::AuthenticationError("Missing credentials".to_string())
404        })?;
405
406        let nonce = self.generate_nonce();
407
408        // Query params go in postData for signing (not in endpoint)
409        let signature = credential
410            .sign_futures(endpoint, query_string, nonce)
411            .map_err(|e| {
412                KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
413            })?;
414
415        tracing::debug!(
416            "Kraken Futures GET with query: endpoint={endpoint}, query={query_string}, nonce={nonce}"
417        );
418
419        let mut headers = Self::default_headers();
420        headers.insert("APIKey".to_string(), credential.api_key().to_string());
421        headers.insert("Authent".to_string(), signature);
422        headers.insert("Nonce".to_string(), nonce.to_string());
423
424        let rate_limit_keys = Self::rate_limit_keys(endpoint);
425
426        let response = self
427            .client
428            .request(
429                Method::GET,
430                url,
431                None,
432                Some(headers),
433                None,
434                None,
435                Some(rate_limit_keys),
436            )
437            .await
438            .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
439
440        let status = response.status.as_u16();
441        if status >= 400 {
442            let body = String::from_utf8_lossy(&response.body).to_string();
443            if status == 401 || status == 403 {
444                return Err(KrakenHttpError::AuthenticationError(format!(
445                    "HTTP error {status}: {body}"
446                )));
447            }
448            return Err(KrakenHttpError::NetworkError(format!(
449                "HTTP error {status}: {body}"
450            )));
451        }
452
453        let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
454            KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
455        })?;
456
457        serde_json::from_str(&response_text).map_err(|e| {
458            KrakenHttpError::ParseError(format!("Failed to deserialize futures response: {e}"))
459        })
460    }
461
462    async fn send_request_with_body<T: DeserializeOwned>(
463        &self,
464        endpoint: &str,
465        params: HashMap<String, String>,
466    ) -> anyhow::Result<T, KrakenHttpError> {
467        let post_data = serde_urlencoded::to_string(&params)
468            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
469        self.send_authenticated_post(endpoint, post_data).await
470    }
471
472    /// Sends a request with typed parameters (serializable struct).
473    async fn send_request_with_params<P: serde::Serialize, T: DeserializeOwned>(
474        &self,
475        endpoint: &str,
476        params: &P,
477    ) -> anyhow::Result<T, KrakenHttpError> {
478        let post_data = serde_urlencoded::to_string(params)
479            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
480        self.send_authenticated_post(endpoint, post_data).await
481    }
482
483    /// Core authenticated POST request - takes raw post_data string.
484    async fn send_authenticated_post<T: DeserializeOwned>(
485        &self,
486        endpoint: &str,
487        post_data: String,
488    ) -> anyhow::Result<T, KrakenHttpError> {
489        if self.cancellation_token.is_cancelled() {
490            return Err(KrakenHttpError::NetworkError(
491                "Request cancelled".to_string(),
492            ));
493        }
494
495        // Serialize authenticated requests to ensure nonces arrive at Kraken in order
496        let _guard = self.auth_mutex.lock().await;
497
498        if self.cancellation_token.is_cancelled() {
499            return Err(KrakenHttpError::NetworkError(
500                "Request cancelled".to_string(),
501            ));
502        }
503
504        let credential = self.credential.as_ref().ok_or_else(|| {
505            KrakenHttpError::AuthenticationError("Missing credentials".to_string())
506        })?;
507
508        let nonce = self.generate_nonce();
509        tracing::debug!("Generated nonce {nonce} for {endpoint}");
510
511        let signature = credential
512            .sign_futures(endpoint, &post_data, nonce)
513            .map_err(|e| {
514                KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
515            })?;
516
517        let url = format!("{}{endpoint}", self.base_url);
518        let mut headers = Self::default_headers();
519        headers.insert(
520            "Content-Type".to_string(),
521            "application/x-www-form-urlencoded".to_string(),
522        );
523        headers.insert("APIKey".to_string(), credential.api_key().to_string());
524        headers.insert("Authent".to_string(), signature);
525        headers.insert("Nonce".to_string(), nonce.to_string());
526
527        let rate_limit_keys = Self::rate_limit_keys(endpoint);
528
529        let response = self
530            .client
531            .request(
532                Method::POST,
533                url,
534                None,
535                Some(headers),
536                Some(post_data.into_bytes()),
537                None,
538                Some(rate_limit_keys),
539            )
540            .await
541            .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
542
543        if response.status.as_u16() >= 400 {
544            let status = response.status.as_u16();
545            let body = String::from_utf8_lossy(&response.body).to_string();
546            return Err(KrakenHttpError::NetworkError(format!(
547                "HTTP error {status}: {body}"
548            )));
549        }
550
551        let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
552            KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
553        })?;
554
555        serde_json::from_str(&response_text).map_err(|e| {
556            tracing::error!("Failed to parse response from {endpoint}: {response_text}");
557            KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
558        })
559    }
560
561    /// Requests tradable instruments from Kraken Futures.
562    pub async fn get_instruments(
563        &self,
564    ) -> anyhow::Result<FuturesInstrumentsResponse, KrakenHttpError> {
565        let endpoint = "/derivatives/api/v3/instruments";
566        let url = format!("{}{endpoint}", self.base_url);
567
568        self.send_request(Method::GET, endpoint, url, false).await
569    }
570
571    /// Requests ticker information for all futures instruments.
572    pub async fn get_tickers(&self) -> anyhow::Result<FuturesTickersResponse, KrakenHttpError> {
573        let endpoint = "/derivatives/api/v3/tickers";
574        let url = format!("{}{endpoint}", self.base_url);
575
576        self.send_request(Method::GET, endpoint, url, false).await
577    }
578
579    /// Requests OHLC candlestick data for a futures symbol.
580    pub async fn get_ohlc(
581        &self,
582        tick_type: &str,
583        symbol: &str,
584        resolution: &str,
585        from: Option<i64>,
586        to: Option<i64>,
587    ) -> anyhow::Result<FuturesCandlesResponse, KrakenHttpError> {
588        let endpoint = format!("/api/charts/v1/{tick_type}/{symbol}/{resolution}");
589
590        let mut url = format!("{}{endpoint}", self.base_url);
591
592        let mut query_params = Vec::new();
593        if let Some(from_ts) = from {
594            query_params.push(format!("from={from_ts}"));
595        }
596        if let Some(to_ts) = to {
597            query_params.push(format!("to={to_ts}"));
598        }
599
600        if !query_params.is_empty() {
601            url.push('?');
602            url.push_str(&query_params.join("&"));
603        }
604
605        self.send_request(Method::GET, &endpoint, url, false).await
606    }
607
608    /// Gets public execution events (trades) for a futures symbol.
609    pub async fn get_public_executions(
610        &self,
611        symbol: &str,
612        since: Option<i64>,
613        before: Option<i64>,
614        sort: Option<&str>,
615        continuation_token: Option<&str>,
616    ) -> anyhow::Result<FuturesPublicExecutionsResponse, KrakenHttpError> {
617        let endpoint = format!("/api/history/v3/market/{symbol}/executions");
618
619        let mut url = format!("{}{endpoint}", self.base_url);
620
621        let mut query_params = Vec::new();
622        if let Some(since_ts) = since {
623            query_params.push(format!("since={since_ts}"));
624        }
625        if let Some(before_ts) = before {
626            query_params.push(format!("before={before_ts}"));
627        }
628        if let Some(sort_order) = sort {
629            query_params.push(format!("sort={sort_order}"));
630        }
631        if let Some(token) = continuation_token {
632            query_params.push(format!("continuationToken={token}"));
633        }
634
635        if !query_params.is_empty() {
636            url.push('?');
637            url.push_str(&query_params.join("&"));
638        }
639
640        self.send_request(Method::GET, &endpoint, url, false).await
641    }
642
643    /// Requests all open orders (requires authentication).
644    pub async fn get_open_orders(
645        &self,
646    ) -> anyhow::Result<FuturesOpenOrdersResponse, KrakenHttpError> {
647        if self.credential.is_none() {
648            return Err(KrakenHttpError::AuthenticationError(
649                "API credentials required for futures open orders".to_string(),
650            ));
651        }
652
653        let endpoint = "/derivatives/api/v3/openorders";
654        let url = format!("{}{endpoint}", self.base_url);
655
656        self.send_request(Method::GET, endpoint, url, true).await
657    }
658
659    /// Requests historical order events (requires authentication).
660    pub async fn get_order_events(
661        &self,
662        before: Option<i64>,
663        since: Option<i64>,
664        continuation_token: Option<&str>,
665    ) -> anyhow::Result<FuturesOrderEventsResponse, KrakenHttpError> {
666        if self.credential.is_none() {
667            return Err(KrakenHttpError::AuthenticationError(
668                "API credentials required for futures order events".to_string(),
669            ));
670        }
671
672        let endpoint = "/api/history/v2/orders";
673        let mut query_params = Vec::new();
674
675        if let Some(before_ts) = before {
676            query_params.push(format!("before={before_ts}"));
677        }
678        if let Some(since_ts) = since {
679            query_params.push(format!("since={since_ts}"));
680        }
681        if let Some(token) = continuation_token {
682            query_params.push(format!("continuation_token={token}"));
683        }
684
685        // Build URL with query params
686        let query_string = query_params.join("&");
687        let url = if query_string.is_empty() {
688            format!("{}{endpoint}", self.base_url)
689        } else {
690            format!("{}{endpoint}?{query_string}", self.base_url)
691        };
692
693        // For signing: query params go in postData, not endpoint
694        // Kraken: message = postData + nonce + endpoint
695        self.send_get_with_query(endpoint, url, &query_string).await
696    }
697
698    /// Requests fill/trade history (requires authentication).
699    pub async fn get_fills(
700        &self,
701        last_fill_time: Option<&str>,
702    ) -> anyhow::Result<FuturesFillsResponse, KrakenHttpError> {
703        if self.credential.is_none() {
704            return Err(KrakenHttpError::AuthenticationError(
705                "API credentials required for futures fills".to_string(),
706            ));
707        }
708
709        let endpoint = "/derivatives/api/v3/fills";
710        let query_string = last_fill_time
711            .map(|t| format!("lastFillTime={t}"))
712            .unwrap_or_default();
713
714        let url = if query_string.is_empty() {
715            format!("{}{endpoint}", self.base_url)
716        } else {
717            format!("{}{endpoint}?{query_string}", self.base_url)
718        };
719
720        // Query params go in postData for signing
721        self.send_get_with_query(endpoint, url, &query_string).await
722    }
723
724    /// Requests open positions (requires authentication).
725    pub async fn get_open_positions(
726        &self,
727    ) -> anyhow::Result<FuturesOpenPositionsResponse, KrakenHttpError> {
728        if self.credential.is_none() {
729            return Err(KrakenHttpError::AuthenticationError(
730                "API credentials required for futures open positions".to_string(),
731            ));
732        }
733
734        let endpoint = "/derivatives/api/v3/openpositions";
735        let url = format!("{}{endpoint}", self.base_url);
736
737        self.send_request(Method::GET, endpoint, url, true).await
738    }
739
740    /// Requests all accounts (cash and margin) with balances and margin info.
741    pub async fn get_accounts(&self) -> anyhow::Result<FuturesAccountsResponse, KrakenHttpError> {
742        if self.credential.is_none() {
743            return Err(KrakenHttpError::AuthenticationError(
744                "API credentials required for futures accounts".to_string(),
745            ));
746        }
747
748        let endpoint = "/derivatives/api/v3/accounts";
749        let url = format!("{}{endpoint}", self.base_url);
750
751        self.send_request(Method::GET, endpoint, url, true).await
752    }
753
754    /// Submits a new order (requires authentication).
755    pub async fn send_order(
756        &self,
757        params: HashMap<String, String>,
758    ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
759        if self.credential.is_none() {
760            return Err(KrakenHttpError::AuthenticationError(
761                "API credentials required for sending orders".to_string(),
762            ));
763        }
764
765        let endpoint = "/derivatives/api/v3/sendorder";
766        self.send_request_with_body(endpoint, params).await
767    }
768
769    /// Submits a new order using typed parameters (requires authentication).
770    pub async fn send_order_params(
771        &self,
772        params: &KrakenFuturesSendOrderParams,
773    ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
774        if self.credential.is_none() {
775            return Err(KrakenHttpError::AuthenticationError(
776                "API credentials required for sending orders".to_string(),
777            ));
778        }
779
780        let endpoint = "/derivatives/api/v3/sendorder";
781        self.send_request_with_params(endpoint, params).await
782    }
783
784    /// Cancels an open order (requires authentication).
785    pub async fn cancel_order(
786        &self,
787        order_id: Option<String>,
788        cli_ord_id: Option<String>,
789    ) -> anyhow::Result<FuturesCancelOrderResponse, KrakenHttpError> {
790        if self.credential.is_none() {
791            return Err(KrakenHttpError::AuthenticationError(
792                "API credentials required for canceling orders".to_string(),
793            ));
794        }
795
796        let mut params = HashMap::new();
797        if let Some(id) = order_id {
798            params.insert("order_id".to_string(), id);
799        }
800        if let Some(id) = cli_ord_id {
801            params.insert("cliOrdId".to_string(), id);
802        }
803
804        let endpoint = "/derivatives/api/v3/cancelorder";
805        self.send_request_with_body(endpoint, params).await
806    }
807
808    /// Edits an existing order (requires authentication).
809    pub async fn edit_order(
810        &self,
811        params: &KrakenFuturesEditOrderParams,
812    ) -> anyhow::Result<FuturesEditOrderResponse, KrakenHttpError> {
813        if self.credential.is_none() {
814            return Err(KrakenHttpError::AuthenticationError(
815                "API credentials required for editing orders".to_string(),
816            ));
817        }
818
819        let endpoint = "/derivatives/api/v3/editorder";
820        self.send_request_with_params(endpoint, params).await
821    }
822
823    /// Submits multiple orders in a single batch request (requires authentication).
824    pub async fn batch_order(
825        &self,
826        params: HashMap<String, String>,
827    ) -> anyhow::Result<FuturesBatchOrderResponse, KrakenHttpError> {
828        if self.credential.is_none() {
829            return Err(KrakenHttpError::AuthenticationError(
830                "API credentials required for batch orders".to_string(),
831            ));
832        }
833
834        let endpoint = "/derivatives/api/v3/batchorder";
835        self.send_request_with_body(endpoint, params).await
836    }
837
838    /// Cancels multiple orders in a single batch request (requires authentication).
839    pub async fn cancel_orders_batch(
840        &self,
841        order_ids: Vec<String>,
842    ) -> anyhow::Result<FuturesBatchCancelResponse, KrakenHttpError> {
843        if self.credential.is_none() {
844            return Err(KrakenHttpError::AuthenticationError(
845                "API credentials required for batch orders".to_string(),
846            ));
847        }
848
849        let batch_items: Vec<KrakenFuturesBatchCancelItem> = order_ids
850            .into_iter()
851            .map(KrakenFuturesBatchCancelItem::from_order_id)
852            .collect();
853
854        let params = KrakenFuturesBatchOrderParams::new(batch_items);
855        let post_data = params
856            .to_body()
857            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize batch: {e}")))?;
858
859        let endpoint = "/derivatives/api/v3/batchorder";
860        self.send_authenticated_post(endpoint, post_data).await
861    }
862
863    /// Cancels all open orders, optionally filtered by symbol (requires authentication).
864    pub async fn cancel_all_orders(
865        &self,
866        symbol: Option<String>,
867    ) -> anyhow::Result<FuturesCancelAllOrdersResponse, KrakenHttpError> {
868        if self.credential.is_none() {
869            return Err(KrakenHttpError::AuthenticationError(
870                "API credentials required for canceling orders".to_string(),
871            ));
872        }
873
874        let mut params = HashMap::new();
875        if let Some(sym) = symbol {
876            params.insert("symbol".to_string(), sym);
877        }
878
879        let endpoint = "/derivatives/api/v3/cancelallorders";
880        self.send_request_with_body(endpoint, params).await
881    }
882}
883
884/// High-level HTTP client for the Kraken Futures REST API.
885///
886/// This client wraps the raw client and provides Nautilus domain types.
887/// It maintains an instrument cache and uses it to parse venue responses
888/// into Nautilus domain objects.
889#[cfg_attr(
890    feature = "python",
891    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
892)]
893pub struct KrakenFuturesHttpClient {
894    pub(crate) inner: Arc<KrakenFuturesRawHttpClient>,
895    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
896    cache_initialized: Arc<AtomicBool>,
897}
898
899impl Clone for KrakenFuturesHttpClient {
900    fn clone(&self) -> Self {
901        Self {
902            inner: self.inner.clone(),
903            instruments_cache: self.instruments_cache.clone(),
904            cache_initialized: self.cache_initialized.clone(),
905        }
906    }
907}
908
909impl Default for KrakenFuturesHttpClient {
910    fn default() -> Self {
911        Self::new(
912            KrakenEnvironment::Mainnet,
913            None,
914            Some(60),
915            None,
916            None,
917            None,
918            None,
919            None,
920        )
921        .expect("Failed to create default KrakenFuturesHttpClient")
922    }
923}
924
925impl Debug for KrakenFuturesHttpClient {
926    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
927        f.debug_struct(stringify!(KrakenFuturesHttpClient))
928            .field("inner", &self.inner)
929            .finish()
930    }
931}
932
933impl KrakenFuturesHttpClient {
934    /// Creates a new [`KrakenFuturesHttpClient`].
935    #[allow(clippy::too_many_arguments)]
936    pub fn new(
937        environment: KrakenEnvironment,
938        base_url_override: Option<String>,
939        timeout_secs: Option<u64>,
940        max_retries: Option<u32>,
941        retry_delay_ms: Option<u64>,
942        retry_delay_max_ms: Option<u64>,
943        proxy_url: Option<String>,
944        max_requests_per_second: Option<u32>,
945    ) -> anyhow::Result<Self> {
946        Ok(Self {
947            inner: Arc::new(KrakenFuturesRawHttpClient::new(
948                environment,
949                base_url_override,
950                timeout_secs,
951                max_retries,
952                retry_delay_ms,
953                retry_delay_max_ms,
954                proxy_url,
955                max_requests_per_second,
956            )?),
957            instruments_cache: Arc::new(DashMap::new()),
958            cache_initialized: Arc::new(AtomicBool::new(false)),
959        })
960    }
961
962    /// Creates a new [`KrakenFuturesHttpClient`] with credentials.
963    #[allow(clippy::too_many_arguments)]
964    pub fn with_credentials(
965        api_key: String,
966        api_secret: String,
967        environment: KrakenEnvironment,
968        base_url_override: Option<String>,
969        timeout_secs: Option<u64>,
970        max_retries: Option<u32>,
971        retry_delay_ms: Option<u64>,
972        retry_delay_max_ms: Option<u64>,
973        proxy_url: Option<String>,
974        max_requests_per_second: Option<u32>,
975    ) -> anyhow::Result<Self> {
976        Ok(Self {
977            inner: Arc::new(KrakenFuturesRawHttpClient::with_credentials(
978                api_key,
979                api_secret,
980                environment,
981                base_url_override,
982                timeout_secs,
983                max_retries,
984                retry_delay_ms,
985                retry_delay_max_ms,
986                proxy_url,
987                max_requests_per_second,
988            )?),
989            instruments_cache: Arc::new(DashMap::new()),
990            cache_initialized: Arc::new(AtomicBool::new(false)),
991        })
992    }
993
994    /// Creates a new [`KrakenFuturesHttpClient`] loading credentials from environment variables.
995    ///
996    /// Looks for `KRAKEN_FUTURES_API_KEY` and `KRAKEN_FUTURES_API_SECRET` (mainnet)
997    /// or `KRAKEN_FUTURES_DEMO_API_KEY` and `KRAKEN_FUTURES_DEMO_API_SECRET` (demo).
998    ///
999    /// Falls back to unauthenticated client if credentials are not set.
1000    #[allow(clippy::too_many_arguments)]
1001    pub fn from_env(
1002        environment: KrakenEnvironment,
1003        base_url_override: Option<String>,
1004        timeout_secs: Option<u64>,
1005        max_retries: Option<u32>,
1006        retry_delay_ms: Option<u64>,
1007        retry_delay_max_ms: Option<u64>,
1008        proxy_url: Option<String>,
1009        max_requests_per_second: Option<u32>,
1010    ) -> anyhow::Result<Self> {
1011        let demo = environment == KrakenEnvironment::Demo;
1012
1013        if let Some(credential) = KrakenCredential::from_env_futures(demo) {
1014            let (api_key, api_secret) = credential.into_parts();
1015            Self::with_credentials(
1016                api_key,
1017                api_secret,
1018                environment,
1019                base_url_override,
1020                timeout_secs,
1021                max_retries,
1022                retry_delay_ms,
1023                retry_delay_max_ms,
1024                proxy_url,
1025                max_requests_per_second,
1026            )
1027        } else {
1028            Self::new(
1029                environment,
1030                base_url_override,
1031                timeout_secs,
1032                max_retries,
1033                retry_delay_ms,
1034                retry_delay_max_ms,
1035                proxy_url,
1036                max_requests_per_second,
1037            )
1038        }
1039    }
1040
1041    /// Cancels all pending HTTP requests.
1042    pub fn cancel_all_requests(&self) {
1043        self.inner.cancel_all_requests();
1044    }
1045
1046    /// Returns the cancellation token for this client.
1047    pub fn cancellation_token(&self) -> &CancellationToken {
1048        self.inner.cancellation_token()
1049    }
1050
1051    /// Caches an instrument for symbol lookup.
1052    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1053        self.instruments_cache
1054            .insert(instrument.symbol().inner(), instrument);
1055        self.cache_initialized.store(true, Ordering::Release);
1056    }
1057
1058    /// Caches multiple instruments for symbol lookup.
1059    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1060        for instrument in instruments {
1061            self.instruments_cache
1062                .insert(instrument.symbol().inner(), instrument);
1063        }
1064        self.cache_initialized.store(true, Ordering::Release);
1065    }
1066
1067    /// Gets an instrument from the cache by symbol.
1068    pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1069        self.instruments_cache
1070            .get(symbol)
1071            .map(|entry| entry.value().clone())
1072    }
1073
1074    fn get_instrument_by_raw_symbol(&self, raw_symbol: &str) -> Option<InstrumentAny> {
1075        self.instruments_cache
1076            .iter()
1077            .find(|entry| entry.value().raw_symbol().as_str() == raw_symbol)
1078            .map(|entry| entry.value().clone())
1079    }
1080
1081    fn generate_ts_init(&self) -> UnixNanos {
1082        get_atomic_clock_realtime().get_time_ns()
1083    }
1084
1085    /// Requests tradable instruments from Kraken Futures.
1086    pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
1087        let ts_init = self.generate_ts_init();
1088        let response = self.inner.get_instruments().await?;
1089
1090        let instruments: Vec<InstrumentAny> = response
1091            .instruments
1092            .iter()
1093            .filter_map(|fut_instrument| {
1094                match parse_futures_instrument(fut_instrument, ts_init, ts_init) {
1095                    Ok(instrument) => Some(instrument),
1096                    Err(e) => {
1097                        let symbol = &fut_instrument.symbol;
1098                        tracing::warn!("Failed to parse futures instrument {symbol}: {e}");
1099                        None
1100                    }
1101                }
1102            })
1103            .collect();
1104
1105        Ok(instruments)
1106    }
1107
1108    /// Requests the mark price for an instrument.
1109    pub async fn request_mark_price(
1110        &self,
1111        instrument_id: InstrumentId,
1112    ) -> anyhow::Result<f64, KrakenHttpError> {
1113        let instrument = self
1114            .get_cached_instrument(&instrument_id.symbol.inner())
1115            .ok_or_else(|| {
1116                KrakenHttpError::ParseError(format!(
1117                    "Instrument not found in cache: {instrument_id}"
1118                ))
1119            })?;
1120
1121        let raw_symbol = instrument.raw_symbol().to_string();
1122        let tickers = self.inner.get_tickers().await?;
1123
1124        tickers
1125            .tickers
1126            .iter()
1127            .find(|t| t.symbol == raw_symbol)
1128            .ok_or_else(|| {
1129                KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1130            })
1131            .and_then(|t| {
1132                t.mark_price.ok_or_else(|| {
1133                    KrakenHttpError::ParseError(format!(
1134                        "Mark price not available for {raw_symbol} (may not be available in testnet)"
1135                    ))
1136                })
1137            })
1138    }
1139
1140    pub async fn request_index_price(
1141        &self,
1142        instrument_id: InstrumentId,
1143    ) -> anyhow::Result<f64, KrakenHttpError> {
1144        let instrument = self
1145            .get_cached_instrument(&instrument_id.symbol.inner())
1146            .ok_or_else(|| {
1147                KrakenHttpError::ParseError(format!(
1148                    "Instrument not found in cache: {instrument_id}"
1149                ))
1150            })?;
1151
1152        let raw_symbol = instrument.raw_symbol().to_string();
1153        let tickers = self.inner.get_tickers().await?;
1154
1155        tickers
1156            .tickers
1157            .iter()
1158            .find(|t| t.symbol == raw_symbol)
1159            .ok_or_else(|| {
1160                KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1161            })
1162            .and_then(|t| {
1163                t.index_price.ok_or_else(|| {
1164                    KrakenHttpError::ParseError(format!(
1165                        "Index price not available for {raw_symbol} (may not be available in testnet)"
1166                    ))
1167                })
1168            })
1169    }
1170
1171    pub async fn request_trades(
1172        &self,
1173        instrument_id: InstrumentId,
1174        start: Option<DateTime<Utc>>,
1175        end: Option<DateTime<Utc>>,
1176        limit: Option<u64>,
1177    ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
1178        let instrument = self
1179            .get_cached_instrument(&instrument_id.symbol.inner())
1180            .ok_or_else(|| {
1181                KrakenHttpError::ParseError(format!(
1182                    "Instrument not found in cache: {instrument_id}"
1183                ))
1184            })?;
1185
1186        let raw_symbol = instrument.raw_symbol().to_string();
1187        let ts_init = self.generate_ts_init();
1188
1189        let since = start.map(|dt| dt.timestamp_millis());
1190        let before = end.map(|dt| dt.timestamp_millis());
1191
1192        let response = self
1193            .inner
1194            .get_public_executions(&raw_symbol, since, before, Some("asc"), None)
1195            .await?;
1196
1197        let mut trades = Vec::new();
1198
1199        for element in &response.elements {
1200            let execution = &element.event.execution.execution;
1201            match parse_futures_public_execution(execution, &instrument, ts_init) {
1202                Ok(trade_tick) => {
1203                    trades.push(trade_tick);
1204
1205                    if let Some(limit_count) = limit
1206                        && trades.len() >= limit_count as usize
1207                    {
1208                        return Ok(trades);
1209                    }
1210                }
1211                Err(e) => {
1212                    tracing::warn!("Failed to parse futures trade tick: {e}");
1213                }
1214            }
1215        }
1216
1217        Ok(trades)
1218    }
1219
1220    pub async fn request_bars(
1221        &self,
1222        bar_type: BarType,
1223        start: Option<DateTime<Utc>>,
1224        end: Option<DateTime<Utc>>,
1225        limit: Option<u64>,
1226    ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
1227        let instrument_id = bar_type.instrument_id();
1228        let instrument = self
1229            .get_cached_instrument(&instrument_id.symbol.inner())
1230            .ok_or_else(|| {
1231                KrakenHttpError::ParseError(format!(
1232                    "Instrument not found in cache: {instrument_id}"
1233                ))
1234            })?;
1235
1236        let raw_symbol = instrument.raw_symbol().to_string();
1237        let ts_init = self.generate_ts_init();
1238        let tick_type = "trade";
1239        let resolution = bar_type_to_futures_resolution(bar_type)
1240            .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?;
1241
1242        // Kraken Futures OHLC API expects Unix timestamp in seconds
1243        let from = start.map(|dt| dt.timestamp());
1244        let to = end.map(|dt| dt.timestamp());
1245        let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1246
1247        let response = self
1248            .inner
1249            .get_ohlc(tick_type, &raw_symbol, resolution, from, to)
1250            .await?;
1251
1252        let mut bars = Vec::new();
1253        for candle in response.candles {
1254            let ohlc = OhlcData {
1255                time: candle.time / 1000,
1256                open: candle.open,
1257                high: candle.high,
1258                low: candle.low,
1259                close: candle.close,
1260                vwap: "0".to_string(),
1261                volume: candle.volume,
1262                count: 0,
1263            };
1264
1265            match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1266                Ok(bar) => {
1267                    if let Some(end_nanos) = end_ns
1268                        && bar.ts_event.as_u64() > end_nanos
1269                    {
1270                        continue;
1271                    }
1272                    bars.push(bar);
1273
1274                    if let Some(limit_count) = limit
1275                        && bars.len() >= limit_count as usize
1276                    {
1277                        return Ok(bars);
1278                    }
1279                }
1280                Err(e) => {
1281                    tracing::warn!("Failed to parse futures bar: {e}");
1282                }
1283            }
1284        }
1285
1286        Ok(bars)
1287    }
1288
1289    /// Requests account state from the Kraken Futures exchange.
1290    ///
1291    /// This queries the accounts endpoint and converts the response into a
1292    /// Nautilus `AccountState` event containing balances and margin info.
1293    ///
1294    /// # Errors
1295    ///
1296    /// Returns an error if:
1297    /// - Credentials are missing.
1298    /// - The request fails.
1299    /// - Response parsing fails.
1300    pub async fn request_account_state(
1301        &self,
1302        account_id: AccountId,
1303    ) -> anyhow::Result<AccountState> {
1304        let accounts_response = self.inner.get_accounts().await?;
1305
1306        if accounts_response.result != KrakenApiResult::Success {
1307            let error_msg = accounts_response
1308                .error
1309                .unwrap_or_else(|| "Unknown error".to_string());
1310            anyhow::bail!("Failed to get futures accounts: {error_msg}");
1311        }
1312
1313        let ts_init = self.generate_ts_init();
1314
1315        let mut balances: Vec<AccountBalance> = Vec::new();
1316
1317        for account in accounts_response.accounts.values() {
1318            match account.account_type.as_str() {
1319                "multiCollateralMarginAccount" => {
1320                    for (currency_code, currency_info) in &account.currencies {
1321                        if currency_info.quantity == 0.0 {
1322                            continue;
1323                        }
1324
1325                        let currency = Currency::new(
1326                            currency_code.as_str(),
1327                            8,
1328                            0,
1329                            currency_code.as_str(),
1330                            CurrencyType::Crypto,
1331                        );
1332
1333                        let total_amount = currency_info.quantity;
1334                        let total = Money::new(total_amount, currency);
1335
1336                        // Available can exceed quantity with positive PnL, cap to satisfy invariant
1337                        let available_amount = currency_info
1338                            .available
1339                            .unwrap_or(total_amount)
1340                            .min(total_amount);
1341                        let locked_amount = (total_amount - available_amount).max(0.0);
1342                        let locked = Money::new(locked_amount, currency);
1343                        // Compute free from total - locked to guarantee the invariant holds
1344                        let free = total - locked;
1345
1346                        balances.push(AccountBalance::new(total, locked, free));
1347                    }
1348
1349                    // Add USD balance from portfolio value for margin calculations.
1350                    // Multi-collateral accounts track margin in USD even though the
1351                    // actual collateral is held in various crypto currencies.
1352                    if let Some(portfolio_value) = account.portfolio_value
1353                        && portfolio_value > 0.0
1354                    {
1355                        let usd_currency = Currency::USD();
1356                        let total_usd = Money::new(portfolio_value, usd_currency);
1357                        let available_usd = account
1358                            .available_margin
1359                            .unwrap_or(portfolio_value)
1360                            .min(portfolio_value);
1361                        // Compute locked = total - available to guarantee the invariant holds
1362                        let locked_usd =
1363                            Money::new((portfolio_value - available_usd).max(0.0), usd_currency);
1364                        let free_usd = total_usd - locked_usd;
1365
1366                        balances.push(AccountBalance::new(total_usd, locked_usd, free_usd));
1367                    }
1368                }
1369                "marginAccount" => {
1370                    for (currency_code, &amount) in &account.balances {
1371                        if amount == 0.0 {
1372                            continue;
1373                        }
1374
1375                        let currency = Currency::new(
1376                            currency_code.as_str(),
1377                            8,
1378                            0,
1379                            currency_code.as_str(),
1380                            CurrencyType::Crypto,
1381                        );
1382
1383                        let total = Money::new(amount, currency);
1384
1385                        // Available can exceed balance with positive PnL, cap to satisfy invariant
1386                        let available = account
1387                            .auxiliary
1388                            .as_ref()
1389                            .and_then(|aux| aux.af)
1390                            .unwrap_or(amount)
1391                            .min(amount);
1392                        let locked = amount - available;
1393
1394                        balances.push(AccountBalance::new(
1395                            total,
1396                            Money::new(locked, currency),
1397                            Money::new(available, currency),
1398                        ));
1399                    }
1400                }
1401                "cashAccount" => {
1402                    for (currency_code, &amount) in &account.balances {
1403                        if amount == 0.0 {
1404                            continue;
1405                        }
1406
1407                        let currency = Currency::new(
1408                            currency_code.as_str(),
1409                            8,
1410                            0,
1411                            currency_code.as_str(),
1412                            CurrencyType::Crypto,
1413                        );
1414
1415                        let total = Money::new(amount, currency);
1416                        let locked = Money::new(0.0, currency);
1417
1418                        balances.push(AccountBalance::new(total, locked, total));
1419                    }
1420                }
1421                _ => {
1422                    let account_type = &account.account_type;
1423                    tracing::debug!("Unknown account type: {account_type}");
1424                }
1425            }
1426        }
1427
1428        Ok(AccountState::new(
1429            account_id,
1430            AccountType::Margin,
1431            balances,
1432            vec![],
1433            true,
1434            UUID4::new(),
1435            ts_init,
1436            ts_init,
1437            None,
1438        ))
1439    }
1440
1441    pub async fn request_order_status_reports(
1442        &self,
1443        account_id: AccountId,
1444        instrument_id: Option<InstrumentId>,
1445        start: Option<DateTime<Utc>>,
1446        end: Option<DateTime<Utc>>,
1447        open_only: bool,
1448    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1449        let ts_init = self.generate_ts_init();
1450        let mut all_reports = Vec::new();
1451
1452        let response = self
1453            .inner
1454            .get_open_orders()
1455            .await
1456            .map_err(|e| anyhow::anyhow!("get_open_orders failed: {e}"))?;
1457        if response.result != KrakenApiResult::Success {
1458            let error_msg = response
1459                .error
1460                .unwrap_or_else(|| "Unknown error".to_string());
1461            anyhow::bail!("Failed to get open orders: {error_msg}");
1462        }
1463
1464        for order in &response.open_orders {
1465            if let Some(ref target_id) = instrument_id {
1466                let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1467                if let Some(inst) = instrument
1468                    && inst.raw_symbol().as_str() != order.symbol
1469                {
1470                    continue;
1471                }
1472            }
1473
1474            if let Some(instrument) = self.get_instrument_by_raw_symbol(&order.symbol) {
1475                match parse_futures_order_status_report(order, &instrument, account_id, ts_init) {
1476                    Ok(report) => all_reports.push(report),
1477                    Err(e) => {
1478                        let order_id = &order.order_id;
1479                        tracing::warn!("Failed to parse futures order {order_id}: {e}");
1480                    }
1481                }
1482            }
1483        }
1484
1485        if !open_only {
1486            // Kraken Futures order events API expects Unix timestamp in milliseconds
1487            let start_ms = start.map(|dt| dt.timestamp_millis());
1488            let end_ms = end.map(|dt| dt.timestamp_millis());
1489            let response = self
1490                .inner
1491                .get_order_events(end_ms, start_ms, None)
1492                .await
1493                .map_err(|e| anyhow::anyhow!("get_order_events failed: {e}"))?;
1494
1495            for event_wrapper in response.order_events {
1496                let event = &event_wrapper.order;
1497                if let Some(ref target_id) = instrument_id {
1498                    let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1499                    if let Some(inst) = instrument
1500                        && inst.raw_symbol().as_str() != event.symbol
1501                    {
1502                        continue;
1503                    }
1504                }
1505
1506                if let Some(instrument) = self.get_instrument_by_raw_symbol(&event.symbol) {
1507                    match parse_futures_order_event_status_report(
1508                        event,
1509                        &instrument,
1510                        account_id,
1511                        ts_init,
1512                    ) {
1513                        Ok(report) => all_reports.push(report),
1514                        Err(e) => {
1515                            let order_id = &event.order_id;
1516                            tracing::warn!("Failed to parse futures order event {order_id}: {e}");
1517                        }
1518                    }
1519                }
1520            }
1521        }
1522
1523        Ok(all_reports)
1524    }
1525
1526    pub async fn request_fill_reports(
1527        &self,
1528        account_id: AccountId,
1529        instrument_id: Option<InstrumentId>,
1530        start: Option<DateTime<Utc>>,
1531        end: Option<DateTime<Utc>>,
1532    ) -> anyhow::Result<Vec<FillReport>> {
1533        let ts_init = self.generate_ts_init();
1534        let mut all_reports = Vec::new();
1535
1536        let response = self.inner.get_fills(None).await?;
1537        if response.result != KrakenApiResult::Success {
1538            let error_msg = response
1539                .error
1540                .unwrap_or_else(|| "Unknown error".to_string());
1541            anyhow::bail!("Failed to get fills: {error_msg}");
1542        }
1543
1544        let start_ms = start.map(|dt| dt.timestamp_millis());
1545        let end_ms = end.map(|dt| dt.timestamp_millis());
1546
1547        for fill in response.fills {
1548            if let Some(start_threshold) = start_ms
1549                && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1550            {
1551                let fill_ms = fill_ts.timestamp_millis();
1552                if fill_ms < start_threshold {
1553                    continue;
1554                }
1555            }
1556            if let Some(end_threshold) = end_ms
1557                && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1558            {
1559                let fill_ms = fill_ts.timestamp_millis();
1560                if fill_ms > end_threshold {
1561                    continue;
1562                }
1563            }
1564
1565            if let Some(ref target_id) = instrument_id {
1566                let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1567                if let Some(inst) = instrument
1568                    && inst.raw_symbol().as_str() != fill.symbol
1569                {
1570                    continue;
1571                }
1572            }
1573
1574            if let Some(instrument) = self.get_instrument_by_raw_symbol(&fill.symbol) {
1575                match parse_futures_fill_report(&fill, &instrument, account_id, ts_init) {
1576                    Ok(report) => all_reports.push(report),
1577                    Err(e) => {
1578                        let fill_id = &fill.fill_id;
1579                        tracing::warn!("Failed to parse futures fill {fill_id}: {e}");
1580                    }
1581                }
1582            }
1583        }
1584
1585        Ok(all_reports)
1586    }
1587
1588    pub async fn request_position_status_reports(
1589        &self,
1590        account_id: AccountId,
1591        instrument_id: Option<InstrumentId>,
1592    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1593        let ts_init = self.generate_ts_init();
1594        let mut all_reports = Vec::new();
1595
1596        let response = self.inner.get_open_positions().await?;
1597        if response.result != KrakenApiResult::Success {
1598            let error_msg = response
1599                .error
1600                .unwrap_or_else(|| "Unknown error".to_string());
1601            anyhow::bail!("Failed to get open positions: {error_msg}");
1602        }
1603
1604        for position in response.open_positions {
1605            if let Some(ref target_id) = instrument_id {
1606                let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1607                if let Some(inst) = instrument
1608                    && inst.raw_symbol().as_str() != position.symbol
1609                {
1610                    continue;
1611                }
1612            }
1613
1614            if let Some(instrument) = self.get_instrument_by_raw_symbol(&position.symbol) {
1615                match parse_futures_position_status_report(
1616                    &position,
1617                    &instrument,
1618                    account_id,
1619                    ts_init,
1620                ) {
1621                    Ok(report) => all_reports.push(report),
1622                    Err(e) => {
1623                        let symbol = &position.symbol;
1624                        tracing::warn!("Failed to parse futures position {symbol}: {e}");
1625                    }
1626                }
1627            }
1628        }
1629
1630        Ok(all_reports)
1631    }
1632
1633    /// Submits a new order to the Kraken Futures exchange.
1634    ///
1635    /// # Errors
1636    ///
1637    /// Returns an error if:
1638    /// - Credentials are missing.
1639    /// - The instrument is not found in cache.
1640    /// - The order type or time in force is not supported.
1641    /// - The request fails.
1642    /// - The order is rejected.
1643    #[allow(clippy::too_many_arguments)]
1644    pub async fn submit_order(
1645        &self,
1646        account_id: AccountId,
1647        instrument_id: InstrumentId,
1648        client_order_id: ClientOrderId,
1649        order_side: OrderSide,
1650        order_type: OrderType,
1651        quantity: Quantity,
1652        time_in_force: TimeInForce,
1653        price: Option<Price>,
1654        trigger_price: Option<Price>,
1655        reduce_only: bool,
1656        post_only: bool,
1657    ) -> anyhow::Result<OrderStatusReport> {
1658        let instrument = self
1659            .get_cached_instrument(&instrument_id.symbol.inner())
1660            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1661
1662        let raw_symbol = instrument.raw_symbol().inner();
1663
1664        // Map order type and time-in-force to Kraken order type
1665        // Kraken Futures encodes TIF in the orderType field:
1666        // - lmt = limit (GTC)
1667        // - ioc = immediate-or-cancel
1668        // - post = post-only (maker only)
1669        // - mkt = market
1670        let kraken_order_type = match order_type {
1671            OrderType::Market => KrakenFuturesOrderType::Market,
1672            OrderType::Limit => {
1673                if post_only {
1674                    KrakenFuturesOrderType::Post
1675                } else {
1676                    match time_in_force {
1677                        TimeInForce::Ioc => KrakenFuturesOrderType::Ioc,
1678                        TimeInForce::Fok => {
1679                            anyhow::bail!("FOK not supported by Kraken Futures, use IOC instead")
1680                        }
1681                        TimeInForce::Gtd => {
1682                            anyhow::bail!("GTD not supported by Kraken Futures, use GTC instead")
1683                        }
1684                        _ => KrakenFuturesOrderType::Limit, // GTC is default
1685                    }
1686                }
1687            }
1688            OrderType::StopMarket | OrderType::StopLimit => KrakenFuturesOrderType::Stop,
1689            OrderType::MarketIfTouched => KrakenFuturesOrderType::TakeProfit,
1690            _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
1691        };
1692
1693        let mut builder = KrakenFuturesSendOrderParamsBuilder::default();
1694        builder
1695            .cli_ord_id(client_order_id.to_string())
1696            .broker(NAUTILUS_KRAKEN_BROKER_ID)
1697            .symbol(raw_symbol)
1698            .side(KrakenOrderSide::from(order_side))
1699            .size(quantity.to_string())
1700            .order_type(kraken_order_type);
1701
1702        // Handle prices based on order type
1703        match order_type {
1704            OrderType::StopMarket => {
1705                // Stop market orders need stop_price (trigger price)
1706                if let Some(trigger) = trigger_price {
1707                    builder.stop_price(trigger.to_string());
1708                }
1709            }
1710            OrderType::StopLimit => {
1711                // Stop limit orders need both stop_price and limit_price
1712                if let Some(trigger) = trigger_price {
1713                    builder.stop_price(trigger.to_string());
1714                }
1715                if let Some(limit) = price {
1716                    builder.limit_price(limit.to_string());
1717                }
1718            }
1719            OrderType::MarketIfTouched => {
1720                // Take-profit orders need stop_price (trigger price) and optionally limit_price
1721                if let Some(trigger) = trigger_price {
1722                    builder.stop_price(trigger.to_string());
1723                }
1724                if let Some(limit) = price {
1725                    builder.limit_price(limit.to_string());
1726                }
1727            }
1728            _ => {
1729                // Regular orders just use limit_price
1730                if let Some(limit) = price {
1731                    builder.limit_price(limit.to_string());
1732                }
1733            }
1734        }
1735
1736        if reduce_only {
1737            builder.reduce_only(true);
1738        }
1739
1740        let params = builder
1741            .build()
1742            .map_err(|e| anyhow::anyhow!("Failed to build order params: {e}"))?;
1743
1744        let response = self.inner.send_order_params(&params).await?;
1745
1746        if response.result != KrakenApiResult::Success {
1747            let error_msg = response
1748                .error
1749                .unwrap_or_else(|| "Unknown error".to_string());
1750            anyhow::bail!("Order submission failed: {error_msg}");
1751        }
1752
1753        let send_status = response
1754            .send_status
1755            .ok_or_else(|| anyhow::anyhow!("No send_status in successful response"))?;
1756
1757        let status = &send_status.status;
1758
1759        // Check for post-only rejection (Kraken returns status="postWouldExecute")
1760        if status == "postWouldExecute" {
1761            let reason = send_status
1762                .order_events
1763                .as_ref()
1764                .and_then(|events| events.first())
1765                .and_then(|e| e.reason.clone())
1766                .unwrap_or_else(|| "Post-only order would have crossed".to_string());
1767            anyhow::bail!("POST_ONLY_REJECTED: {reason}");
1768        }
1769
1770        let venue_order_id = send_status
1771            .order_id
1772            .ok_or_else(|| anyhow::anyhow!("No order_id in send_status: {status}"))?;
1773
1774        let ts_init = self.generate_ts_init();
1775
1776        let open_orders_response = self.inner.get_open_orders().await?;
1777        if let Some(order) = open_orders_response
1778            .open_orders
1779            .iter()
1780            .find(|o| o.order_id == venue_order_id)
1781        {
1782            return parse_futures_order_status_report(order, &instrument, account_id, ts_init);
1783        }
1784
1785        // Order not in open orders - may have filled immediately (market order or aggressive limit)
1786        // Try to use order_events from send_status first
1787        if let Some(order_events) = &send_status.order_events
1788            && let Some(send_event) = order_events.first()
1789        {
1790            // Handle regular orders, trigger orders, and execution events
1791            let event = if let Some(order_data) = &send_event.order {
1792                FuturesOrderEvent {
1793                    order_id: order_data.order_id.clone(),
1794                    cli_ord_id: order_data.cli_ord_id.clone(),
1795                    order_type: order_data.order_type,
1796                    symbol: order_data.symbol.clone(),
1797                    side: order_data.side,
1798                    quantity: order_data.quantity,
1799                    filled: order_data.filled,
1800                    limit_price: order_data.limit_price,
1801                    stop_price: order_data.stop_price,
1802                    timestamp: order_data.timestamp.clone(),
1803                    last_update_timestamp: order_data.last_update_timestamp.clone(),
1804                    reduce_only: order_data.reduce_only,
1805                }
1806            } else if let Some(trigger_data) = &send_event.order_trigger {
1807                FuturesOrderEvent {
1808                    order_id: trigger_data.uid.clone(),
1809                    cli_ord_id: trigger_data.client_id.clone(),
1810                    order_type: trigger_data.order_type,
1811                    symbol: trigger_data.symbol.clone(),
1812                    side: trigger_data.side,
1813                    quantity: trigger_data.quantity,
1814                    filled: 0.0,
1815                    limit_price: trigger_data.limit_price,
1816                    stop_price: Some(trigger_data.trigger_price),
1817                    timestamp: trigger_data.timestamp.clone(),
1818                    last_update_timestamp: trigger_data.last_update_timestamp.clone(),
1819                    reduce_only: trigger_data.reduce_only,
1820                }
1821            } else if let Some(prior_exec) = &send_event.order_prior_execution {
1822                // EXECUTION event - use orderPriorExecution data
1823                FuturesOrderEvent {
1824                    order_id: prior_exec.order_id.clone(),
1825                    cli_ord_id: prior_exec.cli_ord_id.clone(),
1826                    order_type: prior_exec.order_type,
1827                    symbol: prior_exec.symbol.clone(),
1828                    side: prior_exec.side,
1829                    quantity: prior_exec.quantity,
1830                    filled: send_event.amount.unwrap_or(prior_exec.quantity), // Use execution amount
1831                    limit_price: prior_exec.limit_price,
1832                    stop_price: prior_exec.stop_price,
1833                    timestamp: prior_exec.timestamp.clone(),
1834                    last_update_timestamp: prior_exec.last_update_timestamp.clone(),
1835                    reduce_only: prior_exec.reduce_only,
1836                }
1837            } else {
1838                anyhow::bail!("No order, orderTrigger, or orderPriorExecution data in event");
1839            };
1840            return parse_futures_order_event_status_report(
1841                &event,
1842                &instrument,
1843                account_id,
1844                ts_init,
1845            );
1846        }
1847
1848        // Fall back to querying order events
1849        let events_response = self.inner.get_order_events(None, None, None).await?;
1850        let event_wrapper = events_response
1851            .order_events
1852            .iter()
1853            .find(|e| e.order.order_id == venue_order_id)
1854            .ok_or_else(|| {
1855                anyhow::anyhow!("Order not found in open orders or events: {venue_order_id}")
1856            })?;
1857
1858        parse_futures_order_event_status_report(
1859            &event_wrapper.order,
1860            &instrument,
1861            account_id,
1862            ts_init,
1863        )
1864    }
1865
1866    /// Modifies an existing order on the Kraken Futures exchange.
1867    ///
1868    /// Returns the new venue order ID assigned to the modified order.
1869    ///
1870    /// # Errors
1871    ///
1872    /// Returns an error if:
1873    /// - Neither `client_order_id` nor `venue_order_id` is provided.
1874    /// - The instrument is not found in cache.
1875    /// - The request fails.
1876    /// - The edit fails on the exchange.
1877    pub async fn modify_order(
1878        &self,
1879        instrument_id: InstrumentId,
1880        client_order_id: Option<ClientOrderId>,
1881        venue_order_id: Option<VenueOrderId>,
1882        quantity: Option<Quantity>,
1883        price: Option<Price>,
1884        trigger_price: Option<Price>,
1885    ) -> anyhow::Result<VenueOrderId> {
1886        let _ = self
1887            .get_cached_instrument(&instrument_id.symbol.inner())
1888            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1889
1890        let order_id = venue_order_id.as_ref().map(|id| id.to_string());
1891        let cli_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1892
1893        if order_id.is_none() && cli_ord_id.is_none() {
1894            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1895        }
1896
1897        let mut builder = KrakenFuturesEditOrderParamsBuilder::default();
1898
1899        if let Some(ref id) = order_id {
1900            builder.order_id(id.clone());
1901        }
1902        if let Some(ref id) = cli_ord_id {
1903            builder.cli_ord_id(id.clone());
1904        }
1905        if let Some(qty) = quantity {
1906            builder.size(qty.to_string());
1907        }
1908        if let Some(p) = price {
1909            builder.limit_price(p.to_string());
1910        }
1911        if let Some(tp) = trigger_price {
1912            builder.stop_price(tp.to_string());
1913        }
1914
1915        let params = builder
1916            .build()
1917            .map_err(|e| anyhow::anyhow!("Failed to build edit order params: {e}"))?;
1918
1919        let response = self.inner.edit_order(&params).await?;
1920
1921        if response.result != KrakenApiResult::Success {
1922            let status = &response.edit_status.status;
1923            anyhow::bail!("Order modification failed: {status}");
1924        }
1925
1926        // Return the new order_id from the response, or fall back to the original
1927        let new_venue_order_id = response
1928            .edit_status
1929            .order_id
1930            .or(order_id)
1931            .ok_or_else(|| anyhow::anyhow!("No order ID in edit order response"))?;
1932
1933        Ok(VenueOrderId::new(&new_venue_order_id))
1934    }
1935
1936    /// Cancels an order on the Kraken Futures exchange.
1937    ///
1938    /// # Errors
1939    ///
1940    /// Returns an error if:
1941    /// - Credentials are missing.
1942    /// - Neither client_order_id nor venue_order_id is provided.
1943    /// - The request fails.
1944    /// - The order cancellation is rejected.
1945    pub async fn cancel_order(
1946        &self,
1947        _account_id: AccountId,
1948        instrument_id: InstrumentId,
1949        client_order_id: Option<ClientOrderId>,
1950        venue_order_id: Option<VenueOrderId>,
1951    ) -> anyhow::Result<()> {
1952        let _ = self
1953            .get_cached_instrument(&instrument_id.symbol.inner())
1954            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1955
1956        let order_id = venue_order_id.as_ref().map(|id| id.to_string());
1957        let cli_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1958
1959        if order_id.is_none() && cli_ord_id.is_none() {
1960            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1961        }
1962
1963        let response = self.inner.cancel_order(order_id, cli_ord_id).await?;
1964
1965        if response.result != KrakenApiResult::Success {
1966            let status = &response.cancel_status.status;
1967            anyhow::bail!("Order cancellation failed: {status}");
1968        }
1969
1970        Ok(())
1971    }
1972
1973    /// Cancels multiple orders on the Kraken Futures exchange.
1974    ///
1975    /// Automatically chunks requests into batches of 50 orders.
1976    ///
1977    /// # Parameters
1978    /// - `venue_order_ids` - List of venue order IDs to cancel.
1979    ///
1980    /// # Returns
1981    /// The total number of successfully cancelled orders.
1982    pub async fn cancel_orders_batch(
1983        &self,
1984        venue_order_ids: Vec<VenueOrderId>,
1985    ) -> anyhow::Result<usize> {
1986        if venue_order_ids.is_empty() {
1987            return Ok(0);
1988        }
1989
1990        let mut total_cancelled = 0;
1991
1992        for chunk in venue_order_ids.chunks(BATCH_CANCEL_LIMIT) {
1993            let order_ids: Vec<String> = chunk.iter().map(|id| id.to_string()).collect();
1994            let response = self.inner.cancel_orders_batch(order_ids).await?;
1995
1996            if response.result != KrakenApiResult::Success {
1997                let error_msg = response.error.as_deref().unwrap_or("Unknown error");
1998                anyhow::bail!("Batch cancel failed: {error_msg}");
1999            }
2000
2001            let success_count = response
2002                .batch_status
2003                .iter()
2004                .filter(|s| {
2005                    s.status == Some(KrakenSendStatus::Cancelled)
2006                        || s.cancel_status
2007                            .as_ref()
2008                            .is_some_and(|cs| cs.status == KrakenSendStatus::Cancelled)
2009                })
2010                .count();
2011
2012            total_cancelled += success_count;
2013        }
2014
2015        Ok(total_cancelled)
2016    }
2017}
2018
2019#[cfg(test)]
2020mod tests {
2021    use rstest::rstest;
2022
2023    use super::*;
2024
2025    #[rstest]
2026    fn test_raw_client_creation() {
2027        let client = KrakenFuturesRawHttpClient::default();
2028        assert!(client.credential.is_none());
2029        assert!(client.base_url().contains("futures"));
2030    }
2031
2032    #[rstest]
2033    fn test_raw_client_with_credentials() {
2034        let client = KrakenFuturesRawHttpClient::with_credentials(
2035            "test_key".to_string(),
2036            "test_secret".to_string(),
2037            KrakenEnvironment::Mainnet,
2038            None,
2039            None,
2040            None,
2041            None,
2042            None,
2043            None,
2044            None,
2045        )
2046        .unwrap();
2047        assert!(client.credential.is_some());
2048    }
2049
2050    #[rstest]
2051    fn test_client_creation() {
2052        let client = KrakenFuturesHttpClient::default();
2053        assert!(client.instruments_cache.is_empty());
2054    }
2055
2056    #[rstest]
2057    fn test_client_with_credentials() {
2058        let client = KrakenFuturesHttpClient::with_credentials(
2059            "test_key".to_string(),
2060            "test_secret".to_string(),
2061            KrakenEnvironment::Mainnet,
2062            None,
2063            None,
2064            None,
2065            None,
2066            None,
2067            None,
2068            None,
2069        )
2070        .unwrap();
2071        assert!(client.instruments_cache.is_empty());
2072    }
2073}