nautilus_kraken/http/futures/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! HTTP client for the Kraken Futures REST API.
17
18use std::{
19    collections::HashMap,
20    fmt::{Debug, Formatter},
21    num::NonZeroU32,
22    sync::{
23        Arc,
24        atomic::{AtomicBool, Ordering},
25    },
26};
27
28use chrono::{DateTime, Utc};
29use dashmap::DashMap;
30use nautilus_core::{
31    AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, nanos::UnixNanos,
32    time::get_atomic_clock_realtime,
33};
34use nautilus_model::{
35    data::{Bar, BarType, TradeTick},
36    enums::{AccountType, CurrencyType, OrderSide, OrderType, TimeInForce},
37    events::AccountState,
38    identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
39    instruments::{Instrument, InstrumentAny},
40    reports::{FillReport, OrderStatusReport, PositionStatusReport},
41    types::{AccountBalance, Currency, Money, Price, Quantity},
42};
43use nautilus_network::{
44    http::{HttpClient, Method, USER_AGENT},
45    ratelimiter::quota::Quota,
46    retry::{RetryConfig, RetryManager},
47};
48use serde::de::DeserializeOwned;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use super::{models::*, query::*};
53use crate::{
54    common::{
55        consts::NAUTILUS_KRAKEN_BROKER_ID,
56        credential::KrakenCredential,
57        enums::{
58            KrakenApiResult, KrakenEnvironment, KrakenFuturesOrderType, KrakenOrderSide,
59            KrakenProductType, KrakenSendStatus,
60        },
61        parse::{
62            bar_type_to_futures_resolution, parse_bar, parse_futures_fill_report,
63            parse_futures_instrument, parse_futures_order_event_status_report,
64            parse_futures_order_status_report, parse_futures_position_status_report,
65            parse_futures_public_execution,
66        },
67        urls::get_kraken_http_base_url,
68    },
69    http::{error::KrakenHttpError, models::OhlcData},
70};
71
/// Default Kraken Futures REST API rate limit (requests per second).
///
/// Used whenever a constructor receives no `max_requests_per_second`, and as the
/// fallback quota when the supplied limit is zero.
pub const KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 5;

/// Rate limiter key attached to every request, in addition to a per-endpoint key.
const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:futures:global";

/// Maximum orders per batch cancel request for Kraken Futures API.
const BATCH_CANCEL_LIMIT: usize = 50;
79
/// Raw HTTP client for low-level Kraken Futures API operations.
///
/// This client handles request/response operations with the Kraken Futures API,
/// returning venue-specific response types. It does not parse to Nautilus domain types.
pub struct KrakenFuturesRawHttpClient {
    /// Base URL that endpoint paths are appended to when building request URLs.
    base_url: String,
    /// Underlying HTTP client used to issue every request.
    client: HttpClient,
    /// API credential used to sign authenticated requests; `None` for a public-only client.
    credential: Option<KrakenCredential>,
    /// Retry executor used by the retrying request path.
    retry_manager: RetryManager<KrakenHttpError>,
    /// Token cancelled by `cancel_all_requests` and checked before sending requests.
    cancellation_token: CancellationToken,
    /// Clock that supplies the nanosecond timestamp used as the request nonce.
    clock: &'static AtomicTime,
    /// Mutex to serialize authenticated requests, ensuring nonces arrive at Kraken in order
    auth_mutex: tokio::sync::Mutex<()>,
}
94
95impl Default for KrakenFuturesRawHttpClient {
96    fn default() -> Self {
97        Self::new(
98            KrakenEnvironment::Mainnet,
99            None,
100            Some(60),
101            None,
102            None,
103            None,
104            None,
105            None,
106        )
107        .expect("Failed to create default KrakenFuturesRawHttpClient")
108    }
109}
110
111impl Debug for KrakenFuturesRawHttpClient {
112    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct(stringify!(KrakenFuturesRawHttpClient))
114            .field("base_url", &self.base_url)
115            .field("has_credentials", &self.credential.is_some())
116            .finish()
117    }
118}
119
120impl KrakenFuturesRawHttpClient {
121    /// Creates a new [`KrakenFuturesRawHttpClient`].
122    #[allow(clippy::too_many_arguments)]
123    pub fn new(
124        environment: KrakenEnvironment,
125        base_url_override: Option<String>,
126        timeout_secs: Option<u64>,
127        max_retries: Option<u32>,
128        retry_delay_ms: Option<u64>,
129        retry_delay_max_ms: Option<u64>,
130        proxy_url: Option<String>,
131        max_requests_per_second: Option<u32>,
132    ) -> anyhow::Result<Self> {
133        let retry_config = RetryConfig {
134            max_retries: max_retries.unwrap_or(3),
135            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
136            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
137            backoff_factor: 2.0,
138            jitter_ms: 1000,
139            operation_timeout_ms: Some(60_000),
140            immediate_first: false,
141            max_elapsed_ms: Some(180_000),
142        };
143
144        let retry_manager = RetryManager::new(retry_config);
145        let base_url = base_url_override.unwrap_or_else(|| {
146            get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
147        });
148
149        let rate_limit =
150            max_requests_per_second.unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND);
151
152        Ok(Self {
153            base_url,
154            client: HttpClient::new(
155                Self::default_headers(),
156                vec![],
157                Self::rate_limiter_quotas(rate_limit),
158                Some(Self::default_quota(rate_limit)),
159                timeout_secs,
160                proxy_url,
161            )
162            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
163            credential: None,
164            retry_manager,
165            cancellation_token: CancellationToken::new(),
166            clock: get_atomic_clock_realtime(),
167            auth_mutex: tokio::sync::Mutex::new(()),
168        })
169    }
170
171    /// Creates a new [`KrakenFuturesRawHttpClient`] with credentials.
172    #[allow(clippy::too_many_arguments)]
173    pub fn with_credentials(
174        api_key: String,
175        api_secret: String,
176        environment: KrakenEnvironment,
177        base_url_override: Option<String>,
178        timeout_secs: Option<u64>,
179        max_retries: Option<u32>,
180        retry_delay_ms: Option<u64>,
181        retry_delay_max_ms: Option<u64>,
182        proxy_url: Option<String>,
183        max_requests_per_second: Option<u32>,
184    ) -> anyhow::Result<Self> {
185        let retry_config = RetryConfig {
186            max_retries: max_retries.unwrap_or(3),
187            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
188            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
189            backoff_factor: 2.0,
190            jitter_ms: 1000,
191            operation_timeout_ms: Some(60_000),
192            immediate_first: false,
193            max_elapsed_ms: Some(180_000),
194        };
195
196        let retry_manager = RetryManager::new(retry_config);
197        let base_url = base_url_override.unwrap_or_else(|| {
198            get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
199        });
200
201        let rate_limit =
202            max_requests_per_second.unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND);
203
204        Ok(Self {
205            base_url,
206            client: HttpClient::new(
207                Self::default_headers(),
208                vec![],
209                Self::rate_limiter_quotas(rate_limit),
210                Some(Self::default_quota(rate_limit)),
211                timeout_secs,
212                proxy_url,
213            )
214            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
215            credential: Some(KrakenCredential::new(api_key, api_secret)),
216            retry_manager,
217            cancellation_token: CancellationToken::new(),
218            clock: get_atomic_clock_realtime(),
219            auth_mutex: tokio::sync::Mutex::new(()),
220        })
221    }
222
223    /// Generates a unique nonce for Kraken Futures API requests.
224    ///
225    /// Uses `AtomicTime` for strict monotonicity. The nanosecond timestamp
226    /// guarantees uniqueness even for rapid consecutive calls.
227    fn generate_nonce(&self) -> u64 {
228        self.clock.get_time_ns().as_u64()
229    }
230
231    /// Returns the base URL for this client.
232    pub fn base_url(&self) -> &str {
233        &self.base_url
234    }
235
236    /// Returns the credential for this client, if set.
237    pub fn credential(&self) -> Option<&KrakenCredential> {
238        self.credential.as_ref()
239    }
240
241    /// Cancels all pending HTTP requests.
242    pub fn cancel_all_requests(&self) {
243        self.cancellation_token.cancel();
244    }
245
246    /// Returns the cancellation token for this client.
247    pub fn cancellation_token(&self) -> &CancellationToken {
248        &self.cancellation_token
249    }
250
251    fn default_headers() -> HashMap<String, String> {
252        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
253    }
254
255    fn default_quota(max_requests_per_second: u32) -> Quota {
256        Quota::per_second(NonZeroU32::new(max_requests_per_second).unwrap_or_else(|| {
257            NonZeroU32::new(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()
258        }))
259    }
260
261    fn rate_limiter_quotas(max_requests_per_second: u32) -> Vec<(String, Quota)> {
262        vec![(
263            KRAKEN_GLOBAL_RATE_KEY.to_string(),
264            Self::default_quota(max_requests_per_second),
265        )]
266    }
267
268    fn rate_limit_keys(endpoint: &str) -> Vec<String> {
269        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
270        let route = format!("kraken:futures:{normalized}");
271        vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
272    }
273
274    async fn send_request<T: DeserializeOwned>(
275        &self,
276        method: Method,
277        endpoint: &str,
278        url: String,
279        authenticate: bool,
280    ) -> anyhow::Result<T, KrakenHttpError> {
281        // Serialize authenticated requests to ensure nonces arrive at Kraken in order.
282        // Without this, concurrent requests can race through the network and arrive
283        // out-of-order, causing "Invalid nonce" errors.
284        let _guard = if authenticate {
285            Some(self.auth_mutex.lock().await)
286        } else {
287            None
288        };
289
290        let endpoint = endpoint.to_string();
291        let method_clone = method.clone();
292        let url_clone = url.clone();
293        let credential = self.credential.clone();
294
295        let operation = || {
296            let url = url_clone.clone();
297            let method = method_clone.clone();
298            let endpoint = endpoint.clone();
299            let credential = credential.clone();
300
301            async move {
302                let mut headers = Self::default_headers();
303
304                if authenticate {
305                    let cred = credential.as_ref().ok_or_else(|| {
306                        KrakenHttpError::AuthenticationError(
307                            "Missing credentials for authenticated request".to_string(),
308                        )
309                    })?;
310
311                    let nonce = self.generate_nonce();
312
313                    let signature = cred.sign_futures(&endpoint, "", nonce).map_err(|e| {
314                        KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
315                    })?;
316
317                    let base_url = &self.base_url;
318                    tracing::debug!(
319                        "Kraken Futures auth: endpoint={endpoint}, nonce={nonce}, base_url={base_url}"
320                    );
321
322                    headers.insert("APIKey".to_string(), cred.api_key().to_string());
323                    headers.insert("Authent".to_string(), signature);
324                    headers.insert("Nonce".to_string(), nonce.to_string());
325                }
326
327                let rate_limit_keys = Self::rate_limit_keys(&endpoint);
328
329                let response = self
330                    .client
331                    .request(
332                        method,
333                        url,
334                        None,
335                        Some(headers),
336                        None,
337                        None,
338                        Some(rate_limit_keys),
339                    )
340                    .await
341                    .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
342
343                let status = response.status.as_u16();
344                if status >= 400 {
345                    let body = String::from_utf8_lossy(&response.body).to_string();
346                    // Don't retry authentication errors
347                    if status == 401 || status == 403 {
348                        return Err(KrakenHttpError::AuthenticationError(format!(
349                            "HTTP error {status}: {body}"
350                        )));
351                    }
352                    return Err(KrakenHttpError::NetworkError(format!(
353                        "HTTP error {status}: {body}"
354                    )));
355                }
356
357                let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
358                    KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
359                })?;
360
361                serde_json::from_str(&response_text).map_err(|e| {
362                    KrakenHttpError::ParseError(format!(
363                        "Failed to deserialize futures response: {e}"
364                    ))
365                })
366            }
367        };
368
369        let should_retry =
370            |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
371        let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
372
373        self.retry_manager
374            .execute_with_retry_with_cancel(
375                &endpoint,
376                operation,
377                should_retry,
378                create_error,
379                &self.cancellation_token,
380            )
381            .await
382    }
383
384    /// Sends authenticated GET request with query parameters included in signature.
385    ///
386    /// For Kraken Futures, GET requests with query params must include them in postData
387    /// for signing: message = postData + nonce + endpoint
388    async fn send_get_with_query<T: DeserializeOwned>(
389        &self,
390        endpoint: &str,
391        url: String,
392        query_string: &str,
393    ) -> anyhow::Result<T, KrakenHttpError> {
394        let _guard = self.auth_mutex.lock().await;
395
396        if self.cancellation_token.is_cancelled() {
397            return Err(KrakenHttpError::NetworkError(
398                "Request cancelled".to_string(),
399            ));
400        }
401
402        let credential = self.credential.as_ref().ok_or_else(|| {
403            KrakenHttpError::AuthenticationError("Missing credentials".to_string())
404        })?;
405
406        let nonce = self.generate_nonce();
407
408        // Query params go in postData for signing (not in endpoint)
409        let signature = credential
410            .sign_futures(endpoint, query_string, nonce)
411            .map_err(|e| {
412                KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
413            })?;
414
415        tracing::debug!(
416            "Kraken Futures GET with query: endpoint={endpoint}, query={query_string}, nonce={nonce}"
417        );
418
419        let mut headers = Self::default_headers();
420        headers.insert("APIKey".to_string(), credential.api_key().to_string());
421        headers.insert("Authent".to_string(), signature);
422        headers.insert("Nonce".to_string(), nonce.to_string());
423
424        let rate_limit_keys = Self::rate_limit_keys(endpoint);
425
426        let response = self
427            .client
428            .request(
429                Method::GET,
430                url,
431                None,
432                Some(headers),
433                None,
434                None,
435                Some(rate_limit_keys),
436            )
437            .await
438            .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
439
440        let status = response.status.as_u16();
441        if status >= 400 {
442            let body = String::from_utf8_lossy(&response.body).to_string();
443            if status == 401 || status == 403 {
444                return Err(KrakenHttpError::AuthenticationError(format!(
445                    "HTTP error {status}: {body}"
446                )));
447            }
448            return Err(KrakenHttpError::NetworkError(format!(
449                "HTTP error {status}: {body}"
450            )));
451        }
452
453        let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
454            KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
455        })?;
456
457        serde_json::from_str(&response_text).map_err(|e| {
458            KrakenHttpError::ParseError(format!("Failed to deserialize futures response: {e}"))
459        })
460    }
461
462    async fn send_request_with_body<T: DeserializeOwned>(
463        &self,
464        endpoint: &str,
465        params: HashMap<String, String>,
466    ) -> anyhow::Result<T, KrakenHttpError> {
467        let post_data = serde_urlencoded::to_string(&params)
468            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
469        self.send_authenticated_post(endpoint, post_data).await
470    }
471
472    /// Sends a request with typed parameters (serializable struct).
473    async fn send_request_with_params<P: serde::Serialize, T: DeserializeOwned>(
474        &self,
475        endpoint: &str,
476        params: &P,
477    ) -> anyhow::Result<T, KrakenHttpError> {
478        let post_data = serde_urlencoded::to_string(params)
479            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
480        self.send_authenticated_post(endpoint, post_data).await
481    }
482
483    /// Core authenticated POST request - takes raw post_data string.
484    async fn send_authenticated_post<T: DeserializeOwned>(
485        &self,
486        endpoint: &str,
487        post_data: String,
488    ) -> anyhow::Result<T, KrakenHttpError> {
489        if self.cancellation_token.is_cancelled() {
490            return Err(KrakenHttpError::NetworkError(
491                "Request cancelled".to_string(),
492            ));
493        }
494
495        // Serialize authenticated requests to ensure nonces arrive at Kraken in order
496        let _guard = self.auth_mutex.lock().await;
497
498        if self.cancellation_token.is_cancelled() {
499            return Err(KrakenHttpError::NetworkError(
500                "Request cancelled".to_string(),
501            ));
502        }
503
504        let credential = self.credential.as_ref().ok_or_else(|| {
505            KrakenHttpError::AuthenticationError("Missing credentials".to_string())
506        })?;
507
508        let nonce = self.generate_nonce();
509        tracing::debug!("Generated nonce {nonce} for {endpoint}");
510
511        let signature = credential
512            .sign_futures(endpoint, &post_data, nonce)
513            .map_err(|e| {
514                KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
515            })?;
516
517        let url = format!("{}{endpoint}", self.base_url);
518        let mut headers = Self::default_headers();
519        headers.insert(
520            "Content-Type".to_string(),
521            "application/x-www-form-urlencoded".to_string(),
522        );
523        headers.insert("APIKey".to_string(), credential.api_key().to_string());
524        headers.insert("Authent".to_string(), signature);
525        headers.insert("Nonce".to_string(), nonce.to_string());
526
527        let rate_limit_keys = Self::rate_limit_keys(endpoint);
528
529        let response = self
530            .client
531            .request(
532                Method::POST,
533                url,
534                None,
535                Some(headers),
536                Some(post_data.into_bytes()),
537                None,
538                Some(rate_limit_keys),
539            )
540            .await
541            .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
542
543        if response.status.as_u16() >= 400 {
544            let status = response.status.as_u16();
545            let body = String::from_utf8_lossy(&response.body).to_string();
546            return Err(KrakenHttpError::NetworkError(format!(
547                "HTTP error {status}: {body}"
548            )));
549        }
550
551        let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
552            KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
553        })?;
554
555        serde_json::from_str(&response_text).map_err(|e| {
556            tracing::error!("Failed to parse response from {endpoint}: {response_text}");
557            KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
558        })
559    }
560
561    /// Requests tradable instruments from Kraken Futures.
562    pub async fn get_instruments(
563        &self,
564    ) -> anyhow::Result<FuturesInstrumentsResponse, KrakenHttpError> {
565        let endpoint = "/derivatives/api/v3/instruments";
566        let url = format!("{}{endpoint}", self.base_url);
567
568        self.send_request(Method::GET, endpoint, url, false).await
569    }
570
571    /// Requests ticker information for all futures instruments.
572    pub async fn get_tickers(&self) -> anyhow::Result<FuturesTickersResponse, KrakenHttpError> {
573        let endpoint = "/derivatives/api/v3/tickers";
574        let url = format!("{}{endpoint}", self.base_url);
575
576        self.send_request(Method::GET, endpoint, url, false).await
577    }
578
579    /// Requests OHLC candlestick data for a futures symbol.
580    pub async fn get_ohlc(
581        &self,
582        tick_type: &str,
583        symbol: &str,
584        resolution: &str,
585        from: Option<i64>,
586        to: Option<i64>,
587    ) -> anyhow::Result<FuturesCandlesResponse, KrakenHttpError> {
588        let endpoint = format!("/api/charts/v1/{tick_type}/{symbol}/{resolution}");
589
590        let mut url = format!("{}{endpoint}", self.base_url);
591
592        let mut query_params = Vec::new();
593        if let Some(from_ts) = from {
594            query_params.push(format!("from={from_ts}"));
595        }
596        if let Some(to_ts) = to {
597            query_params.push(format!("to={to_ts}"));
598        }
599
600        if !query_params.is_empty() {
601            url.push('?');
602            url.push_str(&query_params.join("&"));
603        }
604
605        self.send_request(Method::GET, &endpoint, url, false).await
606    }
607
608    /// Gets public execution events (trades) for a futures symbol.
609    pub async fn get_public_executions(
610        &self,
611        symbol: &str,
612        since: Option<i64>,
613        before: Option<i64>,
614        sort: Option<&str>,
615        continuation_token: Option<&str>,
616    ) -> anyhow::Result<FuturesPublicExecutionsResponse, KrakenHttpError> {
617        let endpoint = format!("/api/history/v3/market/{symbol}/executions");
618
619        let mut url = format!("{}{endpoint}", self.base_url);
620
621        let mut query_params = Vec::new();
622        if let Some(since_ts) = since {
623            query_params.push(format!("since={since_ts}"));
624        }
625        if let Some(before_ts) = before {
626            query_params.push(format!("before={before_ts}"));
627        }
628        if let Some(sort_order) = sort {
629            query_params.push(format!("sort={sort_order}"));
630        }
631        if let Some(token) = continuation_token {
632            query_params.push(format!("continuationToken={token}"));
633        }
634
635        if !query_params.is_empty() {
636            url.push('?');
637            url.push_str(&query_params.join("&"));
638        }
639
640        self.send_request(Method::GET, &endpoint, url, false).await
641    }
642
643    /// Requests all open orders (requires authentication).
644    pub async fn get_open_orders(
645        &self,
646    ) -> anyhow::Result<FuturesOpenOrdersResponse, KrakenHttpError> {
647        if self.credential.is_none() {
648            return Err(KrakenHttpError::AuthenticationError(
649                "API credentials required for futures open orders".to_string(),
650            ));
651        }
652
653        let endpoint = "/derivatives/api/v3/openorders";
654        let url = format!("{}{endpoint}", self.base_url);
655
656        self.send_request(Method::GET, endpoint, url, true).await
657    }
658
659    /// Requests historical order events (requires authentication).
660    pub async fn get_order_events(
661        &self,
662        before: Option<i64>,
663        since: Option<i64>,
664        continuation_token: Option<&str>,
665    ) -> anyhow::Result<FuturesOrderEventsResponse, KrakenHttpError> {
666        if self.credential.is_none() {
667            return Err(KrakenHttpError::AuthenticationError(
668                "API credentials required for futures order events".to_string(),
669            ));
670        }
671
672        let endpoint = "/api/history/v2/orders";
673        let mut query_params = Vec::new();
674
675        if let Some(before_ts) = before {
676            query_params.push(format!("before={before_ts}"));
677        }
678        if let Some(since_ts) = since {
679            query_params.push(format!("since={since_ts}"));
680        }
681        if let Some(token) = continuation_token {
682            query_params.push(format!("continuation_token={token}"));
683        }
684
685        // Build URL with query params
686        let query_string = query_params.join("&");
687        let url = if query_string.is_empty() {
688            format!("{}{endpoint}", self.base_url)
689        } else {
690            format!("{}{endpoint}?{query_string}", self.base_url)
691        };
692
693        // For signing: query params go in postData, not endpoint
694        // Kraken: message = postData + nonce + endpoint
695        self.send_get_with_query(endpoint, url, &query_string).await
696    }
697
698    /// Requests fill/trade history (requires authentication).
699    pub async fn get_fills(
700        &self,
701        last_fill_time: Option<&str>,
702    ) -> anyhow::Result<FuturesFillsResponse, KrakenHttpError> {
703        if self.credential.is_none() {
704            return Err(KrakenHttpError::AuthenticationError(
705                "API credentials required for futures fills".to_string(),
706            ));
707        }
708
709        let endpoint = "/derivatives/api/v3/fills";
710        let query_string = last_fill_time
711            .map(|t| format!("lastFillTime={t}"))
712            .unwrap_or_default();
713
714        let url = if query_string.is_empty() {
715            format!("{}{endpoint}", self.base_url)
716        } else {
717            format!("{}{endpoint}?{query_string}", self.base_url)
718        };
719
720        // Query params go in postData for signing
721        self.send_get_with_query(endpoint, url, &query_string).await
722    }
723
724    /// Requests open positions (requires authentication).
725    pub async fn get_open_positions(
726        &self,
727    ) -> anyhow::Result<FuturesOpenPositionsResponse, KrakenHttpError> {
728        if self.credential.is_none() {
729            return Err(KrakenHttpError::AuthenticationError(
730                "API credentials required for futures open positions".to_string(),
731            ));
732        }
733
734        let endpoint = "/derivatives/api/v3/openpositions";
735        let url = format!("{}{endpoint}", self.base_url);
736
737        self.send_request(Method::GET, endpoint, url, true).await
738    }
739
740    /// Requests all accounts (cash and margin) with balances and margin info.
741    pub async fn get_accounts(&self) -> anyhow::Result<FuturesAccountsResponse, KrakenHttpError> {
742        if self.credential.is_none() {
743            return Err(KrakenHttpError::AuthenticationError(
744                "API credentials required for futures accounts".to_string(),
745            ));
746        }
747
748        let endpoint = "/derivatives/api/v3/accounts";
749        let url = format!("{}{endpoint}", self.base_url);
750
751        self.send_request(Method::GET, endpoint, url, true).await
752    }
753
754    /// Submits a new order (requires authentication).
755    pub async fn send_order(
756        &self,
757        params: HashMap<String, String>,
758    ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
759        if self.credential.is_none() {
760            return Err(KrakenHttpError::AuthenticationError(
761                "API credentials required for sending orders".to_string(),
762            ));
763        }
764
765        let endpoint = "/derivatives/api/v3/sendorder";
766        self.send_request_with_body(endpoint, params).await
767    }
768
769    /// Submits a new order using typed parameters (requires authentication).
770    pub async fn send_order_params(
771        &self,
772        params: &KrakenFuturesSendOrderParams,
773    ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
774        if self.credential.is_none() {
775            return Err(KrakenHttpError::AuthenticationError(
776                "API credentials required for sending orders".to_string(),
777            ));
778        }
779
780        let endpoint = "/derivatives/api/v3/sendorder";
781        self.send_request_with_params(endpoint, params).await
782    }
783
784    /// Cancels an open order (requires authentication).
785    pub async fn cancel_order(
786        &self,
787        order_id: Option<String>,
788        cli_ord_id: Option<String>,
789    ) -> anyhow::Result<FuturesCancelOrderResponse, KrakenHttpError> {
790        if self.credential.is_none() {
791            return Err(KrakenHttpError::AuthenticationError(
792                "API credentials required for canceling orders".to_string(),
793            ));
794        }
795
796        let mut params = HashMap::new();
797        if let Some(id) = order_id {
798            params.insert("order_id".to_string(), id);
799        }
800        if let Some(id) = cli_ord_id {
801            params.insert("cliOrdId".to_string(), id);
802        }
803
804        let endpoint = "/derivatives/api/v3/cancelorder";
805        self.send_request_with_body(endpoint, params).await
806    }
807
808    /// Edits an existing order (requires authentication).
809    pub async fn edit_order(
810        &self,
811        params: &KrakenFuturesEditOrderParams,
812    ) -> anyhow::Result<FuturesEditOrderResponse, KrakenHttpError> {
813        if self.credential.is_none() {
814            return Err(KrakenHttpError::AuthenticationError(
815                "API credentials required for editing orders".to_string(),
816            ));
817        }
818
819        let endpoint = "/derivatives/api/v3/editorder";
820        self.send_request_with_params(endpoint, params).await
821    }
822
823    /// Submits multiple orders in a single batch request (requires authentication).
824    pub async fn batch_order(
825        &self,
826        params: HashMap<String, String>,
827    ) -> anyhow::Result<FuturesBatchOrderResponse, KrakenHttpError> {
828        if self.credential.is_none() {
829            return Err(KrakenHttpError::AuthenticationError(
830                "API credentials required for batch orders".to_string(),
831            ));
832        }
833
834        let endpoint = "/derivatives/api/v3/batchorder";
835        self.send_request_with_body(endpoint, params).await
836    }
837
838    /// Cancels multiple orders in a single batch request (requires authentication).
839    pub async fn cancel_orders_batch(
840        &self,
841        order_ids: Vec<String>,
842    ) -> anyhow::Result<FuturesBatchCancelResponse, KrakenHttpError> {
843        if self.credential.is_none() {
844            return Err(KrakenHttpError::AuthenticationError(
845                "API credentials required for batch orders".to_string(),
846            ));
847        }
848
849        let batch_items: Vec<KrakenFuturesBatchCancelItem> = order_ids
850            .into_iter()
851            .map(KrakenFuturesBatchCancelItem::from_order_id)
852            .collect();
853
854        let params = KrakenFuturesBatchOrderParams::new(batch_items);
855        let post_data = params
856            .to_body()
857            .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize batch: {e}")))?;
858
859        let endpoint = "/derivatives/api/v3/batchorder";
860        self.send_authenticated_post(endpoint, post_data).await
861    }
862
863    /// Cancels all open orders, optionally filtered by symbol (requires authentication).
864    pub async fn cancel_all_orders(
865        &self,
866        symbol: Option<String>,
867    ) -> anyhow::Result<FuturesCancelAllOrdersResponse, KrakenHttpError> {
868        if self.credential.is_none() {
869            return Err(KrakenHttpError::AuthenticationError(
870                "API credentials required for canceling orders".to_string(),
871            ));
872        }
873
874        let mut params = HashMap::new();
875        if let Some(sym) = symbol {
876            params.insert("symbol".to_string(), sym);
877        }
878
879        let endpoint = "/derivatives/api/v3/cancelallorders";
880        self.send_request_with_body(endpoint, params).await
881    }
882}
883
884// =============================================================================
885// Domain Client
886// =============================================================================
887
888/// High-level HTTP client for the Kraken Futures REST API.
889///
890/// This client wraps the raw client and provides Nautilus domain types.
891/// It maintains an instrument cache and uses it to parse venue responses
892/// into Nautilus domain objects.
893#[cfg_attr(
894    feature = "python",
895    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
896)]
897pub struct KrakenFuturesHttpClient {
898    pub(crate) inner: Arc<KrakenFuturesRawHttpClient>,
899    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
900    cache_initialized: Arc<AtomicBool>,
901}
902
903impl Clone for KrakenFuturesHttpClient {
904    fn clone(&self) -> Self {
905        Self {
906            inner: self.inner.clone(),
907            instruments_cache: self.instruments_cache.clone(),
908            cache_initialized: self.cache_initialized.clone(),
909        }
910    }
911}
912
913impl Default for KrakenFuturesHttpClient {
914    fn default() -> Self {
915        Self::new(
916            KrakenEnvironment::Mainnet,
917            None,
918            Some(60),
919            None,
920            None,
921            None,
922            None,
923            None,
924        )
925        .expect("Failed to create default KrakenFuturesHttpClient")
926    }
927}
928
929impl Debug for KrakenFuturesHttpClient {
930    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
931        f.debug_struct(stringify!(KrakenFuturesHttpClient))
932            .field("inner", &self.inner)
933            .finish()
934    }
935}
936
937impl KrakenFuturesHttpClient {
938    /// Creates a new [`KrakenFuturesHttpClient`].
939    #[allow(clippy::too_many_arguments)]
940    pub fn new(
941        environment: KrakenEnvironment,
942        base_url_override: Option<String>,
943        timeout_secs: Option<u64>,
944        max_retries: Option<u32>,
945        retry_delay_ms: Option<u64>,
946        retry_delay_max_ms: Option<u64>,
947        proxy_url: Option<String>,
948        max_requests_per_second: Option<u32>,
949    ) -> anyhow::Result<Self> {
950        Ok(Self {
951            inner: Arc::new(KrakenFuturesRawHttpClient::new(
952                environment,
953                base_url_override,
954                timeout_secs,
955                max_retries,
956                retry_delay_ms,
957                retry_delay_max_ms,
958                proxy_url,
959                max_requests_per_second,
960            )?),
961            instruments_cache: Arc::new(DashMap::new()),
962            cache_initialized: Arc::new(AtomicBool::new(false)),
963        })
964    }
965
966    /// Creates a new [`KrakenFuturesHttpClient`] with credentials.
967    #[allow(clippy::too_many_arguments)]
968    pub fn with_credentials(
969        api_key: String,
970        api_secret: String,
971        environment: KrakenEnvironment,
972        base_url_override: Option<String>,
973        timeout_secs: Option<u64>,
974        max_retries: Option<u32>,
975        retry_delay_ms: Option<u64>,
976        retry_delay_max_ms: Option<u64>,
977        proxy_url: Option<String>,
978        max_requests_per_second: Option<u32>,
979    ) -> anyhow::Result<Self> {
980        Ok(Self {
981            inner: Arc::new(KrakenFuturesRawHttpClient::with_credentials(
982                api_key,
983                api_secret,
984                environment,
985                base_url_override,
986                timeout_secs,
987                max_retries,
988                retry_delay_ms,
989                retry_delay_max_ms,
990                proxy_url,
991                max_requests_per_second,
992            )?),
993            instruments_cache: Arc::new(DashMap::new()),
994            cache_initialized: Arc::new(AtomicBool::new(false)),
995        })
996    }
997
998    /// Creates a new [`KrakenFuturesHttpClient`] loading credentials from environment variables.
999    ///
1000    /// Looks for `KRAKEN_FUTURES_API_KEY` and `KRAKEN_FUTURES_API_SECRET` (mainnet)
1001    /// or `KRAKEN_FUTURES_DEMO_API_KEY` and `KRAKEN_FUTURES_DEMO_API_SECRET` (demo).
1002    ///
1003    /// Falls back to unauthenticated client if credentials are not set.
1004    #[allow(clippy::too_many_arguments)]
1005    pub fn from_env(
1006        environment: KrakenEnvironment,
1007        base_url_override: Option<String>,
1008        timeout_secs: Option<u64>,
1009        max_retries: Option<u32>,
1010        retry_delay_ms: Option<u64>,
1011        retry_delay_max_ms: Option<u64>,
1012        proxy_url: Option<String>,
1013        max_requests_per_second: Option<u32>,
1014    ) -> anyhow::Result<Self> {
1015        let demo = environment == KrakenEnvironment::Demo;
1016
1017        if let Some(credential) = KrakenCredential::from_env_futures(demo) {
1018            let (api_key, api_secret) = credential.into_parts();
1019            Self::with_credentials(
1020                api_key,
1021                api_secret,
1022                environment,
1023                base_url_override,
1024                timeout_secs,
1025                max_retries,
1026                retry_delay_ms,
1027                retry_delay_max_ms,
1028                proxy_url,
1029                max_requests_per_second,
1030            )
1031        } else {
1032            Self::new(
1033                environment,
1034                base_url_override,
1035                timeout_secs,
1036                max_retries,
1037                retry_delay_ms,
1038                retry_delay_max_ms,
1039                proxy_url,
1040                max_requests_per_second,
1041            )
1042        }
1043    }
1044
1045    /// Cancels all pending HTTP requests.
1046    pub fn cancel_all_requests(&self) {
1047        self.inner.cancel_all_requests();
1048    }
1049
1050    /// Returns the cancellation token for this client.
1051    pub fn cancellation_token(&self) -> &CancellationToken {
1052        self.inner.cancellation_token()
1053    }
1054
1055    /// Caches an instrument for symbol lookup.
1056    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1057        self.instruments_cache
1058            .insert(instrument.symbol().inner(), instrument);
1059        self.cache_initialized.store(true, Ordering::Release);
1060    }
1061
1062    /// Caches multiple instruments for symbol lookup.
1063    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1064        for instrument in instruments {
1065            self.instruments_cache
1066                .insert(instrument.symbol().inner(), instrument);
1067        }
1068        self.cache_initialized.store(true, Ordering::Release);
1069    }
1070
1071    /// Gets an instrument from the cache by symbol.
1072    pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1073        self.instruments_cache
1074            .get(symbol)
1075            .map(|entry| entry.value().clone())
1076    }
1077
1078    fn get_instrument_by_raw_symbol(&self, raw_symbol: &str) -> Option<InstrumentAny> {
1079        self.instruments_cache
1080            .iter()
1081            .find(|entry| entry.value().raw_symbol().as_str() == raw_symbol)
1082            .map(|entry| entry.value().clone())
1083    }
1084
1085    fn generate_ts_init(&self) -> UnixNanos {
1086        get_atomic_clock_realtime().get_time_ns()
1087    }
1088
1089    /// Requests tradable instruments from Kraken Futures.
1090    pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
1091        let ts_init = self.generate_ts_init();
1092        let response = self.inner.get_instruments().await?;
1093
1094        let instruments: Vec<InstrumentAny> = response
1095            .instruments
1096            .iter()
1097            .filter_map(|fut_instrument| {
1098                match parse_futures_instrument(fut_instrument, ts_init, ts_init) {
1099                    Ok(instrument) => Some(instrument),
1100                    Err(e) => {
1101                        let symbol = &fut_instrument.symbol;
1102                        tracing::warn!("Failed to parse futures instrument {symbol}: {e}");
1103                        None
1104                    }
1105                }
1106            })
1107            .collect();
1108
1109        Ok(instruments)
1110    }
1111
1112    /// Requests the mark price for an instrument.
1113    pub async fn request_mark_price(
1114        &self,
1115        instrument_id: InstrumentId,
1116    ) -> anyhow::Result<f64, KrakenHttpError> {
1117        let instrument = self
1118            .get_cached_instrument(&instrument_id.symbol.inner())
1119            .ok_or_else(|| {
1120                KrakenHttpError::ParseError(format!(
1121                    "Instrument not found in cache: {instrument_id}"
1122                ))
1123            })?;
1124
1125        let raw_symbol = instrument.raw_symbol().to_string();
1126        let tickers = self.inner.get_tickers().await?;
1127
1128        tickers
1129            .tickers
1130            .iter()
1131            .find(|t| t.symbol == raw_symbol)
1132            .ok_or_else(|| {
1133                KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1134            })
1135            .and_then(|t| {
1136                t.mark_price.ok_or_else(|| {
1137                    KrakenHttpError::ParseError(format!(
1138                        "Mark price not available for {raw_symbol} (may not be available in testnet)"
1139                    ))
1140                })
1141            })
1142    }
1143
1144    pub async fn request_index_price(
1145        &self,
1146        instrument_id: InstrumentId,
1147    ) -> anyhow::Result<f64, KrakenHttpError> {
1148        let instrument = self
1149            .get_cached_instrument(&instrument_id.symbol.inner())
1150            .ok_or_else(|| {
1151                KrakenHttpError::ParseError(format!(
1152                    "Instrument not found in cache: {instrument_id}"
1153                ))
1154            })?;
1155
1156        let raw_symbol = instrument.raw_symbol().to_string();
1157        let tickers = self.inner.get_tickers().await?;
1158
1159        tickers
1160            .tickers
1161            .iter()
1162            .find(|t| t.symbol == raw_symbol)
1163            .ok_or_else(|| {
1164                KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1165            })
1166            .and_then(|t| {
1167                t.index_price.ok_or_else(|| {
1168                    KrakenHttpError::ParseError(format!(
1169                        "Index price not available for {raw_symbol} (may not be available in testnet)"
1170                    ))
1171                })
1172            })
1173    }
1174
1175    pub async fn request_trades(
1176        &self,
1177        instrument_id: InstrumentId,
1178        start: Option<DateTime<Utc>>,
1179        end: Option<DateTime<Utc>>,
1180        limit: Option<u64>,
1181    ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
1182        let instrument = self
1183            .get_cached_instrument(&instrument_id.symbol.inner())
1184            .ok_or_else(|| {
1185                KrakenHttpError::ParseError(format!(
1186                    "Instrument not found in cache: {instrument_id}"
1187                ))
1188            })?;
1189
1190        let raw_symbol = instrument.raw_symbol().to_string();
1191        let ts_init = self.generate_ts_init();
1192
1193        let since = start.map(|dt| dt.timestamp_millis());
1194        let before = end.map(|dt| dt.timestamp_millis());
1195
1196        let response = self
1197            .inner
1198            .get_public_executions(&raw_symbol, since, before, Some("asc"), None)
1199            .await?;
1200
1201        let mut trades = Vec::new();
1202
1203        for element in &response.elements {
1204            let execution = &element.event.execution.execution;
1205            match parse_futures_public_execution(execution, &instrument, ts_init) {
1206                Ok(trade_tick) => {
1207                    trades.push(trade_tick);
1208
1209                    if let Some(limit_count) = limit
1210                        && trades.len() >= limit_count as usize
1211                    {
1212                        return Ok(trades);
1213                    }
1214                }
1215                Err(e) => {
1216                    tracing::warn!("Failed to parse futures trade tick: {e}");
1217                }
1218            }
1219        }
1220
1221        Ok(trades)
1222    }
1223
1224    pub async fn request_bars(
1225        &self,
1226        bar_type: BarType,
1227        start: Option<DateTime<Utc>>,
1228        end: Option<DateTime<Utc>>,
1229        limit: Option<u64>,
1230    ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
1231        let instrument_id = bar_type.instrument_id();
1232        let instrument = self
1233            .get_cached_instrument(&instrument_id.symbol.inner())
1234            .ok_or_else(|| {
1235                KrakenHttpError::ParseError(format!(
1236                    "Instrument not found in cache: {instrument_id}"
1237                ))
1238            })?;
1239
1240        let raw_symbol = instrument.raw_symbol().to_string();
1241        let ts_init = self.generate_ts_init();
1242        let tick_type = "trade";
1243        let resolution = bar_type_to_futures_resolution(bar_type)
1244            .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?;
1245
1246        // Kraken Futures OHLC API expects Unix timestamp in seconds
1247        let from = start.map(|dt| dt.timestamp());
1248        let to = end.map(|dt| dt.timestamp());
1249        let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1250
1251        let response = self
1252            .inner
1253            .get_ohlc(tick_type, &raw_symbol, resolution, from, to)
1254            .await?;
1255
1256        let mut bars = Vec::new();
1257        for candle in response.candles {
1258            let ohlc = OhlcData {
1259                time: candle.time / 1000,
1260                open: candle.open,
1261                high: candle.high,
1262                low: candle.low,
1263                close: candle.close,
1264                vwap: "0".to_string(),
1265                volume: candle.volume,
1266                count: 0,
1267            };
1268
1269            match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1270                Ok(bar) => {
1271                    if let Some(end_nanos) = end_ns
1272                        && bar.ts_event.as_u64() > end_nanos
1273                    {
1274                        continue;
1275                    }
1276                    bars.push(bar);
1277
1278                    if let Some(limit_count) = limit
1279                        && bars.len() >= limit_count as usize
1280                    {
1281                        return Ok(bars);
1282                    }
1283                }
1284                Err(e) => {
1285                    tracing::warn!("Failed to parse futures bar: {e}");
1286                }
1287            }
1288        }
1289
1290        Ok(bars)
1291    }
1292
1293    /// Requests account state from the Kraken Futures exchange.
1294    ///
1295    /// This queries the accounts endpoint and converts the response into a
1296    /// Nautilus `AccountState` event containing balances and margin info.
1297    ///
1298    /// # Errors
1299    ///
1300    /// Returns an error if:
1301    /// - Credentials are missing.
1302    /// - The request fails.
1303    /// - Response parsing fails.
1304    pub async fn request_account_state(
1305        &self,
1306        account_id: AccountId,
1307    ) -> anyhow::Result<AccountState> {
1308        let accounts_response = self.inner.get_accounts().await?;
1309
1310        if accounts_response.result != KrakenApiResult::Success {
1311            let error_msg = accounts_response
1312                .error
1313                .unwrap_or_else(|| "Unknown error".to_string());
1314            anyhow::bail!("Failed to get futures accounts: {error_msg}");
1315        }
1316
1317        let ts_init = self.generate_ts_init();
1318
1319        let mut balances: Vec<AccountBalance> = Vec::new();
1320
1321        for account in accounts_response.accounts.values() {
1322            match account.account_type.as_str() {
1323                "multiCollateralMarginAccount" => {
1324                    for (currency_code, currency_info) in &account.currencies {
1325                        if currency_info.quantity == 0.0 {
1326                            continue;
1327                        }
1328
1329                        let currency = Currency::new(
1330                            currency_code.as_str(),
1331                            8,
1332                            0,
1333                            currency_code.as_str(),
1334                            CurrencyType::Crypto,
1335                        );
1336
1337                        let total_amount = currency_info.quantity;
1338                        let total = Money::new(total_amount, currency);
1339
1340                        // Available can exceed quantity with positive PnL, cap to satisfy invariant
1341                        let available_amount = currency_info
1342                            .available
1343                            .unwrap_or(total_amount)
1344                            .min(total_amount);
1345                        let locked_amount = (total_amount - available_amount).max(0.0);
1346                        let locked = Money::new(locked_amount, currency);
1347                        // Compute free from total - locked to guarantee the invariant holds
1348                        let free = total - locked;
1349
1350                        balances.push(AccountBalance::new(total, locked, free));
1351                    }
1352
1353                    // Add USD balance from portfolio value for margin calculations.
1354                    // Multi-collateral accounts track margin in USD even though the
1355                    // actual collateral is held in various crypto currencies.
1356                    if let Some(portfolio_value) = account.portfolio_value
1357                        && portfolio_value > 0.0
1358                    {
1359                        let usd_currency = Currency::USD();
1360                        let total_usd = Money::new(portfolio_value, usd_currency);
1361                        let available_usd = account
1362                            .available_margin
1363                            .unwrap_or(portfolio_value)
1364                            .min(portfolio_value);
1365                        // Compute locked = total - available to guarantee the invariant holds
1366                        let locked_usd =
1367                            Money::new((portfolio_value - available_usd).max(0.0), usd_currency);
1368                        let free_usd = total_usd - locked_usd;
1369
1370                        balances.push(AccountBalance::new(total_usd, locked_usd, free_usd));
1371                    }
1372                }
1373                "marginAccount" => {
1374                    for (currency_code, &amount) in &account.balances {
1375                        if amount == 0.0 {
1376                            continue;
1377                        }
1378
1379                        let currency = Currency::new(
1380                            currency_code.as_str(),
1381                            8,
1382                            0,
1383                            currency_code.as_str(),
1384                            CurrencyType::Crypto,
1385                        );
1386
1387                        let total = Money::new(amount, currency);
1388
1389                        // Available can exceed balance with positive PnL, cap to satisfy invariant
1390                        let available = account
1391                            .auxiliary
1392                            .as_ref()
1393                            .and_then(|aux| aux.af)
1394                            .unwrap_or(amount)
1395                            .min(amount);
1396                        let locked = amount - available;
1397
1398                        balances.push(AccountBalance::new(
1399                            total,
1400                            Money::new(locked, currency),
1401                            Money::new(available, currency),
1402                        ));
1403                    }
1404                }
1405                "cashAccount" => {
1406                    for (currency_code, &amount) in &account.balances {
1407                        if amount == 0.0 {
1408                            continue;
1409                        }
1410
1411                        let currency = Currency::new(
1412                            currency_code.as_str(),
1413                            8,
1414                            0,
1415                            currency_code.as_str(),
1416                            CurrencyType::Crypto,
1417                        );
1418
1419                        let total = Money::new(amount, currency);
1420                        let locked = Money::new(0.0, currency);
1421
1422                        balances.push(AccountBalance::new(total, locked, total));
1423                    }
1424                }
1425                _ => {
1426                    let account_type = &account.account_type;
1427                    tracing::debug!("Unknown account type: {account_type}");
1428                }
1429            }
1430        }
1431
1432        Ok(AccountState::new(
1433            account_id,
1434            AccountType::Margin,
1435            balances,
1436            vec![],
1437            true,
1438            UUID4::new(),
1439            ts_init,
1440            ts_init,
1441            None,
1442        ))
1443    }
1444
1445    pub async fn request_order_status_reports(
1446        &self,
1447        account_id: AccountId,
1448        instrument_id: Option<InstrumentId>,
1449        start: Option<DateTime<Utc>>,
1450        end: Option<DateTime<Utc>>,
1451        open_only: bool,
1452    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1453        let ts_init = self.generate_ts_init();
1454        let mut all_reports = Vec::new();
1455
1456        let response = self
1457            .inner
1458            .get_open_orders()
1459            .await
1460            .map_err(|e| anyhow::anyhow!("get_open_orders failed: {e}"))?;
1461        if response.result != KrakenApiResult::Success {
1462            let error_msg = response
1463                .error
1464                .unwrap_or_else(|| "Unknown error".to_string());
1465            anyhow::bail!("Failed to get open orders: {error_msg}");
1466        }
1467
1468        for order in &response.open_orders {
1469            if let Some(ref target_id) = instrument_id {
1470                let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1471                if let Some(inst) = instrument
1472                    && inst.raw_symbol().as_str() != order.symbol
1473                {
1474                    continue;
1475                }
1476            }
1477
1478            if let Some(instrument) = self.get_instrument_by_raw_symbol(&order.symbol) {
1479                match parse_futures_order_status_report(order, &instrument, account_id, ts_init) {
1480                    Ok(report) => all_reports.push(report),
1481                    Err(e) => {
1482                        let order_id = &order.order_id;
1483                        tracing::warn!("Failed to parse futures order {order_id}: {e}");
1484                    }
1485                }
1486            }
1487        }
1488
1489        if !open_only {
1490            // Kraken Futures order events API expects Unix timestamp in milliseconds
1491            let start_ms = start.map(|dt| dt.timestamp_millis());
1492            let end_ms = end.map(|dt| dt.timestamp_millis());
1493            let response = self
1494                .inner
1495                .get_order_events(end_ms, start_ms, None)
1496                .await
1497                .map_err(|e| anyhow::anyhow!("get_order_events failed: {e}"))?;
1498
1499            for event_wrapper in response.order_events {
1500                let event = &event_wrapper.order;
1501                if let Some(ref target_id) = instrument_id {
1502                    let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1503                    if let Some(inst) = instrument
1504                        && inst.raw_symbol().as_str() != event.symbol
1505                    {
1506                        continue;
1507                    }
1508                }
1509
1510                if let Some(instrument) = self.get_instrument_by_raw_symbol(&event.symbol) {
1511                    match parse_futures_order_event_status_report(
1512                        event,
1513                        &instrument,
1514                        account_id,
1515                        ts_init,
1516                    ) {
1517                        Ok(report) => all_reports.push(report),
1518                        Err(e) => {
1519                            let order_id = &event.order_id;
1520                            tracing::warn!("Failed to parse futures order event {order_id}: {e}");
1521                        }
1522                    }
1523                }
1524            }
1525        }
1526
1527        Ok(all_reports)
1528    }
1529
1530    pub async fn request_fill_reports(
1531        &self,
1532        account_id: AccountId,
1533        instrument_id: Option<InstrumentId>,
1534        start: Option<DateTime<Utc>>,
1535        end: Option<DateTime<Utc>>,
1536    ) -> anyhow::Result<Vec<FillReport>> {
1537        let ts_init = self.generate_ts_init();
1538        let mut all_reports = Vec::new();
1539
1540        let response = self.inner.get_fills(None).await?;
1541        if response.result != KrakenApiResult::Success {
1542            let error_msg = response
1543                .error
1544                .unwrap_or_else(|| "Unknown error".to_string());
1545            anyhow::bail!("Failed to get fills: {error_msg}");
1546        }
1547
1548        let start_ms = start.map(|dt| dt.timestamp_millis());
1549        let end_ms = end.map(|dt| dt.timestamp_millis());
1550
1551        for fill in response.fills {
1552            if let Some(start_threshold) = start_ms
1553                && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1554            {
1555                let fill_ms = fill_ts.timestamp_millis();
1556                if fill_ms < start_threshold {
1557                    continue;
1558                }
1559            }
1560            if let Some(end_threshold) = end_ms
1561                && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1562            {
1563                let fill_ms = fill_ts.timestamp_millis();
1564                if fill_ms > end_threshold {
1565                    continue;
1566                }
1567            }
1568
1569            if let Some(ref target_id) = instrument_id {
1570                let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1571                if let Some(inst) = instrument
1572                    && inst.raw_symbol().as_str() != fill.symbol
1573                {
1574                    continue;
1575                }
1576            }
1577
1578            if let Some(instrument) = self.get_instrument_by_raw_symbol(&fill.symbol) {
1579                match parse_futures_fill_report(&fill, &instrument, account_id, ts_init) {
1580                    Ok(report) => all_reports.push(report),
1581                    Err(e) => {
1582                        let fill_id = &fill.fill_id;
1583                        tracing::warn!("Failed to parse futures fill {fill_id}: {e}");
1584                    }
1585                }
1586            }
1587        }
1588
1589        Ok(all_reports)
1590    }
1591
1592    pub async fn request_position_status_reports(
1593        &self,
1594        account_id: AccountId,
1595        instrument_id: Option<InstrumentId>,
1596    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1597        let ts_init = self.generate_ts_init();
1598        let mut all_reports = Vec::new();
1599
1600        let response = self.inner.get_open_positions().await?;
1601        if response.result != KrakenApiResult::Success {
1602            let error_msg = response
1603                .error
1604                .unwrap_or_else(|| "Unknown error".to_string());
1605            anyhow::bail!("Failed to get open positions: {error_msg}");
1606        }
1607
1608        for position in response.open_positions {
1609            if let Some(ref target_id) = instrument_id {
1610                let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1611                if let Some(inst) = instrument
1612                    && inst.raw_symbol().as_str() != position.symbol
1613                {
1614                    continue;
1615                }
1616            }
1617
1618            if let Some(instrument) = self.get_instrument_by_raw_symbol(&position.symbol) {
1619                match parse_futures_position_status_report(
1620                    &position,
1621                    &instrument,
1622                    account_id,
1623                    ts_init,
1624                ) {
1625                    Ok(report) => all_reports.push(report),
1626                    Err(e) => {
1627                        let symbol = &position.symbol;
1628                        tracing::warn!("Failed to parse futures position {symbol}: {e}");
1629                    }
1630                }
1631            }
1632        }
1633
1634        Ok(all_reports)
1635    }
1636
1637    /// Submits a new order to the Kraken Futures exchange.
1638    ///
1639    /// # Errors
1640    ///
1641    /// Returns an error if:
1642    /// - Credentials are missing.
1643    /// - The instrument is not found in cache.
1644    /// - The order type or time in force is not supported.
1645    /// - The request fails.
1646    /// - The order is rejected.
1647    #[allow(clippy::too_many_arguments)]
1648    pub async fn submit_order(
1649        &self,
1650        account_id: AccountId,
1651        instrument_id: InstrumentId,
1652        client_order_id: ClientOrderId,
1653        order_side: OrderSide,
1654        order_type: OrderType,
1655        quantity: Quantity,
1656        time_in_force: TimeInForce,
1657        price: Option<Price>,
1658        trigger_price: Option<Price>,
1659        reduce_only: bool,
1660        post_only: bool,
1661    ) -> anyhow::Result<OrderStatusReport> {
1662        let instrument = self
1663            .get_cached_instrument(&instrument_id.symbol.inner())
1664            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1665
1666        let raw_symbol = instrument.raw_symbol().inner();
1667
1668        // Map order type and time-in-force to Kraken order type
1669        // Kraken Futures encodes TIF in the orderType field:
1670        // - lmt = limit (GTC)
1671        // - ioc = immediate-or-cancel
1672        // - post = post-only (maker only)
1673        // - mkt = market
1674        let kraken_order_type = match order_type {
1675            OrderType::Market => KrakenFuturesOrderType::Market,
1676            OrderType::Limit => {
1677                if post_only {
1678                    KrakenFuturesOrderType::Post
1679                } else {
1680                    match time_in_force {
1681                        TimeInForce::Ioc => KrakenFuturesOrderType::Ioc,
1682                        TimeInForce::Fok => {
1683                            anyhow::bail!("FOK not supported by Kraken Futures, use IOC instead")
1684                        }
1685                        TimeInForce::Gtd => {
1686                            anyhow::bail!("GTD not supported by Kraken Futures, use GTC instead")
1687                        }
1688                        _ => KrakenFuturesOrderType::Limit, // GTC is default
1689                    }
1690                }
1691            }
1692            OrderType::StopMarket | OrderType::StopLimit => KrakenFuturesOrderType::Stop,
1693            OrderType::MarketIfTouched => KrakenFuturesOrderType::TakeProfit,
1694            _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
1695        };
1696
1697        let mut builder = KrakenFuturesSendOrderParamsBuilder::default();
1698        builder
1699            .cli_ord_id(client_order_id.to_string())
1700            .broker(NAUTILUS_KRAKEN_BROKER_ID)
1701            .symbol(raw_symbol)
1702            .side(KrakenOrderSide::from(order_side))
1703            .size(quantity.to_string())
1704            .order_type(kraken_order_type);
1705
1706        // Handle prices based on order type
1707        match order_type {
1708            OrderType::StopMarket => {
1709                // Stop market orders need stop_price (trigger price)
1710                if let Some(trigger) = trigger_price {
1711                    builder.stop_price(trigger.to_string());
1712                }
1713            }
1714            OrderType::StopLimit => {
1715                // Stop limit orders need both stop_price and limit_price
1716                if let Some(trigger) = trigger_price {
1717                    builder.stop_price(trigger.to_string());
1718                }
1719                if let Some(limit) = price {
1720                    builder.limit_price(limit.to_string());
1721                }
1722            }
1723            OrderType::MarketIfTouched => {
1724                // Take-profit orders need stop_price (trigger price) and optionally limit_price
1725                if let Some(trigger) = trigger_price {
1726                    builder.stop_price(trigger.to_string());
1727                }
1728                if let Some(limit) = price {
1729                    builder.limit_price(limit.to_string());
1730                }
1731            }
1732            _ => {
1733                // Regular orders just use limit_price
1734                if let Some(limit) = price {
1735                    builder.limit_price(limit.to_string());
1736                }
1737            }
1738        }
1739
1740        if reduce_only {
1741            builder.reduce_only(true);
1742        }
1743
1744        let params = builder
1745            .build()
1746            .map_err(|e| anyhow::anyhow!("Failed to build order params: {e}"))?;
1747
1748        let response = self.inner.send_order_params(&params).await?;
1749
1750        if response.result != KrakenApiResult::Success {
1751            let error_msg = response
1752                .error
1753                .unwrap_or_else(|| "Unknown error".to_string());
1754            anyhow::bail!("Order submission failed: {error_msg}");
1755        }
1756
1757        let send_status = response
1758            .send_status
1759            .ok_or_else(|| anyhow::anyhow!("No send_status in successful response"))?;
1760
1761        let status = &send_status.status;
1762
1763        // Check for post-only rejection (Kraken returns status="postWouldExecute")
1764        if status == "postWouldExecute" {
1765            let reason = send_status
1766                .order_events
1767                .as_ref()
1768                .and_then(|events| events.first())
1769                .and_then(|e| e.reason.clone())
1770                .unwrap_or_else(|| "Post-only order would have crossed".to_string());
1771            anyhow::bail!("POST_ONLY_REJECTED: {reason}");
1772        }
1773
1774        let venue_order_id = send_status
1775            .order_id
1776            .ok_or_else(|| anyhow::anyhow!("No order_id in send_status: {status}"))?;
1777
1778        let ts_init = self.generate_ts_init();
1779
1780        let open_orders_response = self.inner.get_open_orders().await?;
1781        if let Some(order) = open_orders_response
1782            .open_orders
1783            .iter()
1784            .find(|o| o.order_id == venue_order_id)
1785        {
1786            return parse_futures_order_status_report(order, &instrument, account_id, ts_init);
1787        }
1788
1789        // Order not in open orders - may have filled immediately (market order or aggressive limit)
1790        // Try to use order_events from send_status first
1791        if let Some(order_events) = &send_status.order_events
1792            && let Some(send_event) = order_events.first()
1793        {
1794            // Handle regular orders, trigger orders, and execution events
1795            let event = if let Some(order_data) = &send_event.order {
1796                FuturesOrderEvent {
1797                    order_id: order_data.order_id.clone(),
1798                    cli_ord_id: order_data.cli_ord_id.clone(),
1799                    order_type: order_data.order_type,
1800                    symbol: order_data.symbol.clone(),
1801                    side: order_data.side,
1802                    quantity: order_data.quantity,
1803                    filled: order_data.filled,
1804                    limit_price: order_data.limit_price,
1805                    stop_price: order_data.stop_price,
1806                    timestamp: order_data.timestamp.clone(),
1807                    last_update_timestamp: order_data.last_update_timestamp.clone(),
1808                    reduce_only: order_data.reduce_only,
1809                }
1810            } else if let Some(trigger_data) = &send_event.order_trigger {
1811                FuturesOrderEvent {
1812                    order_id: trigger_data.uid.clone(),
1813                    cli_ord_id: trigger_data.client_id.clone(),
1814                    order_type: trigger_data.order_type,
1815                    symbol: trigger_data.symbol.clone(),
1816                    side: trigger_data.side,
1817                    quantity: trigger_data.quantity,
1818                    filled: 0.0,
1819                    limit_price: trigger_data.limit_price,
1820                    stop_price: Some(trigger_data.trigger_price),
1821                    timestamp: trigger_data.timestamp.clone(),
1822                    last_update_timestamp: trigger_data.last_update_timestamp.clone(),
1823                    reduce_only: trigger_data.reduce_only,
1824                }
1825            } else if let Some(prior_exec) = &send_event.order_prior_execution {
1826                // EXECUTION event - use orderPriorExecution data
1827                FuturesOrderEvent {
1828                    order_id: prior_exec.order_id.clone(),
1829                    cli_ord_id: prior_exec.cli_ord_id.clone(),
1830                    order_type: prior_exec.order_type,
1831                    symbol: prior_exec.symbol.clone(),
1832                    side: prior_exec.side,
1833                    quantity: prior_exec.quantity,
1834                    filled: send_event.amount.unwrap_or(prior_exec.quantity), // Use execution amount
1835                    limit_price: prior_exec.limit_price,
1836                    stop_price: prior_exec.stop_price,
1837                    timestamp: prior_exec.timestamp.clone(),
1838                    last_update_timestamp: prior_exec.last_update_timestamp.clone(),
1839                    reduce_only: prior_exec.reduce_only,
1840                }
1841            } else {
1842                anyhow::bail!("No order, orderTrigger, or orderPriorExecution data in event");
1843            };
1844            return parse_futures_order_event_status_report(
1845                &event,
1846                &instrument,
1847                account_id,
1848                ts_init,
1849            );
1850        }
1851
1852        // Fall back to querying order events
1853        let events_response = self.inner.get_order_events(None, None, None).await?;
1854        let event_wrapper = events_response
1855            .order_events
1856            .iter()
1857            .find(|e| e.order.order_id == venue_order_id)
1858            .ok_or_else(|| {
1859                anyhow::anyhow!("Order not found in open orders or events: {venue_order_id}")
1860            })?;
1861
1862        parse_futures_order_event_status_report(
1863            &event_wrapper.order,
1864            &instrument,
1865            account_id,
1866            ts_init,
1867        )
1868    }
1869
1870    /// Modifies an existing order on the Kraken Futures exchange.
1871    ///
1872    /// Returns the new venue order ID assigned to the modified order.
1873    ///
1874    /// # Errors
1875    ///
1876    /// Returns an error if:
1877    /// - Neither `client_order_id` nor `venue_order_id` is provided.
1878    /// - The instrument is not found in cache.
1879    /// - The request fails.
1880    /// - The edit fails on the exchange.
1881    pub async fn modify_order(
1882        &self,
1883        instrument_id: InstrumentId,
1884        client_order_id: Option<ClientOrderId>,
1885        venue_order_id: Option<VenueOrderId>,
1886        quantity: Option<Quantity>,
1887        price: Option<Price>,
1888        trigger_price: Option<Price>,
1889    ) -> anyhow::Result<VenueOrderId> {
1890        let _ = self
1891            .get_cached_instrument(&instrument_id.symbol.inner())
1892            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1893
1894        let order_id = venue_order_id.as_ref().map(|id| id.to_string());
1895        let cli_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1896
1897        if order_id.is_none() && cli_ord_id.is_none() {
1898            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1899        }
1900
1901        let mut builder = KrakenFuturesEditOrderParamsBuilder::default();
1902
1903        if let Some(ref id) = order_id {
1904            builder.order_id(id.clone());
1905        }
1906        if let Some(ref id) = cli_ord_id {
1907            builder.cli_ord_id(id.clone());
1908        }
1909        if let Some(qty) = quantity {
1910            builder.size(qty.to_string());
1911        }
1912        if let Some(p) = price {
1913            builder.limit_price(p.to_string());
1914        }
1915        if let Some(tp) = trigger_price {
1916            builder.stop_price(tp.to_string());
1917        }
1918
1919        let params = builder
1920            .build()
1921            .map_err(|e| anyhow::anyhow!("Failed to build edit order params: {e}"))?;
1922
1923        let response = self.inner.edit_order(&params).await?;
1924
1925        if response.result != KrakenApiResult::Success {
1926            let status = &response.edit_status.status;
1927            anyhow::bail!("Order modification failed: {status}");
1928        }
1929
1930        // Return the new order_id from the response, or fall back to the original
1931        let new_venue_order_id = response
1932            .edit_status
1933            .order_id
1934            .or(order_id)
1935            .ok_or_else(|| anyhow::anyhow!("No order ID in edit order response"))?;
1936
1937        Ok(VenueOrderId::new(&new_venue_order_id))
1938    }
1939
1940    /// Cancels an order on the Kraken Futures exchange.
1941    ///
1942    /// # Errors
1943    ///
1944    /// Returns an error if:
1945    /// - Credentials are missing.
1946    /// - Neither client_order_id nor venue_order_id is provided.
1947    /// - The request fails.
1948    /// - The order cancellation is rejected.
1949    pub async fn cancel_order(
1950        &self,
1951        _account_id: AccountId,
1952        instrument_id: InstrumentId,
1953        client_order_id: Option<ClientOrderId>,
1954        venue_order_id: Option<VenueOrderId>,
1955    ) -> anyhow::Result<()> {
1956        let _ = self
1957            .get_cached_instrument(&instrument_id.symbol.inner())
1958            .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1959
1960        let order_id = venue_order_id.as_ref().map(|id| id.to_string());
1961        let cli_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1962
1963        if order_id.is_none() && cli_ord_id.is_none() {
1964            anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1965        }
1966
1967        let response = self.inner.cancel_order(order_id, cli_ord_id).await?;
1968
1969        if response.result != KrakenApiResult::Success {
1970            let status = &response.cancel_status.status;
1971            anyhow::bail!("Order cancellation failed: {status}");
1972        }
1973
1974        Ok(())
1975    }
1976
1977    /// Cancels multiple orders on the Kraken Futures exchange.
1978    ///
1979    /// Automatically chunks requests into batches of 50 orders.
1980    ///
1981    /// # Parameters
1982    /// - `venue_order_ids` - List of venue order IDs to cancel.
1983    ///
1984    /// # Returns
1985    /// The total number of successfully cancelled orders.
1986    pub async fn cancel_orders_batch(
1987        &self,
1988        venue_order_ids: Vec<VenueOrderId>,
1989    ) -> anyhow::Result<usize> {
1990        if venue_order_ids.is_empty() {
1991            return Ok(0);
1992        }
1993
1994        let mut total_cancelled = 0;
1995
1996        for chunk in venue_order_ids.chunks(BATCH_CANCEL_LIMIT) {
1997            let order_ids: Vec<String> = chunk.iter().map(|id| id.to_string()).collect();
1998            let response = self.inner.cancel_orders_batch(order_ids).await?;
1999
2000            if response.result != KrakenApiResult::Success {
2001                let error_msg = response.error.as_deref().unwrap_or("Unknown error");
2002                anyhow::bail!("Batch cancel failed: {error_msg}");
2003            }
2004
2005            let success_count = response
2006                .batch_status
2007                .iter()
2008                .filter(|s| {
2009                    s.status == Some(KrakenSendStatus::Cancelled)
2010                        || s.cancel_status
2011                            .as_ref()
2012                            .is_some_and(|cs| cs.status == KrakenSendStatus::Cancelled)
2013                })
2014                .count();
2015
2016            total_cancelled += success_count;
2017        }
2018
2019        Ok(total_cancelled)
2020    }
2021}
2022
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// A default raw client has no credential and a base URL containing `futures`.
    #[rstest]
    fn test_raw_client_creation() {
        let raw_client = KrakenFuturesRawHttpClient::default();

        assert!(raw_client.credential.is_none());
        assert!(raw_client.base_url().contains("futures"));
    }

    /// A raw client built with an API key and secret stores a credential.
    #[rstest]
    fn test_raw_client_with_credentials() {
        let api_key = String::from("test_key");
        let api_secret = String::from("test_secret");

        let raw_client = KrakenFuturesRawHttpClient::with_credentials(
            api_key,
            api_secret,
            KrakenEnvironment::Mainnet,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .unwrap();

        assert!(raw_client.credential.is_some());
    }

    /// A default domain client starts with an empty instruments cache.
    #[rstest]
    fn test_client_creation() {
        let http_client = KrakenFuturesHttpClient::default();

        assert!(http_client.instruments_cache.is_empty());
    }

    /// A domain client built with credentials also starts with an empty instruments cache.
    #[rstest]
    fn test_client_with_credentials() {
        let api_key = String::from("test_key");
        let api_secret = String::from("test_secret");

        let http_client = KrakenFuturesHttpClient::with_credentials(
            api_key,
            api_secret,
            KrakenEnvironment::Mainnet,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .unwrap();

        assert!(http_client.instruments_cache.is_empty());
    }
}