1use std::{
19 collections::HashMap,
20 fmt::{Debug, Formatter},
21 num::NonZeroU32,
22 sync::{
23 Arc,
24 atomic::{AtomicBool, Ordering},
25 },
26};
27
28use chrono::{DateTime, Utc};
29use dashmap::DashMap;
30use nautilus_core::{
31 AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, nanos::UnixNanos,
32 time::get_atomic_clock_realtime,
33};
34use nautilus_model::{
35 data::{Bar, BarType, TradeTick},
36 enums::{AccountType, CurrencyType, OrderSide, OrderType, TimeInForce},
37 events::AccountState,
38 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
39 instruments::{Instrument, InstrumentAny},
40 reports::{FillReport, OrderStatusReport, PositionStatusReport},
41 types::{AccountBalance, Currency, Money, Price, Quantity},
42};
43use nautilus_network::{
44 http::{HttpClient, Method, USER_AGENT},
45 ratelimiter::quota::Quota,
46 retry::{RetryConfig, RetryManager},
47};
48use serde::de::DeserializeOwned;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use super::{models::*, query::*};
53use crate::{
54 common::{
55 consts::NAUTILUS_KRAKEN_BROKER_ID,
56 credential::KrakenCredential,
57 enums::{
58 KrakenApiResult, KrakenEnvironment, KrakenFuturesOrderType, KrakenOrderSide,
59 KrakenProductType, KrakenSendStatus,
60 },
61 parse::{
62 bar_type_to_futures_resolution, parse_bar, parse_futures_fill_report,
63 parse_futures_instrument, parse_futures_order_event_status_report,
64 parse_futures_order_status_report, parse_futures_position_status_report,
65 parse_futures_public_execution,
66 },
67 urls::get_kraken_http_base_url,
68 },
69 http::{error::KrakenHttpError, models::OhlcData},
70};
71
/// Requests-per-second limit applied when the caller does not supply one.
72pub const KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 5;
74
/// Rate limiter key attached to every Kraken Futures request issued by this module.
75const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:futures:global";
76
/// Size limit for batch cancels.
// NOTE(review): not referenced in the visible part of this file - confirm where it is enforced.
77const BATCH_CANCEL_LIMIT: usize = 50;
79
/// Low-level HTTP client for the Kraken Futures REST API.
///
/// Builds request URLs, signs authenticated requests and deserializes the
/// JSON responses into the venue response models.
80pub struct KrakenFuturesRawHttpClient {
/// Base URL that endpoint paths are appended to.
85 base_url: String,
/// Underlying HTTP client used to send every request.
86 client: HttpClient,
/// API credentials; `None` when the client was built for public endpoints only.
87 credential: Option<KrakenCredential>,
/// Retry policy applied by `send_request`.
88 retry_manager: RetryManager<KrakenHttpError>,
/// Token cancelled by `cancel_all_requests`.
89 cancellation_token: CancellationToken,
/// Clock whose nanosecond timestamp is used as the request nonce.
90 clock: &'static AtomicTime,
/// Held while an authenticated request is in flight, so only one runs at a time.
// NOTE(review): presumably this keeps nonces arriving at the venue in order - confirm.
91 auth_mutex: tokio::sync::Mutex<()>,
93}
94
95impl Default for KrakenFuturesRawHttpClient {
96 fn default() -> Self {
97 Self::new(
98 KrakenEnvironment::Mainnet,
99 None,
100 Some(60),
101 None,
102 None,
103 None,
104 None,
105 None,
106 )
107 .expect("Failed to create default KrakenFuturesRawHttpClient")
108 }
109}
110
111impl Debug for KrakenFuturesRawHttpClient {
112 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct(stringify!(KrakenFuturesRawHttpClient))
114 .field("base_url", &self.base_url)
115 .field("has_credentials", &self.credential.is_some())
116 .finish()
117 }
118}
119
120impl KrakenFuturesRawHttpClient {
121 #[allow(clippy::too_many_arguments)]
123 pub fn new(
124 environment: KrakenEnvironment,
125 base_url_override: Option<String>,
126 timeout_secs: Option<u64>,
127 max_retries: Option<u32>,
128 retry_delay_ms: Option<u64>,
129 retry_delay_max_ms: Option<u64>,
130 proxy_url: Option<String>,
131 max_requests_per_second: Option<u32>,
132 ) -> anyhow::Result<Self> {
133 let retry_config = RetryConfig {
134 max_retries: max_retries.unwrap_or(3),
135 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
136 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
137 backoff_factor: 2.0,
138 jitter_ms: 1000,
139 operation_timeout_ms: Some(60_000),
140 immediate_first: false,
141 max_elapsed_ms: Some(180_000),
142 };
143
144 let retry_manager = RetryManager::new(retry_config);
145 let base_url = base_url_override.unwrap_or_else(|| {
146 get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
147 });
148
149 let rate_limit =
150 max_requests_per_second.unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND);
151
152 Ok(Self {
153 base_url,
154 client: HttpClient::new(
155 Self::default_headers(),
156 vec![],
157 Self::rate_limiter_quotas(rate_limit),
158 Some(Self::default_quota(rate_limit)),
159 timeout_secs,
160 proxy_url,
161 )
162 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
163 credential: None,
164 retry_manager,
165 cancellation_token: CancellationToken::new(),
166 clock: get_atomic_clock_realtime(),
167 auth_mutex: tokio::sync::Mutex::new(()),
168 })
169 }
170
171 #[allow(clippy::too_many_arguments)]
173 pub fn with_credentials(
174 api_key: String,
175 api_secret: String,
176 environment: KrakenEnvironment,
177 base_url_override: Option<String>,
178 timeout_secs: Option<u64>,
179 max_retries: Option<u32>,
180 retry_delay_ms: Option<u64>,
181 retry_delay_max_ms: Option<u64>,
182 proxy_url: Option<String>,
183 max_requests_per_second: Option<u32>,
184 ) -> anyhow::Result<Self> {
185 let retry_config = RetryConfig {
186 max_retries: max_retries.unwrap_or(3),
187 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
188 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
189 backoff_factor: 2.0,
190 jitter_ms: 1000,
191 operation_timeout_ms: Some(60_000),
192 immediate_first: false,
193 max_elapsed_ms: Some(180_000),
194 };
195
196 let retry_manager = RetryManager::new(retry_config);
197 let base_url = base_url_override.unwrap_or_else(|| {
198 get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
199 });
200
201 let rate_limit =
202 max_requests_per_second.unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND);
203
204 Ok(Self {
205 base_url,
206 client: HttpClient::new(
207 Self::default_headers(),
208 vec![],
209 Self::rate_limiter_quotas(rate_limit),
210 Some(Self::default_quota(rate_limit)),
211 timeout_secs,
212 proxy_url,
213 )
214 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
215 credential: Some(KrakenCredential::new(api_key, api_secret)),
216 retry_manager,
217 cancellation_token: CancellationToken::new(),
218 clock: get_atomic_clock_realtime(),
219 auth_mutex: tokio::sync::Mutex::new(()),
220 })
221 }
222
223 fn generate_nonce(&self) -> u64 {
228 self.clock.get_time_ns().as_u64()
229 }
230
231 pub fn base_url(&self) -> &str {
233 &self.base_url
234 }
235
236 pub fn credential(&self) -> Option<&KrakenCredential> {
238 self.credential.as_ref()
239 }
240
241 pub fn cancel_all_requests(&self) {
243 self.cancellation_token.cancel();
244 }
245
246 pub fn cancellation_token(&self) -> &CancellationToken {
248 &self.cancellation_token
249 }
250
251 fn default_headers() -> HashMap<String, String> {
252 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
253 }
254
255 fn default_quota(max_requests_per_second: u32) -> Quota {
256 Quota::per_second(NonZeroU32::new(max_requests_per_second).unwrap_or_else(|| {
257 NonZeroU32::new(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()
258 }))
259 }
260
261 fn rate_limiter_quotas(max_requests_per_second: u32) -> Vec<(String, Quota)> {
262 vec![(
263 KRAKEN_GLOBAL_RATE_KEY.to_string(),
264 Self::default_quota(max_requests_per_second),
265 )]
266 }
267
268 fn rate_limit_keys(endpoint: &str) -> Vec<String> {
269 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
270 let route = format!("kraken:futures:{normalized}");
271 vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
272 }
273
274 async fn send_request<T: DeserializeOwned>(
275 &self,
276 method: Method,
277 endpoint: &str,
278 url: String,
279 authenticate: bool,
280 ) -> anyhow::Result<T, KrakenHttpError> {
281 let _guard = if authenticate {
285 Some(self.auth_mutex.lock().await)
286 } else {
287 None
288 };
289
290 let endpoint = endpoint.to_string();
291 let method_clone = method.clone();
292 let url_clone = url.clone();
293 let credential = self.credential.clone();
294
295 let operation = || {
296 let url = url_clone.clone();
297 let method = method_clone.clone();
298 let endpoint = endpoint.clone();
299 let credential = credential.clone();
300
301 async move {
302 let mut headers = Self::default_headers();
303
304 if authenticate {
305 let cred = credential.as_ref().ok_or_else(|| {
306 KrakenHttpError::AuthenticationError(
307 "Missing credentials for authenticated request".to_string(),
308 )
309 })?;
310
311 let nonce = self.generate_nonce();
312
313 let signature = cred.sign_futures(&endpoint, "", nonce).map_err(|e| {
314 KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
315 })?;
316
317 let base_url = &self.base_url;
318 tracing::debug!(
319 "Kraken Futures auth: endpoint={endpoint}, nonce={nonce}, base_url={base_url}"
320 );
321
322 headers.insert("APIKey".to_string(), cred.api_key().to_string());
323 headers.insert("Authent".to_string(), signature);
324 headers.insert("Nonce".to_string(), nonce.to_string());
325 }
326
327 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
328
329 let response = self
330 .client
331 .request(
332 method,
333 url,
334 None,
335 Some(headers),
336 None,
337 None,
338 Some(rate_limit_keys),
339 )
340 .await
341 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
342
343 let status = response.status.as_u16();
344 if status >= 400 {
345 let body = String::from_utf8_lossy(&response.body).to_string();
346 if status == 401 || status == 403 {
348 return Err(KrakenHttpError::AuthenticationError(format!(
349 "HTTP error {status}: {body}"
350 )));
351 }
352 return Err(KrakenHttpError::NetworkError(format!(
353 "HTTP error {status}: {body}"
354 )));
355 }
356
357 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
358 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
359 })?;
360
361 serde_json::from_str(&response_text).map_err(|e| {
362 KrakenHttpError::ParseError(format!(
363 "Failed to deserialize futures response: {e}"
364 ))
365 })
366 }
367 };
368
369 let should_retry =
370 |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
371 let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
372
373 self.retry_manager
374 .execute_with_retry_with_cancel(
375 &endpoint,
376 operation,
377 should_retry,
378 create_error,
379 &self.cancellation_token,
380 )
381 .await
382 }
383
384 async fn send_get_with_query<T: DeserializeOwned>(
389 &self,
390 endpoint: &str,
391 url: String,
392 query_string: &str,
393 ) -> anyhow::Result<T, KrakenHttpError> {
394 let _guard = self.auth_mutex.lock().await;
395
396 if self.cancellation_token.is_cancelled() {
397 return Err(KrakenHttpError::NetworkError(
398 "Request cancelled".to_string(),
399 ));
400 }
401
402 let credential = self.credential.as_ref().ok_or_else(|| {
403 KrakenHttpError::AuthenticationError("Missing credentials".to_string())
404 })?;
405
406 let nonce = self.generate_nonce();
407
408 let signature = credential
410 .sign_futures(endpoint, query_string, nonce)
411 .map_err(|e| {
412 KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
413 })?;
414
415 tracing::debug!(
416 "Kraken Futures GET with query: endpoint={endpoint}, query={query_string}, nonce={nonce}"
417 );
418
419 let mut headers = Self::default_headers();
420 headers.insert("APIKey".to_string(), credential.api_key().to_string());
421 headers.insert("Authent".to_string(), signature);
422 headers.insert("Nonce".to_string(), nonce.to_string());
423
424 let rate_limit_keys = Self::rate_limit_keys(endpoint);
425
426 let response = self
427 .client
428 .request(
429 Method::GET,
430 url,
431 None,
432 Some(headers),
433 None,
434 None,
435 Some(rate_limit_keys),
436 )
437 .await
438 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
439
440 let status = response.status.as_u16();
441 if status >= 400 {
442 let body = String::from_utf8_lossy(&response.body).to_string();
443 if status == 401 || status == 403 {
444 return Err(KrakenHttpError::AuthenticationError(format!(
445 "HTTP error {status}: {body}"
446 )));
447 }
448 return Err(KrakenHttpError::NetworkError(format!(
449 "HTTP error {status}: {body}"
450 )));
451 }
452
453 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
454 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
455 })?;
456
457 serde_json::from_str(&response_text).map_err(|e| {
458 KrakenHttpError::ParseError(format!("Failed to deserialize futures response: {e}"))
459 })
460 }
461
462 async fn send_request_with_body<T: DeserializeOwned>(
463 &self,
464 endpoint: &str,
465 params: HashMap<String, String>,
466 ) -> anyhow::Result<T, KrakenHttpError> {
467 let post_data = serde_urlencoded::to_string(¶ms)
468 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
469 self.send_authenticated_post(endpoint, post_data).await
470 }
471
472 async fn send_request_with_params<P: serde::Serialize, T: DeserializeOwned>(
474 &self,
475 endpoint: &str,
476 params: &P,
477 ) -> anyhow::Result<T, KrakenHttpError> {
478 let post_data = serde_urlencoded::to_string(params)
479 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
480 self.send_authenticated_post(endpoint, post_data).await
481 }
482
483 async fn send_authenticated_post<T: DeserializeOwned>(
485 &self,
486 endpoint: &str,
487 post_data: String,
488 ) -> anyhow::Result<T, KrakenHttpError> {
489 if self.cancellation_token.is_cancelled() {
490 return Err(KrakenHttpError::NetworkError(
491 "Request cancelled".to_string(),
492 ));
493 }
494
495 let _guard = self.auth_mutex.lock().await;
497
498 if self.cancellation_token.is_cancelled() {
499 return Err(KrakenHttpError::NetworkError(
500 "Request cancelled".to_string(),
501 ));
502 }
503
504 let credential = self.credential.as_ref().ok_or_else(|| {
505 KrakenHttpError::AuthenticationError("Missing credentials".to_string())
506 })?;
507
508 let nonce = self.generate_nonce();
509 tracing::debug!("Generated nonce {nonce} for {endpoint}");
510
511 let signature = credential
512 .sign_futures(endpoint, &post_data, nonce)
513 .map_err(|e| {
514 KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
515 })?;
516
517 let url = format!("{}{endpoint}", self.base_url);
518 let mut headers = Self::default_headers();
519 headers.insert(
520 "Content-Type".to_string(),
521 "application/x-www-form-urlencoded".to_string(),
522 );
523 headers.insert("APIKey".to_string(), credential.api_key().to_string());
524 headers.insert("Authent".to_string(), signature);
525 headers.insert("Nonce".to_string(), nonce.to_string());
526
527 let rate_limit_keys = Self::rate_limit_keys(endpoint);
528
529 let response = self
530 .client
531 .request(
532 Method::POST,
533 url,
534 None,
535 Some(headers),
536 Some(post_data.into_bytes()),
537 None,
538 Some(rate_limit_keys),
539 )
540 .await
541 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
542
543 if response.status.as_u16() >= 400 {
544 let status = response.status.as_u16();
545 let body = String::from_utf8_lossy(&response.body).to_string();
546 return Err(KrakenHttpError::NetworkError(format!(
547 "HTTP error {status}: {body}"
548 )));
549 }
550
551 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
552 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
553 })?;
554
555 serde_json::from_str(&response_text).map_err(|e| {
556 tracing::error!("Failed to parse response from {endpoint}: {response_text}");
557 KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
558 })
559 }
560
561 pub async fn get_instruments(
563 &self,
564 ) -> anyhow::Result<FuturesInstrumentsResponse, KrakenHttpError> {
565 let endpoint = "/derivatives/api/v3/instruments";
566 let url = format!("{}{endpoint}", self.base_url);
567
568 self.send_request(Method::GET, endpoint, url, false).await
569 }
570
571 pub async fn get_tickers(&self) -> anyhow::Result<FuturesTickersResponse, KrakenHttpError> {
573 let endpoint = "/derivatives/api/v3/tickers";
574 let url = format!("{}{endpoint}", self.base_url);
575
576 self.send_request(Method::GET, endpoint, url, false).await
577 }
578
579 pub async fn get_ohlc(
581 &self,
582 tick_type: &str,
583 symbol: &str,
584 resolution: &str,
585 from: Option<i64>,
586 to: Option<i64>,
587 ) -> anyhow::Result<FuturesCandlesResponse, KrakenHttpError> {
588 let endpoint = format!("/api/charts/v1/{tick_type}/{symbol}/{resolution}");
589
590 let mut url = format!("{}{endpoint}", self.base_url);
591
592 let mut query_params = Vec::new();
593 if let Some(from_ts) = from {
594 query_params.push(format!("from={from_ts}"));
595 }
596 if let Some(to_ts) = to {
597 query_params.push(format!("to={to_ts}"));
598 }
599
600 if !query_params.is_empty() {
601 url.push('?');
602 url.push_str(&query_params.join("&"));
603 }
604
605 self.send_request(Method::GET, &endpoint, url, false).await
606 }
607
608 pub async fn get_public_executions(
610 &self,
611 symbol: &str,
612 since: Option<i64>,
613 before: Option<i64>,
614 sort: Option<&str>,
615 continuation_token: Option<&str>,
616 ) -> anyhow::Result<FuturesPublicExecutionsResponse, KrakenHttpError> {
617 let endpoint = format!("/api/history/v3/market/{symbol}/executions");
618
619 let mut url = format!("{}{endpoint}", self.base_url);
620
621 let mut query_params = Vec::new();
622 if let Some(since_ts) = since {
623 query_params.push(format!("since={since_ts}"));
624 }
625 if let Some(before_ts) = before {
626 query_params.push(format!("before={before_ts}"));
627 }
628 if let Some(sort_order) = sort {
629 query_params.push(format!("sort={sort_order}"));
630 }
631 if let Some(token) = continuation_token {
632 query_params.push(format!("continuationToken={token}"));
633 }
634
635 if !query_params.is_empty() {
636 url.push('?');
637 url.push_str(&query_params.join("&"));
638 }
639
640 self.send_request(Method::GET, &endpoint, url, false).await
641 }
642
643 pub async fn get_open_orders(
645 &self,
646 ) -> anyhow::Result<FuturesOpenOrdersResponse, KrakenHttpError> {
647 if self.credential.is_none() {
648 return Err(KrakenHttpError::AuthenticationError(
649 "API credentials required for futures open orders".to_string(),
650 ));
651 }
652
653 let endpoint = "/derivatives/api/v3/openorders";
654 let url = format!("{}{endpoint}", self.base_url);
655
656 self.send_request(Method::GET, endpoint, url, true).await
657 }
658
659 pub async fn get_order_events(
661 &self,
662 before: Option<i64>,
663 since: Option<i64>,
664 continuation_token: Option<&str>,
665 ) -> anyhow::Result<FuturesOrderEventsResponse, KrakenHttpError> {
666 if self.credential.is_none() {
667 return Err(KrakenHttpError::AuthenticationError(
668 "API credentials required for futures order events".to_string(),
669 ));
670 }
671
672 let endpoint = "/api/history/v2/orders";
673 let mut query_params = Vec::new();
674
675 if let Some(before_ts) = before {
676 query_params.push(format!("before={before_ts}"));
677 }
678 if let Some(since_ts) = since {
679 query_params.push(format!("since={since_ts}"));
680 }
681 if let Some(token) = continuation_token {
682 query_params.push(format!("continuation_token={token}"));
683 }
684
685 let query_string = query_params.join("&");
687 let url = if query_string.is_empty() {
688 format!("{}{endpoint}", self.base_url)
689 } else {
690 format!("{}{endpoint}?{query_string}", self.base_url)
691 };
692
693 self.send_get_with_query(endpoint, url, &query_string).await
696 }
697
698 pub async fn get_fills(
700 &self,
701 last_fill_time: Option<&str>,
702 ) -> anyhow::Result<FuturesFillsResponse, KrakenHttpError> {
703 if self.credential.is_none() {
704 return Err(KrakenHttpError::AuthenticationError(
705 "API credentials required for futures fills".to_string(),
706 ));
707 }
708
709 let endpoint = "/derivatives/api/v3/fills";
710 let query_string = last_fill_time
711 .map(|t| format!("lastFillTime={t}"))
712 .unwrap_or_default();
713
714 let url = if query_string.is_empty() {
715 format!("{}{endpoint}", self.base_url)
716 } else {
717 format!("{}{endpoint}?{query_string}", self.base_url)
718 };
719
720 self.send_get_with_query(endpoint, url, &query_string).await
722 }
723
724 pub async fn get_open_positions(
726 &self,
727 ) -> anyhow::Result<FuturesOpenPositionsResponse, KrakenHttpError> {
728 if self.credential.is_none() {
729 return Err(KrakenHttpError::AuthenticationError(
730 "API credentials required for futures open positions".to_string(),
731 ));
732 }
733
734 let endpoint = "/derivatives/api/v3/openpositions";
735 let url = format!("{}{endpoint}", self.base_url);
736
737 self.send_request(Method::GET, endpoint, url, true).await
738 }
739
740 pub async fn get_accounts(&self) -> anyhow::Result<FuturesAccountsResponse, KrakenHttpError> {
742 if self.credential.is_none() {
743 return Err(KrakenHttpError::AuthenticationError(
744 "API credentials required for futures accounts".to_string(),
745 ));
746 }
747
748 let endpoint = "/derivatives/api/v3/accounts";
749 let url = format!("{}{endpoint}", self.base_url);
750
751 self.send_request(Method::GET, endpoint, url, true).await
752 }
753
754 pub async fn send_order(
756 &self,
757 params: HashMap<String, String>,
758 ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
759 if self.credential.is_none() {
760 return Err(KrakenHttpError::AuthenticationError(
761 "API credentials required for sending orders".to_string(),
762 ));
763 }
764
765 let endpoint = "/derivatives/api/v3/sendorder";
766 self.send_request_with_body(endpoint, params).await
767 }
768
769 pub async fn send_order_params(
771 &self,
772 params: &KrakenFuturesSendOrderParams,
773 ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
774 if self.credential.is_none() {
775 return Err(KrakenHttpError::AuthenticationError(
776 "API credentials required for sending orders".to_string(),
777 ));
778 }
779
780 let endpoint = "/derivatives/api/v3/sendorder";
781 self.send_request_with_params(endpoint, params).await
782 }
783
784 pub async fn cancel_order(
786 &self,
787 order_id: Option<String>,
788 cli_ord_id: Option<String>,
789 ) -> anyhow::Result<FuturesCancelOrderResponse, KrakenHttpError> {
790 if self.credential.is_none() {
791 return Err(KrakenHttpError::AuthenticationError(
792 "API credentials required for canceling orders".to_string(),
793 ));
794 }
795
796 let mut params = HashMap::new();
797 if let Some(id) = order_id {
798 params.insert("order_id".to_string(), id);
799 }
800 if let Some(id) = cli_ord_id {
801 params.insert("cliOrdId".to_string(), id);
802 }
803
804 let endpoint = "/derivatives/api/v3/cancelorder";
805 self.send_request_with_body(endpoint, params).await
806 }
807
808 pub async fn edit_order(
810 &self,
811 params: &KrakenFuturesEditOrderParams,
812 ) -> anyhow::Result<FuturesEditOrderResponse, KrakenHttpError> {
813 if self.credential.is_none() {
814 return Err(KrakenHttpError::AuthenticationError(
815 "API credentials required for editing orders".to_string(),
816 ));
817 }
818
819 let endpoint = "/derivatives/api/v3/editorder";
820 self.send_request_with_params(endpoint, params).await
821 }
822
823 pub async fn batch_order(
825 &self,
826 params: HashMap<String, String>,
827 ) -> anyhow::Result<FuturesBatchOrderResponse, KrakenHttpError> {
828 if self.credential.is_none() {
829 return Err(KrakenHttpError::AuthenticationError(
830 "API credentials required for batch orders".to_string(),
831 ));
832 }
833
834 let endpoint = "/derivatives/api/v3/batchorder";
835 self.send_request_with_body(endpoint, params).await
836 }
837
838 pub async fn cancel_orders_batch(
840 &self,
841 order_ids: Vec<String>,
842 ) -> anyhow::Result<FuturesBatchCancelResponse, KrakenHttpError> {
843 if self.credential.is_none() {
844 return Err(KrakenHttpError::AuthenticationError(
845 "API credentials required for batch orders".to_string(),
846 ));
847 }
848
849 let batch_items: Vec<KrakenFuturesBatchCancelItem> = order_ids
850 .into_iter()
851 .map(KrakenFuturesBatchCancelItem::from_order_id)
852 .collect();
853
854 let params = KrakenFuturesBatchOrderParams::new(batch_items);
855 let post_data = params
856 .to_body()
857 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize batch: {e}")))?;
858
859 let endpoint = "/derivatives/api/v3/batchorder";
860 self.send_authenticated_post(endpoint, post_data).await
861 }
862
863 pub async fn cancel_all_orders(
865 &self,
866 symbol: Option<String>,
867 ) -> anyhow::Result<FuturesCancelAllOrdersResponse, KrakenHttpError> {
868 if self.credential.is_none() {
869 return Err(KrakenHttpError::AuthenticationError(
870 "API credentials required for canceling orders".to_string(),
871 ));
872 }
873
874 let mut params = HashMap::new();
875 if let Some(sym) = symbol {
876 params.insert("symbol".to_string(), sym);
877 }
878
879 let endpoint = "/derivatives/api/v3/cancelallorders";
880 self.send_request_with_body(endpoint, params).await
881 }
882}
883
/// Higher-level Kraken Futures HTTP client that wraps the raw client and
/// keeps a cache of instruments for parsing venue responses into domain types.
///
/// All fields are behind `Arc`, so clones share the same raw client and cache.
884#[cfg_attr(
894 feature = "python",
895 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
896)]
897pub struct KrakenFuturesHttpClient {
/// Shared raw client that performs the actual HTTP requests.
898 pub(crate) inner: Arc<KrakenFuturesRawHttpClient>,
/// Instruments keyed by their symbol.
899 pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
/// Set to `true` once at least one caching call has been made.
900 cache_initialized: Arc<AtomicBool>,
901}
902
903impl Clone for KrakenFuturesHttpClient {
904 fn clone(&self) -> Self {
905 Self {
906 inner: self.inner.clone(),
907 instruments_cache: self.instruments_cache.clone(),
908 cache_initialized: self.cache_initialized.clone(),
909 }
910 }
911}
912
913impl Default for KrakenFuturesHttpClient {
914 fn default() -> Self {
915 Self::new(
916 KrakenEnvironment::Mainnet,
917 None,
918 Some(60),
919 None,
920 None,
921 None,
922 None,
923 None,
924 )
925 .expect("Failed to create default KrakenFuturesHttpClient")
926 }
927}
928
929impl Debug for KrakenFuturesHttpClient {
930 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
931 f.debug_struct(stringify!(KrakenFuturesHttpClient))
932 .field("inner", &self.inner)
933 .finish()
934 }
935}
936
937impl KrakenFuturesHttpClient {
938 #[allow(clippy::too_many_arguments)]
940 pub fn new(
941 environment: KrakenEnvironment,
942 base_url_override: Option<String>,
943 timeout_secs: Option<u64>,
944 max_retries: Option<u32>,
945 retry_delay_ms: Option<u64>,
946 retry_delay_max_ms: Option<u64>,
947 proxy_url: Option<String>,
948 max_requests_per_second: Option<u32>,
949 ) -> anyhow::Result<Self> {
950 Ok(Self {
951 inner: Arc::new(KrakenFuturesRawHttpClient::new(
952 environment,
953 base_url_override,
954 timeout_secs,
955 max_retries,
956 retry_delay_ms,
957 retry_delay_max_ms,
958 proxy_url,
959 max_requests_per_second,
960 )?),
961 instruments_cache: Arc::new(DashMap::new()),
962 cache_initialized: Arc::new(AtomicBool::new(false)),
963 })
964 }
965
966 #[allow(clippy::too_many_arguments)]
968 pub fn with_credentials(
969 api_key: String,
970 api_secret: String,
971 environment: KrakenEnvironment,
972 base_url_override: Option<String>,
973 timeout_secs: Option<u64>,
974 max_retries: Option<u32>,
975 retry_delay_ms: Option<u64>,
976 retry_delay_max_ms: Option<u64>,
977 proxy_url: Option<String>,
978 max_requests_per_second: Option<u32>,
979 ) -> anyhow::Result<Self> {
980 Ok(Self {
981 inner: Arc::new(KrakenFuturesRawHttpClient::with_credentials(
982 api_key,
983 api_secret,
984 environment,
985 base_url_override,
986 timeout_secs,
987 max_retries,
988 retry_delay_ms,
989 retry_delay_max_ms,
990 proxy_url,
991 max_requests_per_second,
992 )?),
993 instruments_cache: Arc::new(DashMap::new()),
994 cache_initialized: Arc::new(AtomicBool::new(false)),
995 })
996 }
997
998 #[allow(clippy::too_many_arguments)]
1005 pub fn from_env(
1006 environment: KrakenEnvironment,
1007 base_url_override: Option<String>,
1008 timeout_secs: Option<u64>,
1009 max_retries: Option<u32>,
1010 retry_delay_ms: Option<u64>,
1011 retry_delay_max_ms: Option<u64>,
1012 proxy_url: Option<String>,
1013 max_requests_per_second: Option<u32>,
1014 ) -> anyhow::Result<Self> {
1015 let demo = environment == KrakenEnvironment::Demo;
1016
1017 if let Some(credential) = KrakenCredential::from_env_futures(demo) {
1018 let (api_key, api_secret) = credential.into_parts();
1019 Self::with_credentials(
1020 api_key,
1021 api_secret,
1022 environment,
1023 base_url_override,
1024 timeout_secs,
1025 max_retries,
1026 retry_delay_ms,
1027 retry_delay_max_ms,
1028 proxy_url,
1029 max_requests_per_second,
1030 )
1031 } else {
1032 Self::new(
1033 environment,
1034 base_url_override,
1035 timeout_secs,
1036 max_retries,
1037 retry_delay_ms,
1038 retry_delay_max_ms,
1039 proxy_url,
1040 max_requests_per_second,
1041 )
1042 }
1043 }
1044
1045 pub fn cancel_all_requests(&self) {
1047 self.inner.cancel_all_requests();
1048 }
1049
1050 pub fn cancellation_token(&self) -> &CancellationToken {
1052 self.inner.cancellation_token()
1053 }
1054
1055 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1057 self.instruments_cache
1058 .insert(instrument.symbol().inner(), instrument);
1059 self.cache_initialized.store(true, Ordering::Release);
1060 }
1061
1062 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1064 for instrument in instruments {
1065 self.instruments_cache
1066 .insert(instrument.symbol().inner(), instrument);
1067 }
1068 self.cache_initialized.store(true, Ordering::Release);
1069 }
1070
1071 pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1073 self.instruments_cache
1074 .get(symbol)
1075 .map(|entry| entry.value().clone())
1076 }
1077
1078 fn get_instrument_by_raw_symbol(&self, raw_symbol: &str) -> Option<InstrumentAny> {
1079 self.instruments_cache
1080 .iter()
1081 .find(|entry| entry.value().raw_symbol().as_str() == raw_symbol)
1082 .map(|entry| entry.value().clone())
1083 }
1084
1085 fn generate_ts_init(&self) -> UnixNanos {
1086 get_atomic_clock_realtime().get_time_ns()
1087 }
1088
1089 pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
1091 let ts_init = self.generate_ts_init();
1092 let response = self.inner.get_instruments().await?;
1093
1094 let instruments: Vec<InstrumentAny> = response
1095 .instruments
1096 .iter()
1097 .filter_map(|fut_instrument| {
1098 match parse_futures_instrument(fut_instrument, ts_init, ts_init) {
1099 Ok(instrument) => Some(instrument),
1100 Err(e) => {
1101 let symbol = &fut_instrument.symbol;
1102 tracing::warn!("Failed to parse futures instrument {symbol}: {e}");
1103 None
1104 }
1105 }
1106 })
1107 .collect();
1108
1109 Ok(instruments)
1110 }
1111
1112 pub async fn request_mark_price(
1114 &self,
1115 instrument_id: InstrumentId,
1116 ) -> anyhow::Result<f64, KrakenHttpError> {
1117 let instrument = self
1118 .get_cached_instrument(&instrument_id.symbol.inner())
1119 .ok_or_else(|| {
1120 KrakenHttpError::ParseError(format!(
1121 "Instrument not found in cache: {instrument_id}"
1122 ))
1123 })?;
1124
1125 let raw_symbol = instrument.raw_symbol().to_string();
1126 let tickers = self.inner.get_tickers().await?;
1127
1128 tickers
1129 .tickers
1130 .iter()
1131 .find(|t| t.symbol == raw_symbol)
1132 .ok_or_else(|| {
1133 KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1134 })
1135 .and_then(|t| {
1136 t.mark_price.ok_or_else(|| {
1137 KrakenHttpError::ParseError(format!(
1138 "Mark price not available for {raw_symbol} (may not be available in testnet)"
1139 ))
1140 })
1141 })
1142 }
1143
1144 pub async fn request_index_price(
1145 &self,
1146 instrument_id: InstrumentId,
1147 ) -> anyhow::Result<f64, KrakenHttpError> {
1148 let instrument = self
1149 .get_cached_instrument(&instrument_id.symbol.inner())
1150 .ok_or_else(|| {
1151 KrakenHttpError::ParseError(format!(
1152 "Instrument not found in cache: {instrument_id}"
1153 ))
1154 })?;
1155
1156 let raw_symbol = instrument.raw_symbol().to_string();
1157 let tickers = self.inner.get_tickers().await?;
1158
1159 tickers
1160 .tickers
1161 .iter()
1162 .find(|t| t.symbol == raw_symbol)
1163 .ok_or_else(|| {
1164 KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1165 })
1166 .and_then(|t| {
1167 t.index_price.ok_or_else(|| {
1168 KrakenHttpError::ParseError(format!(
1169 "Index price not available for {raw_symbol} (may not be available in testnet)"
1170 ))
1171 })
1172 })
1173 }
1174
1175 pub async fn request_trades(
1176 &self,
1177 instrument_id: InstrumentId,
1178 start: Option<DateTime<Utc>>,
1179 end: Option<DateTime<Utc>>,
1180 limit: Option<u64>,
1181 ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
1182 let instrument = self
1183 .get_cached_instrument(&instrument_id.symbol.inner())
1184 .ok_or_else(|| {
1185 KrakenHttpError::ParseError(format!(
1186 "Instrument not found in cache: {instrument_id}"
1187 ))
1188 })?;
1189
1190 let raw_symbol = instrument.raw_symbol().to_string();
1191 let ts_init = self.generate_ts_init();
1192
1193 let since = start.map(|dt| dt.timestamp_millis());
1194 let before = end.map(|dt| dt.timestamp_millis());
1195
1196 let response = self
1197 .inner
1198 .get_public_executions(&raw_symbol, since, before, Some("asc"), None)
1199 .await?;
1200
1201 let mut trades = Vec::new();
1202
1203 for element in &response.elements {
1204 let execution = &element.event.execution.execution;
1205 match parse_futures_public_execution(execution, &instrument, ts_init) {
1206 Ok(trade_tick) => {
1207 trades.push(trade_tick);
1208
1209 if let Some(limit_count) = limit
1210 && trades.len() >= limit_count as usize
1211 {
1212 return Ok(trades);
1213 }
1214 }
1215 Err(e) => {
1216 tracing::warn!("Failed to parse futures trade tick: {e}");
1217 }
1218 }
1219 }
1220
1221 Ok(trades)
1222 }
1223
1224 pub async fn request_bars(
1225 &self,
1226 bar_type: BarType,
1227 start: Option<DateTime<Utc>>,
1228 end: Option<DateTime<Utc>>,
1229 limit: Option<u64>,
1230 ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
1231 let instrument_id = bar_type.instrument_id();
1232 let instrument = self
1233 .get_cached_instrument(&instrument_id.symbol.inner())
1234 .ok_or_else(|| {
1235 KrakenHttpError::ParseError(format!(
1236 "Instrument not found in cache: {instrument_id}"
1237 ))
1238 })?;
1239
1240 let raw_symbol = instrument.raw_symbol().to_string();
1241 let ts_init = self.generate_ts_init();
1242 let tick_type = "trade";
1243 let resolution = bar_type_to_futures_resolution(bar_type)
1244 .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?;
1245
1246 let from = start.map(|dt| dt.timestamp());
1248 let to = end.map(|dt| dt.timestamp());
1249 let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1250
1251 let response = self
1252 .inner
1253 .get_ohlc(tick_type, &raw_symbol, resolution, from, to)
1254 .await?;
1255
1256 let mut bars = Vec::new();
1257 for candle in response.candles {
1258 let ohlc = OhlcData {
1259 time: candle.time / 1000,
1260 open: candle.open,
1261 high: candle.high,
1262 low: candle.low,
1263 close: candle.close,
1264 vwap: "0".to_string(),
1265 volume: candle.volume,
1266 count: 0,
1267 };
1268
1269 match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1270 Ok(bar) => {
1271 if let Some(end_nanos) = end_ns
1272 && bar.ts_event.as_u64() > end_nanos
1273 {
1274 continue;
1275 }
1276 bars.push(bar);
1277
1278 if let Some(limit_count) = limit
1279 && bars.len() >= limit_count as usize
1280 {
1281 return Ok(bars);
1282 }
1283 }
1284 Err(e) => {
1285 tracing::warn!("Failed to parse futures bar: {e}");
1286 }
1287 }
1288 }
1289
1290 Ok(bars)
1291 }
1292
1293 pub async fn request_account_state(
1305 &self,
1306 account_id: AccountId,
1307 ) -> anyhow::Result<AccountState> {
1308 let accounts_response = self.inner.get_accounts().await?;
1309
1310 if accounts_response.result != KrakenApiResult::Success {
1311 let error_msg = accounts_response
1312 .error
1313 .unwrap_or_else(|| "Unknown error".to_string());
1314 anyhow::bail!("Failed to get futures accounts: {error_msg}");
1315 }
1316
1317 let ts_init = self.generate_ts_init();
1318
1319 let mut balances: Vec<AccountBalance> = Vec::new();
1320
1321 for account in accounts_response.accounts.values() {
1322 match account.account_type.as_str() {
1323 "multiCollateralMarginAccount" => {
1324 for (currency_code, currency_info) in &account.currencies {
1325 if currency_info.quantity == 0.0 {
1326 continue;
1327 }
1328
1329 let currency = Currency::new(
1330 currency_code.as_str(),
1331 8,
1332 0,
1333 currency_code.as_str(),
1334 CurrencyType::Crypto,
1335 );
1336
1337 let total_amount = currency_info.quantity;
1338 let total = Money::new(total_amount, currency);
1339
1340 let available_amount = currency_info
1342 .available
1343 .unwrap_or(total_amount)
1344 .min(total_amount);
1345 let locked_amount = (total_amount - available_amount).max(0.0);
1346 let locked = Money::new(locked_amount, currency);
1347 let free = total - locked;
1349
1350 balances.push(AccountBalance::new(total, locked, free));
1351 }
1352
1353 if let Some(portfolio_value) = account.portfolio_value
1357 && portfolio_value > 0.0
1358 {
1359 let usd_currency = Currency::USD();
1360 let total_usd = Money::new(portfolio_value, usd_currency);
1361 let available_usd = account
1362 .available_margin
1363 .unwrap_or(portfolio_value)
1364 .min(portfolio_value);
1365 let locked_usd =
1367 Money::new((portfolio_value - available_usd).max(0.0), usd_currency);
1368 let free_usd = total_usd - locked_usd;
1369
1370 balances.push(AccountBalance::new(total_usd, locked_usd, free_usd));
1371 }
1372 }
1373 "marginAccount" => {
1374 for (currency_code, &amount) in &account.balances {
1375 if amount == 0.0 {
1376 continue;
1377 }
1378
1379 let currency = Currency::new(
1380 currency_code.as_str(),
1381 8,
1382 0,
1383 currency_code.as_str(),
1384 CurrencyType::Crypto,
1385 );
1386
1387 let total = Money::new(amount, currency);
1388
1389 let available = account
1391 .auxiliary
1392 .as_ref()
1393 .and_then(|aux| aux.af)
1394 .unwrap_or(amount)
1395 .min(amount);
1396 let locked = amount - available;
1397
1398 balances.push(AccountBalance::new(
1399 total,
1400 Money::new(locked, currency),
1401 Money::new(available, currency),
1402 ));
1403 }
1404 }
1405 "cashAccount" => {
1406 for (currency_code, &amount) in &account.balances {
1407 if amount == 0.0 {
1408 continue;
1409 }
1410
1411 let currency = Currency::new(
1412 currency_code.as_str(),
1413 8,
1414 0,
1415 currency_code.as_str(),
1416 CurrencyType::Crypto,
1417 );
1418
1419 let total = Money::new(amount, currency);
1420 let locked = Money::new(0.0, currency);
1421
1422 balances.push(AccountBalance::new(total, locked, total));
1423 }
1424 }
1425 _ => {
1426 let account_type = &account.account_type;
1427 tracing::debug!("Unknown account type: {account_type}");
1428 }
1429 }
1430 }
1431
1432 Ok(AccountState::new(
1433 account_id,
1434 AccountType::Margin,
1435 balances,
1436 vec![],
1437 true,
1438 UUID4::new(),
1439 ts_init,
1440 ts_init,
1441 None,
1442 ))
1443 }
1444
1445 pub async fn request_order_status_reports(
1446 &self,
1447 account_id: AccountId,
1448 instrument_id: Option<InstrumentId>,
1449 start: Option<DateTime<Utc>>,
1450 end: Option<DateTime<Utc>>,
1451 open_only: bool,
1452 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1453 let ts_init = self.generate_ts_init();
1454 let mut all_reports = Vec::new();
1455
1456 let response = self
1457 .inner
1458 .get_open_orders()
1459 .await
1460 .map_err(|e| anyhow::anyhow!("get_open_orders failed: {e}"))?;
1461 if response.result != KrakenApiResult::Success {
1462 let error_msg = response
1463 .error
1464 .unwrap_or_else(|| "Unknown error".to_string());
1465 anyhow::bail!("Failed to get open orders: {error_msg}");
1466 }
1467
1468 for order in &response.open_orders {
1469 if let Some(ref target_id) = instrument_id {
1470 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1471 if let Some(inst) = instrument
1472 && inst.raw_symbol().as_str() != order.symbol
1473 {
1474 continue;
1475 }
1476 }
1477
1478 if let Some(instrument) = self.get_instrument_by_raw_symbol(&order.symbol) {
1479 match parse_futures_order_status_report(order, &instrument, account_id, ts_init) {
1480 Ok(report) => all_reports.push(report),
1481 Err(e) => {
1482 let order_id = &order.order_id;
1483 tracing::warn!("Failed to parse futures order {order_id}: {e}");
1484 }
1485 }
1486 }
1487 }
1488
1489 if !open_only {
1490 let start_ms = start.map(|dt| dt.timestamp_millis());
1492 let end_ms = end.map(|dt| dt.timestamp_millis());
1493 let response = self
1494 .inner
1495 .get_order_events(end_ms, start_ms, None)
1496 .await
1497 .map_err(|e| anyhow::anyhow!("get_order_events failed: {e}"))?;
1498
1499 for event_wrapper in response.order_events {
1500 let event = &event_wrapper.order;
1501 if let Some(ref target_id) = instrument_id {
1502 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1503 if let Some(inst) = instrument
1504 && inst.raw_symbol().as_str() != event.symbol
1505 {
1506 continue;
1507 }
1508 }
1509
1510 if let Some(instrument) = self.get_instrument_by_raw_symbol(&event.symbol) {
1511 match parse_futures_order_event_status_report(
1512 event,
1513 &instrument,
1514 account_id,
1515 ts_init,
1516 ) {
1517 Ok(report) => all_reports.push(report),
1518 Err(e) => {
1519 let order_id = &event.order_id;
1520 tracing::warn!("Failed to parse futures order event {order_id}: {e}");
1521 }
1522 }
1523 }
1524 }
1525 }
1526
1527 Ok(all_reports)
1528 }
1529
1530 pub async fn request_fill_reports(
1531 &self,
1532 account_id: AccountId,
1533 instrument_id: Option<InstrumentId>,
1534 start: Option<DateTime<Utc>>,
1535 end: Option<DateTime<Utc>>,
1536 ) -> anyhow::Result<Vec<FillReport>> {
1537 let ts_init = self.generate_ts_init();
1538 let mut all_reports = Vec::new();
1539
1540 let response = self.inner.get_fills(None).await?;
1541 if response.result != KrakenApiResult::Success {
1542 let error_msg = response
1543 .error
1544 .unwrap_or_else(|| "Unknown error".to_string());
1545 anyhow::bail!("Failed to get fills: {error_msg}");
1546 }
1547
1548 let start_ms = start.map(|dt| dt.timestamp_millis());
1549 let end_ms = end.map(|dt| dt.timestamp_millis());
1550
1551 for fill in response.fills {
1552 if let Some(start_threshold) = start_ms
1553 && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1554 {
1555 let fill_ms = fill_ts.timestamp_millis();
1556 if fill_ms < start_threshold {
1557 continue;
1558 }
1559 }
1560 if let Some(end_threshold) = end_ms
1561 && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1562 {
1563 let fill_ms = fill_ts.timestamp_millis();
1564 if fill_ms > end_threshold {
1565 continue;
1566 }
1567 }
1568
1569 if let Some(ref target_id) = instrument_id {
1570 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1571 if let Some(inst) = instrument
1572 && inst.raw_symbol().as_str() != fill.symbol
1573 {
1574 continue;
1575 }
1576 }
1577
1578 if let Some(instrument) = self.get_instrument_by_raw_symbol(&fill.symbol) {
1579 match parse_futures_fill_report(&fill, &instrument, account_id, ts_init) {
1580 Ok(report) => all_reports.push(report),
1581 Err(e) => {
1582 let fill_id = &fill.fill_id;
1583 tracing::warn!("Failed to parse futures fill {fill_id}: {e}");
1584 }
1585 }
1586 }
1587 }
1588
1589 Ok(all_reports)
1590 }
1591
1592 pub async fn request_position_status_reports(
1593 &self,
1594 account_id: AccountId,
1595 instrument_id: Option<InstrumentId>,
1596 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1597 let ts_init = self.generate_ts_init();
1598 let mut all_reports = Vec::new();
1599
1600 let response = self.inner.get_open_positions().await?;
1601 if response.result != KrakenApiResult::Success {
1602 let error_msg = response
1603 .error
1604 .unwrap_or_else(|| "Unknown error".to_string());
1605 anyhow::bail!("Failed to get open positions: {error_msg}");
1606 }
1607
1608 for position in response.open_positions {
1609 if let Some(ref target_id) = instrument_id {
1610 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1611 if let Some(inst) = instrument
1612 && inst.raw_symbol().as_str() != position.symbol
1613 {
1614 continue;
1615 }
1616 }
1617
1618 if let Some(instrument) = self.get_instrument_by_raw_symbol(&position.symbol) {
1619 match parse_futures_position_status_report(
1620 &position,
1621 &instrument,
1622 account_id,
1623 ts_init,
1624 ) {
1625 Ok(report) => all_reports.push(report),
1626 Err(e) => {
1627 let symbol = &position.symbol;
1628 tracing::warn!("Failed to parse futures position {symbol}: {e}");
1629 }
1630 }
1631 }
1632 }
1633
1634 Ok(all_reports)
1635 }
1636
1637 #[allow(clippy::too_many_arguments)]
1648 pub async fn submit_order(
1649 &self,
1650 account_id: AccountId,
1651 instrument_id: InstrumentId,
1652 client_order_id: ClientOrderId,
1653 order_side: OrderSide,
1654 order_type: OrderType,
1655 quantity: Quantity,
1656 time_in_force: TimeInForce,
1657 price: Option<Price>,
1658 trigger_price: Option<Price>,
1659 reduce_only: bool,
1660 post_only: bool,
1661 ) -> anyhow::Result<OrderStatusReport> {
1662 let instrument = self
1663 .get_cached_instrument(&instrument_id.symbol.inner())
1664 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1665
1666 let raw_symbol = instrument.raw_symbol().inner();
1667
1668 let kraken_order_type = match order_type {
1675 OrderType::Market => KrakenFuturesOrderType::Market,
1676 OrderType::Limit => {
1677 if post_only {
1678 KrakenFuturesOrderType::Post
1679 } else {
1680 match time_in_force {
1681 TimeInForce::Ioc => KrakenFuturesOrderType::Ioc,
1682 TimeInForce::Fok => {
1683 anyhow::bail!("FOK not supported by Kraken Futures, use IOC instead")
1684 }
1685 TimeInForce::Gtd => {
1686 anyhow::bail!("GTD not supported by Kraken Futures, use GTC instead")
1687 }
1688 _ => KrakenFuturesOrderType::Limit, }
1690 }
1691 }
1692 OrderType::StopMarket | OrderType::StopLimit => KrakenFuturesOrderType::Stop,
1693 OrderType::MarketIfTouched => KrakenFuturesOrderType::TakeProfit,
1694 _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
1695 };
1696
1697 let mut builder = KrakenFuturesSendOrderParamsBuilder::default();
1698 builder
1699 .cli_ord_id(client_order_id.to_string())
1700 .broker(NAUTILUS_KRAKEN_BROKER_ID)
1701 .symbol(raw_symbol)
1702 .side(KrakenOrderSide::from(order_side))
1703 .size(quantity.to_string())
1704 .order_type(kraken_order_type);
1705
1706 match order_type {
1708 OrderType::StopMarket => {
1709 if let Some(trigger) = trigger_price {
1711 builder.stop_price(trigger.to_string());
1712 }
1713 }
1714 OrderType::StopLimit => {
1715 if let Some(trigger) = trigger_price {
1717 builder.stop_price(trigger.to_string());
1718 }
1719 if let Some(limit) = price {
1720 builder.limit_price(limit.to_string());
1721 }
1722 }
1723 OrderType::MarketIfTouched => {
1724 if let Some(trigger) = trigger_price {
1726 builder.stop_price(trigger.to_string());
1727 }
1728 if let Some(limit) = price {
1729 builder.limit_price(limit.to_string());
1730 }
1731 }
1732 _ => {
1733 if let Some(limit) = price {
1735 builder.limit_price(limit.to_string());
1736 }
1737 }
1738 }
1739
1740 if reduce_only {
1741 builder.reduce_only(true);
1742 }
1743
1744 let params = builder
1745 .build()
1746 .map_err(|e| anyhow::anyhow!("Failed to build order params: {e}"))?;
1747
1748 let response = self.inner.send_order_params(¶ms).await?;
1749
1750 if response.result != KrakenApiResult::Success {
1751 let error_msg = response
1752 .error
1753 .unwrap_or_else(|| "Unknown error".to_string());
1754 anyhow::bail!("Order submission failed: {error_msg}");
1755 }
1756
1757 let send_status = response
1758 .send_status
1759 .ok_or_else(|| anyhow::anyhow!("No send_status in successful response"))?;
1760
1761 let status = &send_status.status;
1762
1763 if status == "postWouldExecute" {
1765 let reason = send_status
1766 .order_events
1767 .as_ref()
1768 .and_then(|events| events.first())
1769 .and_then(|e| e.reason.clone())
1770 .unwrap_or_else(|| "Post-only order would have crossed".to_string());
1771 anyhow::bail!("POST_ONLY_REJECTED: {reason}");
1772 }
1773
1774 let venue_order_id = send_status
1775 .order_id
1776 .ok_or_else(|| anyhow::anyhow!("No order_id in send_status: {status}"))?;
1777
1778 let ts_init = self.generate_ts_init();
1779
1780 let open_orders_response = self.inner.get_open_orders().await?;
1781 if let Some(order) = open_orders_response
1782 .open_orders
1783 .iter()
1784 .find(|o| o.order_id == venue_order_id)
1785 {
1786 return parse_futures_order_status_report(order, &instrument, account_id, ts_init);
1787 }
1788
1789 if let Some(order_events) = &send_status.order_events
1792 && let Some(send_event) = order_events.first()
1793 {
1794 let event = if let Some(order_data) = &send_event.order {
1796 FuturesOrderEvent {
1797 order_id: order_data.order_id.clone(),
1798 cli_ord_id: order_data.cli_ord_id.clone(),
1799 order_type: order_data.order_type,
1800 symbol: order_data.symbol.clone(),
1801 side: order_data.side,
1802 quantity: order_data.quantity,
1803 filled: order_data.filled,
1804 limit_price: order_data.limit_price,
1805 stop_price: order_data.stop_price,
1806 timestamp: order_data.timestamp.clone(),
1807 last_update_timestamp: order_data.last_update_timestamp.clone(),
1808 reduce_only: order_data.reduce_only,
1809 }
1810 } else if let Some(trigger_data) = &send_event.order_trigger {
1811 FuturesOrderEvent {
1812 order_id: trigger_data.uid.clone(),
1813 cli_ord_id: trigger_data.client_id.clone(),
1814 order_type: trigger_data.order_type,
1815 symbol: trigger_data.symbol.clone(),
1816 side: trigger_data.side,
1817 quantity: trigger_data.quantity,
1818 filled: 0.0,
1819 limit_price: trigger_data.limit_price,
1820 stop_price: Some(trigger_data.trigger_price),
1821 timestamp: trigger_data.timestamp.clone(),
1822 last_update_timestamp: trigger_data.last_update_timestamp.clone(),
1823 reduce_only: trigger_data.reduce_only,
1824 }
1825 } else if let Some(prior_exec) = &send_event.order_prior_execution {
1826 FuturesOrderEvent {
1828 order_id: prior_exec.order_id.clone(),
1829 cli_ord_id: prior_exec.cli_ord_id.clone(),
1830 order_type: prior_exec.order_type,
1831 symbol: prior_exec.symbol.clone(),
1832 side: prior_exec.side,
1833 quantity: prior_exec.quantity,
1834 filled: send_event.amount.unwrap_or(prior_exec.quantity), limit_price: prior_exec.limit_price,
1836 stop_price: prior_exec.stop_price,
1837 timestamp: prior_exec.timestamp.clone(),
1838 last_update_timestamp: prior_exec.last_update_timestamp.clone(),
1839 reduce_only: prior_exec.reduce_only,
1840 }
1841 } else {
1842 anyhow::bail!("No order, orderTrigger, or orderPriorExecution data in event");
1843 };
1844 return parse_futures_order_event_status_report(
1845 &event,
1846 &instrument,
1847 account_id,
1848 ts_init,
1849 );
1850 }
1851
1852 let events_response = self.inner.get_order_events(None, None, None).await?;
1854 let event_wrapper = events_response
1855 .order_events
1856 .iter()
1857 .find(|e| e.order.order_id == venue_order_id)
1858 .ok_or_else(|| {
1859 anyhow::anyhow!("Order not found in open orders or events: {venue_order_id}")
1860 })?;
1861
1862 parse_futures_order_event_status_report(
1863 &event_wrapper.order,
1864 &instrument,
1865 account_id,
1866 ts_init,
1867 )
1868 }
1869
1870 pub async fn modify_order(
1882 &self,
1883 instrument_id: InstrumentId,
1884 client_order_id: Option<ClientOrderId>,
1885 venue_order_id: Option<VenueOrderId>,
1886 quantity: Option<Quantity>,
1887 price: Option<Price>,
1888 trigger_price: Option<Price>,
1889 ) -> anyhow::Result<VenueOrderId> {
1890 let _ = self
1891 .get_cached_instrument(&instrument_id.symbol.inner())
1892 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1893
1894 let order_id = venue_order_id.as_ref().map(|id| id.to_string());
1895 let cli_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1896
1897 if order_id.is_none() && cli_ord_id.is_none() {
1898 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1899 }
1900
1901 let mut builder = KrakenFuturesEditOrderParamsBuilder::default();
1902
1903 if let Some(ref id) = order_id {
1904 builder.order_id(id.clone());
1905 }
1906 if let Some(ref id) = cli_ord_id {
1907 builder.cli_ord_id(id.clone());
1908 }
1909 if let Some(qty) = quantity {
1910 builder.size(qty.to_string());
1911 }
1912 if let Some(p) = price {
1913 builder.limit_price(p.to_string());
1914 }
1915 if let Some(tp) = trigger_price {
1916 builder.stop_price(tp.to_string());
1917 }
1918
1919 let params = builder
1920 .build()
1921 .map_err(|e| anyhow::anyhow!("Failed to build edit order params: {e}"))?;
1922
1923 let response = self.inner.edit_order(¶ms).await?;
1924
1925 if response.result != KrakenApiResult::Success {
1926 let status = &response.edit_status.status;
1927 anyhow::bail!("Order modification failed: {status}");
1928 }
1929
1930 let new_venue_order_id = response
1932 .edit_status
1933 .order_id
1934 .or(order_id)
1935 .ok_or_else(|| anyhow::anyhow!("No order ID in edit order response"))?;
1936
1937 Ok(VenueOrderId::new(&new_venue_order_id))
1938 }
1939
1940 pub async fn cancel_order(
1950 &self,
1951 _account_id: AccountId,
1952 instrument_id: InstrumentId,
1953 client_order_id: Option<ClientOrderId>,
1954 venue_order_id: Option<VenueOrderId>,
1955 ) -> anyhow::Result<()> {
1956 let _ = self
1957 .get_cached_instrument(&instrument_id.symbol.inner())
1958 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1959
1960 let order_id = venue_order_id.as_ref().map(|id| id.to_string());
1961 let cli_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1962
1963 if order_id.is_none() && cli_ord_id.is_none() {
1964 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1965 }
1966
1967 let response = self.inner.cancel_order(order_id, cli_ord_id).await?;
1968
1969 if response.result != KrakenApiResult::Success {
1970 let status = &response.cancel_status.status;
1971 anyhow::bail!("Order cancellation failed: {status}");
1972 }
1973
1974 Ok(())
1975 }
1976
1977 pub async fn cancel_orders_batch(
1987 &self,
1988 venue_order_ids: Vec<VenueOrderId>,
1989 ) -> anyhow::Result<usize> {
1990 if venue_order_ids.is_empty() {
1991 return Ok(0);
1992 }
1993
1994 let mut total_cancelled = 0;
1995
1996 for chunk in venue_order_ids.chunks(BATCH_CANCEL_LIMIT) {
1997 let order_ids: Vec<String> = chunk.iter().map(|id| id.to_string()).collect();
1998 let response = self.inner.cancel_orders_batch(order_ids).await?;
1999
2000 if response.result != KrakenApiResult::Success {
2001 let error_msg = response.error.as_deref().unwrap_or("Unknown error");
2002 anyhow::bail!("Batch cancel failed: {error_msg}");
2003 }
2004
2005 let success_count = response
2006 .batch_status
2007 .iter()
2008 .filter(|s| {
2009 s.status == Some(KrakenSendStatus::Cancelled)
2010 || s.cancel_status
2011 .as_ref()
2012 .is_some_and(|cs| cs.status == KrakenSendStatus::Cancelled)
2013 })
2014 .count();
2015
2016 total_cancelled += success_count;
2017 }
2018
2019 Ok(total_cancelled)
2020 }
2021}
2022
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// A default raw client has no credential and a base URL containing "futures".
    #[rstest]
    fn test_raw_client_creation() {
        let raw_client = KrakenFuturesRawHttpClient::default();

        assert!(raw_client.credential.is_none());
        assert!(raw_client.base_url().contains("futures"));
    }

    /// A raw client built with an API key and secret stores a credential.
    #[rstest]
    fn test_raw_client_with_credentials() {
        let api_key = "test_key".to_string();
        let api_secret = "test_secret".to_string();

        let raw_client = KrakenFuturesRawHttpClient::with_credentials(
            api_key,
            api_secret,
            KrakenEnvironment::Mainnet,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .unwrap();

        assert!(raw_client.credential.is_some());
    }

    /// A default domain client starts with an empty instruments cache.
    #[rstest]
    fn test_client_creation() {
        let http_client = KrakenFuturesHttpClient::default();

        assert!(http_client.instruments_cache.is_empty());
    }

    /// A domain client built with credentials also starts with an empty instruments cache.
    #[rstest]
    fn test_client_with_credentials() {
        let api_key = "test_key".to_string();
        let api_secret = "test_secret".to_string();

        let http_client = KrakenFuturesHttpClient::with_credentials(
            api_key,
            api_secret,
            KrakenEnvironment::Mainnet,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .unwrap();

        assert!(http_client.instruments_cache.is_empty());
    }
}