1use std::{
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroU32,
22 sync::{
23 Arc,
24 atomic::{AtomicBool, Ordering},
25 },
26};
27
28use chrono::{DateTime, Utc};
29use dashmap::DashMap;
30use nautilus_core::{
31 AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, nanos::UnixNanos,
32 time::get_atomic_clock_realtime,
33};
34use nautilus_model::{
35 data::{Bar, BarType, TradeTick},
36 enums::{AccountType, CurrencyType, OrderSide, OrderType, TimeInForce},
37 events::AccountState,
38 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
39 instruments::{Instrument, InstrumentAny},
40 reports::{FillReport, OrderStatusReport, PositionStatusReport},
41 types::{AccountBalance, Currency, Money, Price, Quantity},
42};
43use nautilus_network::{
44 http::{HttpClient, Method, USER_AGENT},
45 ratelimiter::quota::Quota,
46 retry::{RetryConfig, RetryManager},
47};
48use serde::de::DeserializeOwned;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use super::{models::*, query::*};
53use crate::{
54 common::{
55 consts::NAUTILUS_KRAKEN_BROKER_ID,
56 credential::KrakenCredential,
57 enums::{
58 KrakenApiResult, KrakenEnvironment, KrakenFuturesOrderType, KrakenOrderSide,
59 KrakenProductType, KrakenSendStatus,
60 },
61 parse::{
62 bar_type_to_futures_resolution, parse_bar, parse_futures_fill_report,
63 parse_futures_instrument, parse_futures_order_event_status_report,
64 parse_futures_order_status_report, parse_futures_position_status_report,
65 parse_futures_public_execution,
66 },
67 urls::get_kraken_http_base_url,
68 },
69 http::{error::KrakenHttpError, models::OhlcData},
70};
71
/// Requests-per-second budget used when the caller does not supply one.
pub const KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 5;

/// Rate limiter key attached to every request issued by the futures client.
const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:futures:global";

// NOTE(review): presumably the maximum number of cancels sent per batch request;
// it is not referenced in this part of the file — confirm against its usage.
const BATCH_CANCEL_LIMIT: usize = 50;
79
80pub struct KrakenFuturesRawHttpClient {
85 base_url: String,
86 client: HttpClient,
87 credential: Option<KrakenCredential>,
88 retry_manager: RetryManager<KrakenHttpError>,
89 cancellation_token: CancellationToken,
90 clock: &'static AtomicTime,
91 auth_mutex: tokio::sync::Mutex<()>,
93}
94
95impl Default for KrakenFuturesRawHttpClient {
96 fn default() -> Self {
97 Self::new(
98 KrakenEnvironment::Mainnet,
99 None,
100 Some(60),
101 None,
102 None,
103 None,
104 None,
105 None,
106 )
107 .expect("Failed to create default KrakenFuturesRawHttpClient")
108 }
109}
110
111impl Debug for KrakenFuturesRawHttpClient {
112 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113 f.debug_struct(stringify!(KrakenFuturesRawHttpClient))
114 .field("base_url", &self.base_url)
115 .field("has_credentials", &self.credential.is_some())
116 .finish()
117 }
118}
119
120impl KrakenFuturesRawHttpClient {
121 #[allow(clippy::too_many_arguments)]
123 pub fn new(
124 environment: KrakenEnvironment,
125 base_url_override: Option<String>,
126 timeout_secs: Option<u64>,
127 max_retries: Option<u32>,
128 retry_delay_ms: Option<u64>,
129 retry_delay_max_ms: Option<u64>,
130 proxy_url: Option<String>,
131 max_requests_per_second: Option<u32>,
132 ) -> anyhow::Result<Self> {
133 let retry_config = RetryConfig {
134 max_retries: max_retries.unwrap_or(3),
135 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
136 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
137 backoff_factor: 2.0,
138 jitter_ms: 1000,
139 operation_timeout_ms: Some(60_000),
140 immediate_first: false,
141 max_elapsed_ms: Some(180_000),
142 };
143
144 let retry_manager = RetryManager::new(retry_config);
145 let base_url = base_url_override.unwrap_or_else(|| {
146 get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
147 });
148
149 let rate_limit =
150 max_requests_per_second.unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND);
151
152 Ok(Self {
153 base_url,
154 client: HttpClient::new(
155 Self::default_headers(),
156 vec![],
157 Self::rate_limiter_quotas(rate_limit),
158 Some(Self::default_quota(rate_limit)),
159 timeout_secs,
160 proxy_url,
161 )
162 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
163 credential: None,
164 retry_manager,
165 cancellation_token: CancellationToken::new(),
166 clock: get_atomic_clock_realtime(),
167 auth_mutex: tokio::sync::Mutex::new(()),
168 })
169 }
170
171 #[allow(clippy::too_many_arguments)]
173 pub fn with_credentials(
174 api_key: String,
175 api_secret: String,
176 environment: KrakenEnvironment,
177 base_url_override: Option<String>,
178 timeout_secs: Option<u64>,
179 max_retries: Option<u32>,
180 retry_delay_ms: Option<u64>,
181 retry_delay_max_ms: Option<u64>,
182 proxy_url: Option<String>,
183 max_requests_per_second: Option<u32>,
184 ) -> anyhow::Result<Self> {
185 let retry_config = RetryConfig {
186 max_retries: max_retries.unwrap_or(3),
187 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
188 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
189 backoff_factor: 2.0,
190 jitter_ms: 1000,
191 operation_timeout_ms: Some(60_000),
192 immediate_first: false,
193 max_elapsed_ms: Some(180_000),
194 };
195
196 let retry_manager = RetryManager::new(retry_config);
197 let base_url = base_url_override.unwrap_or_else(|| {
198 get_kraken_http_base_url(KrakenProductType::Futures, environment).to_string()
199 });
200
201 let rate_limit =
202 max_requests_per_second.unwrap_or(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND);
203
204 Ok(Self {
205 base_url,
206 client: HttpClient::new(
207 Self::default_headers(),
208 vec![],
209 Self::rate_limiter_quotas(rate_limit),
210 Some(Self::default_quota(rate_limit)),
211 timeout_secs,
212 proxy_url,
213 )
214 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
215 credential: Some(KrakenCredential::new(api_key, api_secret)),
216 retry_manager,
217 cancellation_token: CancellationToken::new(),
218 clock: get_atomic_clock_realtime(),
219 auth_mutex: tokio::sync::Mutex::new(()),
220 })
221 }
222
223 fn generate_nonce(&self) -> u64 {
228 self.clock.get_time_ns().as_u64()
229 }
230
231 pub fn base_url(&self) -> &str {
233 &self.base_url
234 }
235
236 pub fn credential(&self) -> Option<&KrakenCredential> {
238 self.credential.as_ref()
239 }
240
241 pub fn cancel_all_requests(&self) {
243 self.cancellation_token.cancel();
244 }
245
246 pub fn cancellation_token(&self) -> &CancellationToken {
248 &self.cancellation_token
249 }
250
251 fn default_headers() -> HashMap<String, String> {
252 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
253 }
254
255 fn default_quota(max_requests_per_second: u32) -> Quota {
256 Quota::per_second(NonZeroU32::new(max_requests_per_second).unwrap_or_else(|| {
257 NonZeroU32::new(KRAKEN_FUTURES_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()
258 }))
259 }
260
261 fn rate_limiter_quotas(max_requests_per_second: u32) -> Vec<(String, Quota)> {
262 vec![(
263 KRAKEN_GLOBAL_RATE_KEY.to_string(),
264 Self::default_quota(max_requests_per_second),
265 )]
266 }
267
268 fn rate_limit_keys(endpoint: &str) -> Vec<String> {
269 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
270 let route = format!("kraken:futures:{normalized}");
271 vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
272 }
273
274 async fn send_request<T: DeserializeOwned>(
275 &self,
276 method: Method,
277 endpoint: &str,
278 url: String,
279 authenticate: bool,
280 ) -> anyhow::Result<T, KrakenHttpError> {
281 let _guard = if authenticate {
285 Some(self.auth_mutex.lock().await)
286 } else {
287 None
288 };
289
290 let endpoint = endpoint.to_string();
291 let method_clone = method.clone();
292 let url_clone = url.clone();
293 let credential = self.credential.clone();
294
295 let operation = || {
296 let url = url_clone.clone();
297 let method = method_clone.clone();
298 let endpoint = endpoint.clone();
299 let credential = credential.clone();
300
301 async move {
302 let mut headers = Self::default_headers();
303
304 if authenticate {
305 let cred = credential.as_ref().ok_or_else(|| {
306 KrakenHttpError::AuthenticationError(
307 "Missing credentials for authenticated request".to_string(),
308 )
309 })?;
310
311 let nonce = self.generate_nonce();
312
313 let signature = cred.sign_futures(&endpoint, "", nonce).map_err(|e| {
314 KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
315 })?;
316
317 let base_url = &self.base_url;
318 tracing::debug!(
319 "Kraken Futures auth: endpoint={endpoint}, nonce={nonce}, base_url={base_url}"
320 );
321
322 headers.insert("APIKey".to_string(), cred.api_key().to_string());
323 headers.insert("Authent".to_string(), signature);
324 headers.insert("Nonce".to_string(), nonce.to_string());
325 }
326
327 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
328
329 let response = self
330 .client
331 .request(
332 method,
333 url,
334 None,
335 Some(headers),
336 None,
337 None,
338 Some(rate_limit_keys),
339 )
340 .await
341 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
342
343 let status = response.status.as_u16();
344 if status >= 400 {
345 let body = String::from_utf8_lossy(&response.body).to_string();
346 if status == 401 || status == 403 {
348 return Err(KrakenHttpError::AuthenticationError(format!(
349 "HTTP error {status}: {body}"
350 )));
351 }
352 return Err(KrakenHttpError::NetworkError(format!(
353 "HTTP error {status}: {body}"
354 )));
355 }
356
357 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
358 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
359 })?;
360
361 serde_json::from_str(&response_text).map_err(|e| {
362 KrakenHttpError::ParseError(format!(
363 "Failed to deserialize futures response: {e}"
364 ))
365 })
366 }
367 };
368
369 let should_retry =
370 |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
371 let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
372
373 self.retry_manager
374 .execute_with_retry_with_cancel(
375 &endpoint,
376 operation,
377 should_retry,
378 create_error,
379 &self.cancellation_token,
380 )
381 .await
382 }
383
384 async fn send_get_with_query<T: DeserializeOwned>(
389 &self,
390 endpoint: &str,
391 url: String,
392 query_string: &str,
393 ) -> anyhow::Result<T, KrakenHttpError> {
394 let _guard = self.auth_mutex.lock().await;
395
396 if self.cancellation_token.is_cancelled() {
397 return Err(KrakenHttpError::NetworkError(
398 "Request cancelled".to_string(),
399 ));
400 }
401
402 let credential = self.credential.as_ref().ok_or_else(|| {
403 KrakenHttpError::AuthenticationError("Missing credentials".to_string())
404 })?;
405
406 let nonce = self.generate_nonce();
407
408 let signature = credential
410 .sign_futures(endpoint, query_string, nonce)
411 .map_err(|e| {
412 KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
413 })?;
414
415 tracing::debug!(
416 "Kraken Futures GET with query: endpoint={endpoint}, query={query_string}, nonce={nonce}"
417 );
418
419 let mut headers = Self::default_headers();
420 headers.insert("APIKey".to_string(), credential.api_key().to_string());
421 headers.insert("Authent".to_string(), signature);
422 headers.insert("Nonce".to_string(), nonce.to_string());
423
424 let rate_limit_keys = Self::rate_limit_keys(endpoint);
425
426 let response = self
427 .client
428 .request(
429 Method::GET,
430 url,
431 None,
432 Some(headers),
433 None,
434 None,
435 Some(rate_limit_keys),
436 )
437 .await
438 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
439
440 let status = response.status.as_u16();
441 if status >= 400 {
442 let body = String::from_utf8_lossy(&response.body).to_string();
443 if status == 401 || status == 403 {
444 return Err(KrakenHttpError::AuthenticationError(format!(
445 "HTTP error {status}: {body}"
446 )));
447 }
448 return Err(KrakenHttpError::NetworkError(format!(
449 "HTTP error {status}: {body}"
450 )));
451 }
452
453 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
454 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
455 })?;
456
457 serde_json::from_str(&response_text).map_err(|e| {
458 KrakenHttpError::ParseError(format!("Failed to deserialize futures response: {e}"))
459 })
460 }
461
462 async fn send_request_with_body<T: DeserializeOwned>(
463 &self,
464 endpoint: &str,
465 params: HashMap<String, String>,
466 ) -> anyhow::Result<T, KrakenHttpError> {
467 let post_data = serde_urlencoded::to_string(¶ms)
468 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
469 self.send_authenticated_post(endpoint, post_data).await
470 }
471
472 async fn send_request_with_params<P: serde::Serialize, T: DeserializeOwned>(
474 &self,
475 endpoint: &str,
476 params: &P,
477 ) -> anyhow::Result<T, KrakenHttpError> {
478 let post_data = serde_urlencoded::to_string(params)
479 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
480 self.send_authenticated_post(endpoint, post_data).await
481 }
482
483 async fn send_authenticated_post<T: DeserializeOwned>(
485 &self,
486 endpoint: &str,
487 post_data: String,
488 ) -> anyhow::Result<T, KrakenHttpError> {
489 if self.cancellation_token.is_cancelled() {
490 return Err(KrakenHttpError::NetworkError(
491 "Request cancelled".to_string(),
492 ));
493 }
494
495 let _guard = self.auth_mutex.lock().await;
497
498 if self.cancellation_token.is_cancelled() {
499 return Err(KrakenHttpError::NetworkError(
500 "Request cancelled".to_string(),
501 ));
502 }
503
504 let credential = self.credential.as_ref().ok_or_else(|| {
505 KrakenHttpError::AuthenticationError("Missing credentials".to_string())
506 })?;
507
508 let nonce = self.generate_nonce();
509 tracing::debug!("Generated nonce {nonce} for {endpoint}");
510
511 let signature = credential
512 .sign_futures(endpoint, &post_data, nonce)
513 .map_err(|e| {
514 KrakenHttpError::AuthenticationError(format!("Failed to sign request: {e}"))
515 })?;
516
517 let url = format!("{}{endpoint}", self.base_url);
518 let mut headers = Self::default_headers();
519 headers.insert(
520 "Content-Type".to_string(),
521 "application/x-www-form-urlencoded".to_string(),
522 );
523 headers.insert("APIKey".to_string(), credential.api_key().to_string());
524 headers.insert("Authent".to_string(), signature);
525 headers.insert("Nonce".to_string(), nonce.to_string());
526
527 let rate_limit_keys = Self::rate_limit_keys(endpoint);
528
529 let response = self
530 .client
531 .request(
532 Method::POST,
533 url,
534 None,
535 Some(headers),
536 Some(post_data.into_bytes()),
537 None,
538 Some(rate_limit_keys),
539 )
540 .await
541 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
542
543 if response.status.as_u16() >= 400 {
544 let status = response.status.as_u16();
545 let body = String::from_utf8_lossy(&response.body).to_string();
546 return Err(KrakenHttpError::NetworkError(format!(
547 "HTTP error {status}: {body}"
548 )));
549 }
550
551 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
552 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
553 })?;
554
555 serde_json::from_str(&response_text).map_err(|e| {
556 tracing::error!("Failed to parse response from {endpoint}: {response_text}");
557 KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
558 })
559 }
560
561 pub async fn get_instruments(
563 &self,
564 ) -> anyhow::Result<FuturesInstrumentsResponse, KrakenHttpError> {
565 let endpoint = "/derivatives/api/v3/instruments";
566 let url = format!("{}{endpoint}", self.base_url);
567
568 self.send_request(Method::GET, endpoint, url, false).await
569 }
570
571 pub async fn get_tickers(&self) -> anyhow::Result<FuturesTickersResponse, KrakenHttpError> {
573 let endpoint = "/derivatives/api/v3/tickers";
574 let url = format!("{}{endpoint}", self.base_url);
575
576 self.send_request(Method::GET, endpoint, url, false).await
577 }
578
579 pub async fn get_ohlc(
581 &self,
582 tick_type: &str,
583 symbol: &str,
584 resolution: &str,
585 from: Option<i64>,
586 to: Option<i64>,
587 ) -> anyhow::Result<FuturesCandlesResponse, KrakenHttpError> {
588 let endpoint = format!("/api/charts/v1/{tick_type}/{symbol}/{resolution}");
589
590 let mut url = format!("{}{endpoint}", self.base_url);
591
592 let mut query_params = Vec::new();
593 if let Some(from_ts) = from {
594 query_params.push(format!("from={from_ts}"));
595 }
596 if let Some(to_ts) = to {
597 query_params.push(format!("to={to_ts}"));
598 }
599
600 if !query_params.is_empty() {
601 url.push('?');
602 url.push_str(&query_params.join("&"));
603 }
604
605 self.send_request(Method::GET, &endpoint, url, false).await
606 }
607
608 pub async fn get_public_executions(
610 &self,
611 symbol: &str,
612 since: Option<i64>,
613 before: Option<i64>,
614 sort: Option<&str>,
615 continuation_token: Option<&str>,
616 ) -> anyhow::Result<FuturesPublicExecutionsResponse, KrakenHttpError> {
617 let endpoint = format!("/api/history/v3/market/{symbol}/executions");
618
619 let mut url = format!("{}{endpoint}", self.base_url);
620
621 let mut query_params = Vec::new();
622 if let Some(since_ts) = since {
623 query_params.push(format!("since={since_ts}"));
624 }
625 if let Some(before_ts) = before {
626 query_params.push(format!("before={before_ts}"));
627 }
628 if let Some(sort_order) = sort {
629 query_params.push(format!("sort={sort_order}"));
630 }
631 if let Some(token) = continuation_token {
632 query_params.push(format!("continuationToken={token}"));
633 }
634
635 if !query_params.is_empty() {
636 url.push('?');
637 url.push_str(&query_params.join("&"));
638 }
639
640 self.send_request(Method::GET, &endpoint, url, false).await
641 }
642
643 pub async fn get_open_orders(
645 &self,
646 ) -> anyhow::Result<FuturesOpenOrdersResponse, KrakenHttpError> {
647 if self.credential.is_none() {
648 return Err(KrakenHttpError::AuthenticationError(
649 "API credentials required for futures open orders".to_string(),
650 ));
651 }
652
653 let endpoint = "/derivatives/api/v3/openorders";
654 let url = format!("{}{endpoint}", self.base_url);
655
656 self.send_request(Method::GET, endpoint, url, true).await
657 }
658
659 pub async fn get_order_events(
661 &self,
662 before: Option<i64>,
663 since: Option<i64>,
664 continuation_token: Option<&str>,
665 ) -> anyhow::Result<FuturesOrderEventsResponse, KrakenHttpError> {
666 if self.credential.is_none() {
667 return Err(KrakenHttpError::AuthenticationError(
668 "API credentials required for futures order events".to_string(),
669 ));
670 }
671
672 let endpoint = "/api/history/v2/orders";
673 let mut query_params = Vec::new();
674
675 if let Some(before_ts) = before {
676 query_params.push(format!("before={before_ts}"));
677 }
678 if let Some(since_ts) = since {
679 query_params.push(format!("since={since_ts}"));
680 }
681 if let Some(token) = continuation_token {
682 query_params.push(format!("continuation_token={token}"));
683 }
684
685 let query_string = query_params.join("&");
687 let url = if query_string.is_empty() {
688 format!("{}{endpoint}", self.base_url)
689 } else {
690 format!("{}{endpoint}?{query_string}", self.base_url)
691 };
692
693 self.send_get_with_query(endpoint, url, &query_string).await
696 }
697
698 pub async fn get_fills(
700 &self,
701 last_fill_time: Option<&str>,
702 ) -> anyhow::Result<FuturesFillsResponse, KrakenHttpError> {
703 if self.credential.is_none() {
704 return Err(KrakenHttpError::AuthenticationError(
705 "API credentials required for futures fills".to_string(),
706 ));
707 }
708
709 let endpoint = "/derivatives/api/v3/fills";
710 let query_string = last_fill_time
711 .map(|t| format!("lastFillTime={t}"))
712 .unwrap_or_default();
713
714 let url = if query_string.is_empty() {
715 format!("{}{endpoint}", self.base_url)
716 } else {
717 format!("{}{endpoint}?{query_string}", self.base_url)
718 };
719
720 self.send_get_with_query(endpoint, url, &query_string).await
722 }
723
724 pub async fn get_open_positions(
726 &self,
727 ) -> anyhow::Result<FuturesOpenPositionsResponse, KrakenHttpError> {
728 if self.credential.is_none() {
729 return Err(KrakenHttpError::AuthenticationError(
730 "API credentials required for futures open positions".to_string(),
731 ));
732 }
733
734 let endpoint = "/derivatives/api/v3/openpositions";
735 let url = format!("{}{endpoint}", self.base_url);
736
737 self.send_request(Method::GET, endpoint, url, true).await
738 }
739
740 pub async fn get_accounts(&self) -> anyhow::Result<FuturesAccountsResponse, KrakenHttpError> {
742 if self.credential.is_none() {
743 return Err(KrakenHttpError::AuthenticationError(
744 "API credentials required for futures accounts".to_string(),
745 ));
746 }
747
748 let endpoint = "/derivatives/api/v3/accounts";
749 let url = format!("{}{endpoint}", self.base_url);
750
751 self.send_request(Method::GET, endpoint, url, true).await
752 }
753
754 pub async fn send_order(
756 &self,
757 params: HashMap<String, String>,
758 ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
759 if self.credential.is_none() {
760 return Err(KrakenHttpError::AuthenticationError(
761 "API credentials required for sending orders".to_string(),
762 ));
763 }
764
765 let endpoint = "/derivatives/api/v3/sendorder";
766 self.send_request_with_body(endpoint, params).await
767 }
768
769 pub async fn send_order_params(
771 &self,
772 params: &KrakenFuturesSendOrderParams,
773 ) -> anyhow::Result<FuturesSendOrderResponse, KrakenHttpError> {
774 if self.credential.is_none() {
775 return Err(KrakenHttpError::AuthenticationError(
776 "API credentials required for sending orders".to_string(),
777 ));
778 }
779
780 let endpoint = "/derivatives/api/v3/sendorder";
781 self.send_request_with_params(endpoint, params).await
782 }
783
784 pub async fn cancel_order(
786 &self,
787 order_id: Option<String>,
788 cli_ord_id: Option<String>,
789 ) -> anyhow::Result<FuturesCancelOrderResponse, KrakenHttpError> {
790 if self.credential.is_none() {
791 return Err(KrakenHttpError::AuthenticationError(
792 "API credentials required for canceling orders".to_string(),
793 ));
794 }
795
796 let mut params = HashMap::new();
797 if let Some(id) = order_id {
798 params.insert("order_id".to_string(), id);
799 }
800 if let Some(id) = cli_ord_id {
801 params.insert("cliOrdId".to_string(), id);
802 }
803
804 let endpoint = "/derivatives/api/v3/cancelorder";
805 self.send_request_with_body(endpoint, params).await
806 }
807
808 pub async fn edit_order(
810 &self,
811 params: &KrakenFuturesEditOrderParams,
812 ) -> anyhow::Result<FuturesEditOrderResponse, KrakenHttpError> {
813 if self.credential.is_none() {
814 return Err(KrakenHttpError::AuthenticationError(
815 "API credentials required for editing orders".to_string(),
816 ));
817 }
818
819 let endpoint = "/derivatives/api/v3/editorder";
820 self.send_request_with_params(endpoint, params).await
821 }
822
823 pub async fn batch_order(
825 &self,
826 params: HashMap<String, String>,
827 ) -> anyhow::Result<FuturesBatchOrderResponse, KrakenHttpError> {
828 if self.credential.is_none() {
829 return Err(KrakenHttpError::AuthenticationError(
830 "API credentials required for batch orders".to_string(),
831 ));
832 }
833
834 let endpoint = "/derivatives/api/v3/batchorder";
835 self.send_request_with_body(endpoint, params).await
836 }
837
838 pub async fn cancel_orders_batch(
840 &self,
841 order_ids: Vec<String>,
842 ) -> anyhow::Result<FuturesBatchCancelResponse, KrakenHttpError> {
843 if self.credential.is_none() {
844 return Err(KrakenHttpError::AuthenticationError(
845 "API credentials required for batch orders".to_string(),
846 ));
847 }
848
849 let batch_items: Vec<KrakenFuturesBatchCancelItem> = order_ids
850 .into_iter()
851 .map(KrakenFuturesBatchCancelItem::from_order_id)
852 .collect();
853
854 let params = KrakenFuturesBatchOrderParams::new(batch_items);
855 let post_data = params
856 .to_body()
857 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize batch: {e}")))?;
858
859 let endpoint = "/derivatives/api/v3/batchorder";
860 self.send_authenticated_post(endpoint, post_data).await
861 }
862
863 pub async fn cancel_all_orders(
865 &self,
866 symbol: Option<String>,
867 ) -> anyhow::Result<FuturesCancelAllOrdersResponse, KrakenHttpError> {
868 if self.credential.is_none() {
869 return Err(KrakenHttpError::AuthenticationError(
870 "API credentials required for canceling orders".to_string(),
871 ));
872 }
873
874 let mut params = HashMap::new();
875 if let Some(sym) = symbol {
876 params.insert("symbol".to_string(), sym);
877 }
878
879 let endpoint = "/derivatives/api/v3/cancelallorders";
880 self.send_request_with_body(endpoint, params).await
881 }
882}
883
884#[cfg_attr(
890 feature = "python",
891 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
892)]
893pub struct KrakenFuturesHttpClient {
894 pub(crate) inner: Arc<KrakenFuturesRawHttpClient>,
895 pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
896 cache_initialized: Arc<AtomicBool>,
897}
898
899impl Clone for KrakenFuturesHttpClient {
900 fn clone(&self) -> Self {
901 Self {
902 inner: self.inner.clone(),
903 instruments_cache: self.instruments_cache.clone(),
904 cache_initialized: self.cache_initialized.clone(),
905 }
906 }
907}
908
909impl Default for KrakenFuturesHttpClient {
910 fn default() -> Self {
911 Self::new(
912 KrakenEnvironment::Mainnet,
913 None,
914 Some(60),
915 None,
916 None,
917 None,
918 None,
919 None,
920 )
921 .expect("Failed to create default KrakenFuturesHttpClient")
922 }
923}
924
925impl Debug for KrakenFuturesHttpClient {
926 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
927 f.debug_struct(stringify!(KrakenFuturesHttpClient))
928 .field("inner", &self.inner)
929 .finish()
930 }
931}
932
933impl KrakenFuturesHttpClient {
934 #[allow(clippy::too_many_arguments)]
936 pub fn new(
937 environment: KrakenEnvironment,
938 base_url_override: Option<String>,
939 timeout_secs: Option<u64>,
940 max_retries: Option<u32>,
941 retry_delay_ms: Option<u64>,
942 retry_delay_max_ms: Option<u64>,
943 proxy_url: Option<String>,
944 max_requests_per_second: Option<u32>,
945 ) -> anyhow::Result<Self> {
946 Ok(Self {
947 inner: Arc::new(KrakenFuturesRawHttpClient::new(
948 environment,
949 base_url_override,
950 timeout_secs,
951 max_retries,
952 retry_delay_ms,
953 retry_delay_max_ms,
954 proxy_url,
955 max_requests_per_second,
956 )?),
957 instruments_cache: Arc::new(DashMap::new()),
958 cache_initialized: Arc::new(AtomicBool::new(false)),
959 })
960 }
961
962 #[allow(clippy::too_many_arguments)]
964 pub fn with_credentials(
965 api_key: String,
966 api_secret: String,
967 environment: KrakenEnvironment,
968 base_url_override: Option<String>,
969 timeout_secs: Option<u64>,
970 max_retries: Option<u32>,
971 retry_delay_ms: Option<u64>,
972 retry_delay_max_ms: Option<u64>,
973 proxy_url: Option<String>,
974 max_requests_per_second: Option<u32>,
975 ) -> anyhow::Result<Self> {
976 Ok(Self {
977 inner: Arc::new(KrakenFuturesRawHttpClient::with_credentials(
978 api_key,
979 api_secret,
980 environment,
981 base_url_override,
982 timeout_secs,
983 max_retries,
984 retry_delay_ms,
985 retry_delay_max_ms,
986 proxy_url,
987 max_requests_per_second,
988 )?),
989 instruments_cache: Arc::new(DashMap::new()),
990 cache_initialized: Arc::new(AtomicBool::new(false)),
991 })
992 }
993
994 #[allow(clippy::too_many_arguments)]
1001 pub fn from_env(
1002 environment: KrakenEnvironment,
1003 base_url_override: Option<String>,
1004 timeout_secs: Option<u64>,
1005 max_retries: Option<u32>,
1006 retry_delay_ms: Option<u64>,
1007 retry_delay_max_ms: Option<u64>,
1008 proxy_url: Option<String>,
1009 max_requests_per_second: Option<u32>,
1010 ) -> anyhow::Result<Self> {
1011 let demo = environment == KrakenEnvironment::Demo;
1012
1013 if let Some(credential) = KrakenCredential::from_env_futures(demo) {
1014 let (api_key, api_secret) = credential.into_parts();
1015 Self::with_credentials(
1016 api_key,
1017 api_secret,
1018 environment,
1019 base_url_override,
1020 timeout_secs,
1021 max_retries,
1022 retry_delay_ms,
1023 retry_delay_max_ms,
1024 proxy_url,
1025 max_requests_per_second,
1026 )
1027 } else {
1028 Self::new(
1029 environment,
1030 base_url_override,
1031 timeout_secs,
1032 max_retries,
1033 retry_delay_ms,
1034 retry_delay_max_ms,
1035 proxy_url,
1036 max_requests_per_second,
1037 )
1038 }
1039 }
1040
1041 pub fn cancel_all_requests(&self) {
1043 self.inner.cancel_all_requests();
1044 }
1045
1046 pub fn cancellation_token(&self) -> &CancellationToken {
1048 self.inner.cancellation_token()
1049 }
1050
1051 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1053 self.instruments_cache
1054 .insert(instrument.symbol().inner(), instrument);
1055 self.cache_initialized.store(true, Ordering::Release);
1056 }
1057
1058 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1060 for instrument in instruments {
1061 self.instruments_cache
1062 .insert(instrument.symbol().inner(), instrument);
1063 }
1064 self.cache_initialized.store(true, Ordering::Release);
1065 }
1066
1067 pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1069 self.instruments_cache
1070 .get(symbol)
1071 .map(|entry| entry.value().clone())
1072 }
1073
1074 fn get_instrument_by_raw_symbol(&self, raw_symbol: &str) -> Option<InstrumentAny> {
1075 self.instruments_cache
1076 .iter()
1077 .find(|entry| entry.value().raw_symbol().as_str() == raw_symbol)
1078 .map(|entry| entry.value().clone())
1079 }
1080
1081 fn generate_ts_init(&self) -> UnixNanos {
1082 get_atomic_clock_realtime().get_time_ns()
1083 }
1084
1085 pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
1087 let ts_init = self.generate_ts_init();
1088 let response = self.inner.get_instruments().await?;
1089
1090 let instruments: Vec<InstrumentAny> = response
1091 .instruments
1092 .iter()
1093 .filter_map(|fut_instrument| {
1094 match parse_futures_instrument(fut_instrument, ts_init, ts_init) {
1095 Ok(instrument) => Some(instrument),
1096 Err(e) => {
1097 let symbol = &fut_instrument.symbol;
1098 tracing::warn!("Failed to parse futures instrument {symbol}: {e}");
1099 None
1100 }
1101 }
1102 })
1103 .collect();
1104
1105 Ok(instruments)
1106 }
1107
1108 pub async fn request_mark_price(
1110 &self,
1111 instrument_id: InstrumentId,
1112 ) -> anyhow::Result<f64, KrakenHttpError> {
1113 let instrument = self
1114 .get_cached_instrument(&instrument_id.symbol.inner())
1115 .ok_or_else(|| {
1116 KrakenHttpError::ParseError(format!(
1117 "Instrument not found in cache: {instrument_id}"
1118 ))
1119 })?;
1120
1121 let raw_symbol = instrument.raw_symbol().to_string();
1122 let tickers = self.inner.get_tickers().await?;
1123
1124 tickers
1125 .tickers
1126 .iter()
1127 .find(|t| t.symbol == raw_symbol)
1128 .ok_or_else(|| {
1129 KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1130 })
1131 .and_then(|t| {
1132 t.mark_price.ok_or_else(|| {
1133 KrakenHttpError::ParseError(format!(
1134 "Mark price not available for {raw_symbol} (may not be available in testnet)"
1135 ))
1136 })
1137 })
1138 }
1139
1140 pub async fn request_index_price(
1141 &self,
1142 instrument_id: InstrumentId,
1143 ) -> anyhow::Result<f64, KrakenHttpError> {
1144 let instrument = self
1145 .get_cached_instrument(&instrument_id.symbol.inner())
1146 .ok_or_else(|| {
1147 KrakenHttpError::ParseError(format!(
1148 "Instrument not found in cache: {instrument_id}"
1149 ))
1150 })?;
1151
1152 let raw_symbol = instrument.raw_symbol().to_string();
1153 let tickers = self.inner.get_tickers().await?;
1154
1155 tickers
1156 .tickers
1157 .iter()
1158 .find(|t| t.symbol == raw_symbol)
1159 .ok_or_else(|| {
1160 KrakenHttpError::ParseError(format!("Symbol {raw_symbol} not found in tickers"))
1161 })
1162 .and_then(|t| {
1163 t.index_price.ok_or_else(|| {
1164 KrakenHttpError::ParseError(format!(
1165 "Index price not available for {raw_symbol} (may not be available in testnet)"
1166 ))
1167 })
1168 })
1169 }
1170
1171 pub async fn request_trades(
1172 &self,
1173 instrument_id: InstrumentId,
1174 start: Option<DateTime<Utc>>,
1175 end: Option<DateTime<Utc>>,
1176 limit: Option<u64>,
1177 ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
1178 let instrument = self
1179 .get_cached_instrument(&instrument_id.symbol.inner())
1180 .ok_or_else(|| {
1181 KrakenHttpError::ParseError(format!(
1182 "Instrument not found in cache: {instrument_id}"
1183 ))
1184 })?;
1185
1186 let raw_symbol = instrument.raw_symbol().to_string();
1187 let ts_init = self.generate_ts_init();
1188
1189 let since = start.map(|dt| dt.timestamp_millis());
1190 let before = end.map(|dt| dt.timestamp_millis());
1191
1192 let response = self
1193 .inner
1194 .get_public_executions(&raw_symbol, since, before, Some("asc"), None)
1195 .await?;
1196
1197 let mut trades = Vec::new();
1198
1199 for element in &response.elements {
1200 let execution = &element.event.execution.execution;
1201 match parse_futures_public_execution(execution, &instrument, ts_init) {
1202 Ok(trade_tick) => {
1203 trades.push(trade_tick);
1204
1205 if let Some(limit_count) = limit
1206 && trades.len() >= limit_count as usize
1207 {
1208 return Ok(trades);
1209 }
1210 }
1211 Err(e) => {
1212 tracing::warn!("Failed to parse futures trade tick: {e}");
1213 }
1214 }
1215 }
1216
1217 Ok(trades)
1218 }
1219
1220 pub async fn request_bars(
1221 &self,
1222 bar_type: BarType,
1223 start: Option<DateTime<Utc>>,
1224 end: Option<DateTime<Utc>>,
1225 limit: Option<u64>,
1226 ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
1227 let instrument_id = bar_type.instrument_id();
1228 let instrument = self
1229 .get_cached_instrument(&instrument_id.symbol.inner())
1230 .ok_or_else(|| {
1231 KrakenHttpError::ParseError(format!(
1232 "Instrument not found in cache: {instrument_id}"
1233 ))
1234 })?;
1235
1236 let raw_symbol = instrument.raw_symbol().to_string();
1237 let ts_init = self.generate_ts_init();
1238 let tick_type = "trade";
1239 let resolution = bar_type_to_futures_resolution(bar_type)
1240 .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?;
1241
1242 let from = start.map(|dt| dt.timestamp());
1244 let to = end.map(|dt| dt.timestamp());
1245 let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1246
1247 let response = self
1248 .inner
1249 .get_ohlc(tick_type, &raw_symbol, resolution, from, to)
1250 .await?;
1251
1252 let mut bars = Vec::new();
1253 for candle in response.candles {
1254 let ohlc = OhlcData {
1255 time: candle.time / 1000,
1256 open: candle.open,
1257 high: candle.high,
1258 low: candle.low,
1259 close: candle.close,
1260 vwap: "0".to_string(),
1261 volume: candle.volume,
1262 count: 0,
1263 };
1264
1265 match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1266 Ok(bar) => {
1267 if let Some(end_nanos) = end_ns
1268 && bar.ts_event.as_u64() > end_nanos
1269 {
1270 continue;
1271 }
1272 bars.push(bar);
1273
1274 if let Some(limit_count) = limit
1275 && bars.len() >= limit_count as usize
1276 {
1277 return Ok(bars);
1278 }
1279 }
1280 Err(e) => {
1281 tracing::warn!("Failed to parse futures bar: {e}");
1282 }
1283 }
1284 }
1285
1286 Ok(bars)
1287 }
1288
1289 pub async fn request_account_state(
1301 &self,
1302 account_id: AccountId,
1303 ) -> anyhow::Result<AccountState> {
1304 let accounts_response = self.inner.get_accounts().await?;
1305
1306 if accounts_response.result != KrakenApiResult::Success {
1307 let error_msg = accounts_response
1308 .error
1309 .unwrap_or_else(|| "Unknown error".to_string());
1310 anyhow::bail!("Failed to get futures accounts: {error_msg}");
1311 }
1312
1313 let ts_init = self.generate_ts_init();
1314
1315 let mut balances: Vec<AccountBalance> = Vec::new();
1316
1317 for account in accounts_response.accounts.values() {
1318 match account.account_type.as_str() {
1319 "multiCollateralMarginAccount" => {
1320 for (currency_code, currency_info) in &account.currencies {
1321 if currency_info.quantity == 0.0 {
1322 continue;
1323 }
1324
1325 let currency = Currency::new(
1326 currency_code.as_str(),
1327 8,
1328 0,
1329 currency_code.as_str(),
1330 CurrencyType::Crypto,
1331 );
1332
1333 let total_amount = currency_info.quantity;
1334 let total = Money::new(total_amount, currency);
1335
1336 let available_amount = currency_info
1338 .available
1339 .unwrap_or(total_amount)
1340 .min(total_amount);
1341 let locked_amount = (total_amount - available_amount).max(0.0);
1342 let locked = Money::new(locked_amount, currency);
1343 let free = total - locked;
1345
1346 balances.push(AccountBalance::new(total, locked, free));
1347 }
1348
1349 if let Some(portfolio_value) = account.portfolio_value
1353 && portfolio_value > 0.0
1354 {
1355 let usd_currency = Currency::USD();
1356 let total_usd = Money::new(portfolio_value, usd_currency);
1357 let available_usd = account
1358 .available_margin
1359 .unwrap_or(portfolio_value)
1360 .min(portfolio_value);
1361 let locked_usd =
1363 Money::new((portfolio_value - available_usd).max(0.0), usd_currency);
1364 let free_usd = total_usd - locked_usd;
1365
1366 balances.push(AccountBalance::new(total_usd, locked_usd, free_usd));
1367 }
1368 }
1369 "marginAccount" => {
1370 for (currency_code, &amount) in &account.balances {
1371 if amount == 0.0 {
1372 continue;
1373 }
1374
1375 let currency = Currency::new(
1376 currency_code.as_str(),
1377 8,
1378 0,
1379 currency_code.as_str(),
1380 CurrencyType::Crypto,
1381 );
1382
1383 let total = Money::new(amount, currency);
1384
1385 let available = account
1387 .auxiliary
1388 .as_ref()
1389 .and_then(|aux| aux.af)
1390 .unwrap_or(amount)
1391 .min(amount);
1392 let locked = amount - available;
1393
1394 balances.push(AccountBalance::new(
1395 total,
1396 Money::new(locked, currency),
1397 Money::new(available, currency),
1398 ));
1399 }
1400 }
1401 "cashAccount" => {
1402 for (currency_code, &amount) in &account.balances {
1403 if amount == 0.0 {
1404 continue;
1405 }
1406
1407 let currency = Currency::new(
1408 currency_code.as_str(),
1409 8,
1410 0,
1411 currency_code.as_str(),
1412 CurrencyType::Crypto,
1413 );
1414
1415 let total = Money::new(amount, currency);
1416 let locked = Money::new(0.0, currency);
1417
1418 balances.push(AccountBalance::new(total, locked, total));
1419 }
1420 }
1421 _ => {
1422 let account_type = &account.account_type;
1423 tracing::debug!("Unknown account type: {account_type}");
1424 }
1425 }
1426 }
1427
1428 Ok(AccountState::new(
1429 account_id,
1430 AccountType::Margin,
1431 balances,
1432 vec![],
1433 true,
1434 UUID4::new(),
1435 ts_init,
1436 ts_init,
1437 None,
1438 ))
1439 }
1440
1441 pub async fn request_order_status_reports(
1442 &self,
1443 account_id: AccountId,
1444 instrument_id: Option<InstrumentId>,
1445 start: Option<DateTime<Utc>>,
1446 end: Option<DateTime<Utc>>,
1447 open_only: bool,
1448 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1449 let ts_init = self.generate_ts_init();
1450 let mut all_reports = Vec::new();
1451
1452 let response = self
1453 .inner
1454 .get_open_orders()
1455 .await
1456 .map_err(|e| anyhow::anyhow!("get_open_orders failed: {e}"))?;
1457 if response.result != KrakenApiResult::Success {
1458 let error_msg = response
1459 .error
1460 .unwrap_or_else(|| "Unknown error".to_string());
1461 anyhow::bail!("Failed to get open orders: {error_msg}");
1462 }
1463
1464 for order in &response.open_orders {
1465 if let Some(ref target_id) = instrument_id {
1466 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1467 if let Some(inst) = instrument
1468 && inst.raw_symbol().as_str() != order.symbol
1469 {
1470 continue;
1471 }
1472 }
1473
1474 if let Some(instrument) = self.get_instrument_by_raw_symbol(&order.symbol) {
1475 match parse_futures_order_status_report(order, &instrument, account_id, ts_init) {
1476 Ok(report) => all_reports.push(report),
1477 Err(e) => {
1478 let order_id = &order.order_id;
1479 tracing::warn!("Failed to parse futures order {order_id}: {e}");
1480 }
1481 }
1482 }
1483 }
1484
1485 if !open_only {
1486 let start_ms = start.map(|dt| dt.timestamp_millis());
1488 let end_ms = end.map(|dt| dt.timestamp_millis());
1489 let response = self
1490 .inner
1491 .get_order_events(end_ms, start_ms, None)
1492 .await
1493 .map_err(|e| anyhow::anyhow!("get_order_events failed: {e}"))?;
1494
1495 for event_wrapper in response.order_events {
1496 let event = &event_wrapper.order;
1497 if let Some(ref target_id) = instrument_id {
1498 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1499 if let Some(inst) = instrument
1500 && inst.raw_symbol().as_str() != event.symbol
1501 {
1502 continue;
1503 }
1504 }
1505
1506 if let Some(instrument) = self.get_instrument_by_raw_symbol(&event.symbol) {
1507 match parse_futures_order_event_status_report(
1508 event,
1509 &instrument,
1510 account_id,
1511 ts_init,
1512 ) {
1513 Ok(report) => all_reports.push(report),
1514 Err(e) => {
1515 let order_id = &event.order_id;
1516 tracing::warn!("Failed to parse futures order event {order_id}: {e}");
1517 }
1518 }
1519 }
1520 }
1521 }
1522
1523 Ok(all_reports)
1524 }
1525
1526 pub async fn request_fill_reports(
1527 &self,
1528 account_id: AccountId,
1529 instrument_id: Option<InstrumentId>,
1530 start: Option<DateTime<Utc>>,
1531 end: Option<DateTime<Utc>>,
1532 ) -> anyhow::Result<Vec<FillReport>> {
1533 let ts_init = self.generate_ts_init();
1534 let mut all_reports = Vec::new();
1535
1536 let response = self.inner.get_fills(None).await?;
1537 if response.result != KrakenApiResult::Success {
1538 let error_msg = response
1539 .error
1540 .unwrap_or_else(|| "Unknown error".to_string());
1541 anyhow::bail!("Failed to get fills: {error_msg}");
1542 }
1543
1544 let start_ms = start.map(|dt| dt.timestamp_millis());
1545 let end_ms = end.map(|dt| dt.timestamp_millis());
1546
1547 for fill in response.fills {
1548 if let Some(start_threshold) = start_ms
1549 && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1550 {
1551 let fill_ms = fill_ts.timestamp_millis();
1552 if fill_ms < start_threshold {
1553 continue;
1554 }
1555 }
1556 if let Some(end_threshold) = end_ms
1557 && let Ok(fill_ts) = DateTime::parse_from_rfc3339(&fill.fill_time)
1558 {
1559 let fill_ms = fill_ts.timestamp_millis();
1560 if fill_ms > end_threshold {
1561 continue;
1562 }
1563 }
1564
1565 if let Some(ref target_id) = instrument_id {
1566 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1567 if let Some(inst) = instrument
1568 && inst.raw_symbol().as_str() != fill.symbol
1569 {
1570 continue;
1571 }
1572 }
1573
1574 if let Some(instrument) = self.get_instrument_by_raw_symbol(&fill.symbol) {
1575 match parse_futures_fill_report(&fill, &instrument, account_id, ts_init) {
1576 Ok(report) => all_reports.push(report),
1577 Err(e) => {
1578 let fill_id = &fill.fill_id;
1579 tracing::warn!("Failed to parse futures fill {fill_id}: {e}");
1580 }
1581 }
1582 }
1583 }
1584
1585 Ok(all_reports)
1586 }
1587
1588 pub async fn request_position_status_reports(
1589 &self,
1590 account_id: AccountId,
1591 instrument_id: Option<InstrumentId>,
1592 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1593 let ts_init = self.generate_ts_init();
1594 let mut all_reports = Vec::new();
1595
1596 let response = self.inner.get_open_positions().await?;
1597 if response.result != KrakenApiResult::Success {
1598 let error_msg = response
1599 .error
1600 .unwrap_or_else(|| "Unknown error".to_string());
1601 anyhow::bail!("Failed to get open positions: {error_msg}");
1602 }
1603
1604 for position in response.open_positions {
1605 if let Some(ref target_id) = instrument_id {
1606 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1607 if let Some(inst) = instrument
1608 && inst.raw_symbol().as_str() != position.symbol
1609 {
1610 continue;
1611 }
1612 }
1613
1614 if let Some(instrument) = self.get_instrument_by_raw_symbol(&position.symbol) {
1615 match parse_futures_position_status_report(
1616 &position,
1617 &instrument,
1618 account_id,
1619 ts_init,
1620 ) {
1621 Ok(report) => all_reports.push(report),
1622 Err(e) => {
1623 let symbol = &position.symbol;
1624 tracing::warn!("Failed to parse futures position {symbol}: {e}");
1625 }
1626 }
1627 }
1628 }
1629
1630 Ok(all_reports)
1631 }
1632
1633 #[allow(clippy::too_many_arguments)]
1644 pub async fn submit_order(
1645 &self,
1646 account_id: AccountId,
1647 instrument_id: InstrumentId,
1648 client_order_id: ClientOrderId,
1649 order_side: OrderSide,
1650 order_type: OrderType,
1651 quantity: Quantity,
1652 time_in_force: TimeInForce,
1653 price: Option<Price>,
1654 trigger_price: Option<Price>,
1655 reduce_only: bool,
1656 post_only: bool,
1657 ) -> anyhow::Result<OrderStatusReport> {
1658 let instrument = self
1659 .get_cached_instrument(&instrument_id.symbol.inner())
1660 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1661
1662 let raw_symbol = instrument.raw_symbol().inner();
1663
1664 let kraken_order_type = match order_type {
1671 OrderType::Market => KrakenFuturesOrderType::Market,
1672 OrderType::Limit => {
1673 if post_only {
1674 KrakenFuturesOrderType::Post
1675 } else {
1676 match time_in_force {
1677 TimeInForce::Ioc => KrakenFuturesOrderType::Ioc,
1678 TimeInForce::Fok => {
1679 anyhow::bail!("FOK not supported by Kraken Futures, use IOC instead")
1680 }
1681 TimeInForce::Gtd => {
1682 anyhow::bail!("GTD not supported by Kraken Futures, use GTC instead")
1683 }
1684 _ => KrakenFuturesOrderType::Limit, }
1686 }
1687 }
1688 OrderType::StopMarket | OrderType::StopLimit => KrakenFuturesOrderType::Stop,
1689 OrderType::MarketIfTouched => KrakenFuturesOrderType::TakeProfit,
1690 _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
1691 };
1692
1693 let mut builder = KrakenFuturesSendOrderParamsBuilder::default();
1694 builder
1695 .cli_ord_id(client_order_id.to_string())
1696 .broker(NAUTILUS_KRAKEN_BROKER_ID)
1697 .symbol(raw_symbol)
1698 .side(KrakenOrderSide::from(order_side))
1699 .size(quantity.to_string())
1700 .order_type(kraken_order_type);
1701
1702 match order_type {
1704 OrderType::StopMarket => {
1705 if let Some(trigger) = trigger_price {
1707 builder.stop_price(trigger.to_string());
1708 }
1709 }
1710 OrderType::StopLimit => {
1711 if let Some(trigger) = trigger_price {
1713 builder.stop_price(trigger.to_string());
1714 }
1715 if let Some(limit) = price {
1716 builder.limit_price(limit.to_string());
1717 }
1718 }
1719 OrderType::MarketIfTouched => {
1720 if let Some(trigger) = trigger_price {
1722 builder.stop_price(trigger.to_string());
1723 }
1724 if let Some(limit) = price {
1725 builder.limit_price(limit.to_string());
1726 }
1727 }
1728 _ => {
1729 if let Some(limit) = price {
1731 builder.limit_price(limit.to_string());
1732 }
1733 }
1734 }
1735
1736 if reduce_only {
1737 builder.reduce_only(true);
1738 }
1739
1740 let params = builder
1741 .build()
1742 .map_err(|e| anyhow::anyhow!("Failed to build order params: {e}"))?;
1743
1744 let response = self.inner.send_order_params(¶ms).await?;
1745
1746 if response.result != KrakenApiResult::Success {
1747 let error_msg = response
1748 .error
1749 .unwrap_or_else(|| "Unknown error".to_string());
1750 anyhow::bail!("Order submission failed: {error_msg}");
1751 }
1752
1753 let send_status = response
1754 .send_status
1755 .ok_or_else(|| anyhow::anyhow!("No send_status in successful response"))?;
1756
1757 let status = &send_status.status;
1758
1759 if status == "postWouldExecute" {
1761 let reason = send_status
1762 .order_events
1763 .as_ref()
1764 .and_then(|events| events.first())
1765 .and_then(|e| e.reason.clone())
1766 .unwrap_or_else(|| "Post-only order would have crossed".to_string());
1767 anyhow::bail!("POST_ONLY_REJECTED: {reason}");
1768 }
1769
1770 let venue_order_id = send_status
1771 .order_id
1772 .ok_or_else(|| anyhow::anyhow!("No order_id in send_status: {status}"))?;
1773
1774 let ts_init = self.generate_ts_init();
1775
1776 let open_orders_response = self.inner.get_open_orders().await?;
1777 if let Some(order) = open_orders_response
1778 .open_orders
1779 .iter()
1780 .find(|o| o.order_id == venue_order_id)
1781 {
1782 return parse_futures_order_status_report(order, &instrument, account_id, ts_init);
1783 }
1784
1785 if let Some(order_events) = &send_status.order_events
1788 && let Some(send_event) = order_events.first()
1789 {
1790 let event = if let Some(order_data) = &send_event.order {
1792 FuturesOrderEvent {
1793 order_id: order_data.order_id.clone(),
1794 cli_ord_id: order_data.cli_ord_id.clone(),
1795 order_type: order_data.order_type,
1796 symbol: order_data.symbol.clone(),
1797 side: order_data.side,
1798 quantity: order_data.quantity,
1799 filled: order_data.filled,
1800 limit_price: order_data.limit_price,
1801 stop_price: order_data.stop_price,
1802 timestamp: order_data.timestamp.clone(),
1803 last_update_timestamp: order_data.last_update_timestamp.clone(),
1804 reduce_only: order_data.reduce_only,
1805 }
1806 } else if let Some(trigger_data) = &send_event.order_trigger {
1807 FuturesOrderEvent {
1808 order_id: trigger_data.uid.clone(),
1809 cli_ord_id: trigger_data.client_id.clone(),
1810 order_type: trigger_data.order_type,
1811 symbol: trigger_data.symbol.clone(),
1812 side: trigger_data.side,
1813 quantity: trigger_data.quantity,
1814 filled: 0.0,
1815 limit_price: trigger_data.limit_price,
1816 stop_price: Some(trigger_data.trigger_price),
1817 timestamp: trigger_data.timestamp.clone(),
1818 last_update_timestamp: trigger_data.last_update_timestamp.clone(),
1819 reduce_only: trigger_data.reduce_only,
1820 }
1821 } else if let Some(prior_exec) = &send_event.order_prior_execution {
1822 FuturesOrderEvent {
1824 order_id: prior_exec.order_id.clone(),
1825 cli_ord_id: prior_exec.cli_ord_id.clone(),
1826 order_type: prior_exec.order_type,
1827 symbol: prior_exec.symbol.clone(),
1828 side: prior_exec.side,
1829 quantity: prior_exec.quantity,
1830 filled: send_event.amount.unwrap_or(prior_exec.quantity), limit_price: prior_exec.limit_price,
1832 stop_price: prior_exec.stop_price,
1833 timestamp: prior_exec.timestamp.clone(),
1834 last_update_timestamp: prior_exec.last_update_timestamp.clone(),
1835 reduce_only: prior_exec.reduce_only,
1836 }
1837 } else {
1838 anyhow::bail!("No order, orderTrigger, or orderPriorExecution data in event");
1839 };
1840 return parse_futures_order_event_status_report(
1841 &event,
1842 &instrument,
1843 account_id,
1844 ts_init,
1845 );
1846 }
1847
1848 let events_response = self.inner.get_order_events(None, None, None).await?;
1850 let event_wrapper = events_response
1851 .order_events
1852 .iter()
1853 .find(|e| e.order.order_id == venue_order_id)
1854 .ok_or_else(|| {
1855 anyhow::anyhow!("Order not found in open orders or events: {venue_order_id}")
1856 })?;
1857
1858 parse_futures_order_event_status_report(
1859 &event_wrapper.order,
1860 &instrument,
1861 account_id,
1862 ts_init,
1863 )
1864 }
1865
1866 pub async fn modify_order(
1878 &self,
1879 instrument_id: InstrumentId,
1880 client_order_id: Option<ClientOrderId>,
1881 venue_order_id: Option<VenueOrderId>,
1882 quantity: Option<Quantity>,
1883 price: Option<Price>,
1884 trigger_price: Option<Price>,
1885 ) -> anyhow::Result<VenueOrderId> {
1886 let _ = self
1887 .get_cached_instrument(&instrument_id.symbol.inner())
1888 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1889
1890 let order_id = venue_order_id.as_ref().map(|id| id.to_string());
1891 let cli_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1892
1893 if order_id.is_none() && cli_ord_id.is_none() {
1894 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1895 }
1896
1897 let mut builder = KrakenFuturesEditOrderParamsBuilder::default();
1898
1899 if let Some(ref id) = order_id {
1900 builder.order_id(id.clone());
1901 }
1902 if let Some(ref id) = cli_ord_id {
1903 builder.cli_ord_id(id.clone());
1904 }
1905 if let Some(qty) = quantity {
1906 builder.size(qty.to_string());
1907 }
1908 if let Some(p) = price {
1909 builder.limit_price(p.to_string());
1910 }
1911 if let Some(tp) = trigger_price {
1912 builder.stop_price(tp.to_string());
1913 }
1914
1915 let params = builder
1916 .build()
1917 .map_err(|e| anyhow::anyhow!("Failed to build edit order params: {e}"))?;
1918
1919 let response = self.inner.edit_order(¶ms).await?;
1920
1921 if response.result != KrakenApiResult::Success {
1922 let status = &response.edit_status.status;
1923 anyhow::bail!("Order modification failed: {status}");
1924 }
1925
1926 let new_venue_order_id = response
1928 .edit_status
1929 .order_id
1930 .or(order_id)
1931 .ok_or_else(|| anyhow::anyhow!("No order ID in edit order response"))?;
1932
1933 Ok(VenueOrderId::new(&new_venue_order_id))
1934 }
1935
1936 pub async fn cancel_order(
1946 &self,
1947 _account_id: AccountId,
1948 instrument_id: InstrumentId,
1949 client_order_id: Option<ClientOrderId>,
1950 venue_order_id: Option<VenueOrderId>,
1951 ) -> anyhow::Result<()> {
1952 let _ = self
1953 .get_cached_instrument(&instrument_id.symbol.inner())
1954 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1955
1956 let order_id = venue_order_id.as_ref().map(|id| id.to_string());
1957 let cli_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1958
1959 if order_id.is_none() && cli_ord_id.is_none() {
1960 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1961 }
1962
1963 let response = self.inner.cancel_order(order_id, cli_ord_id).await?;
1964
1965 if response.result != KrakenApiResult::Success {
1966 let status = &response.cancel_status.status;
1967 anyhow::bail!("Order cancellation failed: {status}");
1968 }
1969
1970 Ok(())
1971 }
1972
1973 pub async fn cancel_orders_batch(
1983 &self,
1984 venue_order_ids: Vec<VenueOrderId>,
1985 ) -> anyhow::Result<usize> {
1986 if venue_order_ids.is_empty() {
1987 return Ok(0);
1988 }
1989
1990 let mut total_cancelled = 0;
1991
1992 for chunk in venue_order_ids.chunks(BATCH_CANCEL_LIMIT) {
1993 let order_ids: Vec<String> = chunk.iter().map(|id| id.to_string()).collect();
1994 let response = self.inner.cancel_orders_batch(order_ids).await?;
1995
1996 if response.result != KrakenApiResult::Success {
1997 let error_msg = response.error.as_deref().unwrap_or("Unknown error");
1998 anyhow::bail!("Batch cancel failed: {error_msg}");
1999 }
2000
2001 let success_count = response
2002 .batch_status
2003 .iter()
2004 .filter(|s| {
2005 s.status == Some(KrakenSendStatus::Cancelled)
2006 || s.cancel_status
2007 .as_ref()
2008 .is_some_and(|cs| cs.status == KrakenSendStatus::Cancelled)
2009 })
2010 .count();
2011
2012 total_cancelled += success_count;
2013 }
2014
2015 Ok(total_cancelled)
2016 }
2017}
2018
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    // Dummy credentials shared by the credentialed-client tests
    const TEST_API_KEY: &str = "test_key";
    const TEST_API_SECRET: &str = "test_secret";

    /// A default raw client has no credential and points at a futures base URL.
    #[rstest]
    fn test_raw_client_creation() {
        let raw_client = KrakenFuturesRawHttpClient::default();

        assert!(raw_client.credential.is_none());
        assert!(raw_client.base_url().contains("futures"));
    }

    /// A raw client built with credentials stores a credential.
    #[rstest]
    fn test_raw_client_with_credentials() {
        let raw_client = KrakenFuturesRawHttpClient::with_credentials(
            TEST_API_KEY.to_string(),
            TEST_API_SECRET.to_string(),
            KrakenEnvironment::Mainnet,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .unwrap();

        assert!(raw_client.credential.is_some());
    }

    /// A default client starts with an empty instruments cache.
    #[rstest]
    fn test_client_creation() {
        let http_client = KrakenFuturesHttpClient::default();

        assert!(http_client.instruments_cache.is_empty());
    }

    /// A client built with credentials also starts with an empty instruments cache.
    #[rstest]
    fn test_client_with_credentials() {
        let http_client = KrakenFuturesHttpClient::with_credentials(
            TEST_API_KEY.to_string(),
            TEST_API_SECRET.to_string(),
            KrakenEnvironment::Mainnet,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .unwrap();

        assert!(http_client.instruments_cache.is_empty());
    }
}