1use std::{
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroU32,
22 sync::{
23 Arc, RwLock,
24 atomic::{AtomicBool, Ordering},
25 },
26};
27
28use chrono::{DateTime, Utc};
29use dashmap::DashMap;
30use indexmap::IndexMap;
31use nautilus_core::{
32 AtomicTime, UUID4, consts::NAUTILUS_USER_AGENT, datetime::NANOSECONDS_IN_SECOND,
33 nanos::UnixNanos, time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36 data::{Bar, BarType, TradeTick},
37 enums::{AccountType, CurrencyType, OrderSide, OrderType, PositionSideSpecified, TimeInForce},
38 events::AccountState,
39 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
40 instruments::{Instrument, InstrumentAny},
41 reports::{FillReport, OrderStatusReport, PositionStatusReport},
42 types::{AccountBalance, Currency, Money, Price, Quantity},
43};
44use nautilus_network::{
45 http::{HttpClient, Method, USER_AGENT},
46 ratelimiter::quota::Quota,
47 retry::{RetryConfig, RetryManager},
48};
49use serde::de::DeserializeOwned;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use super::{models::*, query::*};
54use crate::{
55 common::{
56 consts::NAUTILUS_KRAKEN_BROKER_ID,
57 credential::KrakenCredential,
58 enums::{KrakenEnvironment, KrakenOrderSide, KrakenOrderType, KrakenProductType},
59 parse::{
60 bar_type_to_spot_interval, normalize_currency_code, parse_bar, parse_fill_report,
61 parse_order_status_report, parse_spot_instrument, parse_trade_tick_from_array,
62 },
63 urls::get_kraken_http_base_url,
64 },
65 http::error::KrakenHttpError,
66};
67
/// Requests-per-second budget used when the caller does not supply an explicit limit
/// (and as the fallback when a limit of zero is requested).
pub const KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND: u32 = 5;

/// Rate limiter key attached to every spot request, alongside a per-route key.
const KRAKEN_GLOBAL_RATE_KEY: &str = "kraken:spot:global";

// NOTE(review): presumably the maximum number of orders per batch-cancel request; it is not
// referenced in the visible part of this file — confirm against its usage.
const BATCH_CANCEL_LIMIT: usize = 50;
75
76fn compute_time_in_force(
81 is_limit_order: bool,
82 time_in_force: TimeInForce,
83 expire_time: Option<UnixNanos>,
84) -> anyhow::Result<(Option<String>, Option<String>)> {
85 if is_limit_order {
86 match time_in_force {
87 TimeInForce::Gtc => Ok((None, None)), TimeInForce::Ioc => Ok((Some("IOC".to_string()), None)),
89 TimeInForce::Fok => {
90 anyhow::bail!("FOK time in force not supported by Kraken Spot API")
91 }
92 TimeInForce::Gtd => {
93 let expire = expire_time.ok_or_else(|| {
94 anyhow::anyhow!("GTD time in force requires expire_time parameter")
95 })?;
96 let expire_secs = expire.as_u64() / NANOSECONDS_IN_SECOND;
98 Ok((Some("GTD".to_string()), Some(expire_secs.to_string())))
99 }
100 _ => anyhow::bail!("Unsupported time in force: {time_in_force:?}"),
101 }
102 } else {
103 Ok((None, None))
105 }
106}
107
/// Low-level HTTP client for the Kraken Spot REST endpoints.
///
/// Each endpoint method returns the deserialized `result` payload of the Kraken response
/// envelope, or a [`KrakenHttpError`].
pub struct KrakenSpotRawHttpClient {
    /// Base URL that every endpoint path is appended to.
    base_url: String,
    /// Underlying HTTP client, constructed with the rate limiter quotas.
    client: HttpClient,
    /// API credentials; `None` for a client limited to public endpoints.
    credential: Option<KrakenCredential>,
    /// Retry policy wrapped around requests issued through `send_request`.
    retry_manager: RetryManager<KrakenHttpError>,
    /// Token handed to the retry manager; cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
    /// Clock used to derive request nonces.
    clock: &'static AtomicTime,
    /// Held for the duration of every authenticated request, so they run one at a time.
    // NOTE(review): presumably this keeps nonces arriving at the venue in increasing order — confirm.
    auth_mutex: tokio::sync::Mutex<()>,
}
122
123impl Default for KrakenSpotRawHttpClient {
124 fn default() -> Self {
125 Self::new(
126 KrakenEnvironment::Mainnet,
127 None,
128 Some(60),
129 None,
130 None,
131 None,
132 None,
133 None,
134 )
135 .expect("Failed to create default KrakenSpotRawHttpClient")
136 }
137}
138
139impl Debug for KrakenSpotRawHttpClient {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct(stringify!(KrakenSpotRawHttpClient))
142 .field("base_url", &self.base_url)
143 .field("has_credentials", &self.credential.is_some())
144 .finish()
145 }
146}
147
148impl KrakenSpotRawHttpClient {
149 #[allow(clippy::too_many_arguments)]
151 pub fn new(
152 environment: KrakenEnvironment,
153 base_url_override: Option<String>,
154 timeout_secs: Option<u64>,
155 max_retries: Option<u32>,
156 retry_delay_ms: Option<u64>,
157 retry_delay_max_ms: Option<u64>,
158 proxy_url: Option<String>,
159 max_requests_per_second: Option<u32>,
160 ) -> anyhow::Result<Self> {
161 let retry_config = RetryConfig {
162 max_retries: max_retries.unwrap_or(3),
163 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
164 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
165 backoff_factor: 2.0,
166 jitter_ms: 1000,
167 operation_timeout_ms: Some(60_000),
168 immediate_first: false,
169 max_elapsed_ms: Some(180_000),
170 };
171
172 let retry_manager = RetryManager::new(retry_config);
173 let base_url = base_url_override.unwrap_or_else(|| {
174 get_kraken_http_base_url(KrakenProductType::Spot, environment).to_string()
175 });
176
177 let rate_limit =
178 max_requests_per_second.unwrap_or(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND);
179
180 Ok(Self {
181 base_url,
182 client: HttpClient::new(
183 Self::default_headers(),
184 vec![],
185 Self::rate_limiter_quotas(rate_limit),
186 Some(Self::default_quota(rate_limit)),
187 timeout_secs,
188 proxy_url,
189 )
190 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
191 credential: None,
192 retry_manager,
193 cancellation_token: CancellationToken::new(),
194 clock: get_atomic_clock_realtime(),
195 auth_mutex: tokio::sync::Mutex::new(()),
196 })
197 }
198
199 #[allow(clippy::too_many_arguments)]
201 pub fn with_credentials(
202 api_key: String,
203 api_secret: String,
204 environment: KrakenEnvironment,
205 base_url_override: Option<String>,
206 timeout_secs: Option<u64>,
207 max_retries: Option<u32>,
208 retry_delay_ms: Option<u64>,
209 retry_delay_max_ms: Option<u64>,
210 proxy_url: Option<String>,
211 max_requests_per_second: Option<u32>,
212 ) -> anyhow::Result<Self> {
213 let retry_config = RetryConfig {
214 max_retries: max_retries.unwrap_or(3),
215 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
216 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
217 backoff_factor: 2.0,
218 jitter_ms: 1000,
219 operation_timeout_ms: Some(60_000),
220 immediate_first: false,
221 max_elapsed_ms: Some(180_000),
222 };
223
224 let retry_manager = RetryManager::new(retry_config);
225 let base_url = base_url_override.unwrap_or_else(|| {
226 get_kraken_http_base_url(KrakenProductType::Spot, environment).to_string()
227 });
228
229 let rate_limit =
230 max_requests_per_second.unwrap_or(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND);
231
232 Ok(Self {
233 base_url,
234 client: HttpClient::new(
235 Self::default_headers(),
236 vec![],
237 Self::rate_limiter_quotas(rate_limit),
238 Some(Self::default_quota(rate_limit)),
239 timeout_secs,
240 proxy_url,
241 )
242 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
243 credential: Some(KrakenCredential::new(api_key, api_secret)),
244 retry_manager,
245 cancellation_token: CancellationToken::new(),
246 clock: get_atomic_clock_realtime(),
247 auth_mutex: tokio::sync::Mutex::new(()),
248 })
249 }
250
251 fn generate_nonce(&self) -> u64 {
256 self.clock.get_time_ns().as_u64()
257 }
258
259 pub fn base_url(&self) -> &str {
261 &self.base_url
262 }
263
264 pub fn credential(&self) -> Option<&KrakenCredential> {
266 self.credential.as_ref()
267 }
268
269 pub fn cancel_all_requests(&self) {
271 self.cancellation_token.cancel();
272 }
273
274 pub fn cancellation_token(&self) -> &CancellationToken {
276 &self.cancellation_token
277 }
278
279 fn default_headers() -> HashMap<String, String> {
280 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
281 }
282
283 fn default_quota(max_requests_per_second: u32) -> Quota {
284 Quota::per_second(
285 NonZeroU32::new(max_requests_per_second).unwrap_or_else(|| {
286 NonZeroU32::new(KRAKEN_SPOT_DEFAULT_RATE_LIMIT_PER_SECOND).unwrap()
287 }),
288 )
289 }
290
291 fn rate_limiter_quotas(max_requests_per_second: u32) -> Vec<(String, Quota)> {
292 vec![(
293 KRAKEN_GLOBAL_RATE_KEY.to_string(),
294 Self::default_quota(max_requests_per_second),
295 )]
296 }
297
298 fn rate_limit_keys(endpoint: &str) -> Vec<String> {
299 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
300 let route = format!("kraken:spot:{normalized}");
301 vec![KRAKEN_GLOBAL_RATE_KEY.to_string(), route]
302 }
303
304 fn sign_spot(
305 &self,
306 path: &str,
307 nonce: u64,
308 params: &HashMap<String, String>,
309 ) -> anyhow::Result<(HashMap<String, String>, String)> {
310 let credential = self
311 .credential
312 .as_ref()
313 .ok_or_else(|| anyhow::anyhow!("Missing credentials"))?;
314
315 let (signature, post_data) = credential.sign_spot(path, nonce, params)?;
316
317 let mut headers = HashMap::new();
318 headers.insert("API-Key".to_string(), credential.api_key().to_string());
319 headers.insert("API-Sign".to_string(), signature);
320
321 Ok((headers, post_data))
322 }
323
324 async fn send_request<T: DeserializeOwned>(
325 &self,
326 method: Method,
327 endpoint: &str,
328 body: Option<Vec<u8>>,
329 authenticate: bool,
330 ) -> anyhow::Result<KrakenResponse<T>, KrakenHttpError> {
331 let _guard = if authenticate {
335 Some(self.auth_mutex.lock().await)
336 } else {
337 None
338 };
339
340 let endpoint = endpoint.to_string();
341 let url = format!("{}{endpoint}", self.base_url);
342 let method_clone = method.clone();
343 let body_clone = body.clone();
344
345 let operation = || {
346 let url = url.clone();
347 let method = method_clone.clone();
348 let body = body_clone.clone();
349 let endpoint = endpoint.clone();
350
351 async move {
352 let mut headers = Self::default_headers();
353
354 let final_body = if authenticate {
355 let nonce = self.generate_nonce();
356 log::debug!("Generated nonce {nonce} for {endpoint}");
357
358 let params: HashMap<String, String> = if let Some(ref body_bytes) = body {
359 let body_str = std::str::from_utf8(body_bytes).map_err(|e| {
360 KrakenHttpError::ParseError(format!(
361 "Invalid UTF-8 in request body: {e}"
362 ))
363 })?;
364 serde_urlencoded::from_str(body_str).map_err(|e| {
365 KrakenHttpError::ParseError(format!(
366 "Failed to parse request params: {e}"
367 ))
368 })?
369 } else {
370 HashMap::new()
371 };
372
373 let (auth_headers, post_data) = self
374 .sign_spot(&endpoint, nonce, ¶ms)
375 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
376 headers.extend(auth_headers);
377 Some(post_data.into_bytes())
378 } else {
379 body
380 };
381
382 if method == Method::POST {
383 headers.insert(
384 "Content-Type".to_string(),
385 "application/x-www-form-urlencoded".to_string(),
386 );
387 }
388
389 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
390
391 let response = self
392 .client
393 .request(
394 method,
395 url,
396 None,
397 Some(headers),
398 final_body,
399 None,
400 Some(rate_limit_keys),
401 )
402 .await
403 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
404
405 let status = response.status.as_u16();
406 if status >= 400 {
407 let body = String::from_utf8_lossy(&response.body).to_string();
408 if status == 401 || status == 403 {
410 return Err(KrakenHttpError::AuthenticationError(format!(
411 "HTTP error {status}: {body}"
412 )));
413 }
414 return Err(KrakenHttpError::NetworkError(format!(
415 "HTTP error {status}: {body}"
416 )));
417 }
418
419 let response_text = String::from_utf8(response.body.to_vec()).map_err(|e| {
420 KrakenHttpError::ParseError(format!("Failed to parse response as UTF-8: {e}"))
421 })?;
422
423 let kraken_response: KrakenResponse<T> = serde_json::from_str(&response_text)
424 .map_err(|e| {
425 KrakenHttpError::ParseError(format!("Failed to deserialize response: {e}"))
426 })?;
427
428 if !kraken_response.error.is_empty() {
429 return Err(KrakenHttpError::ApiError(kraken_response.error));
430 }
431
432 Ok(kraken_response)
433 }
434 };
435
436 let should_retry =
437 |error: &KrakenHttpError| -> bool { matches!(error, KrakenHttpError::NetworkError(_)) };
438 let create_error = |msg: String| -> KrakenHttpError { KrakenHttpError::NetworkError(msg) };
439
440 self.retry_manager
441 .execute_with_retry_with_cancel(
442 &endpoint,
443 operation,
444 should_retry,
445 create_error,
446 &self.cancellation_token,
447 )
448 .await
449 }
450
451 pub async fn get_server_time(&self) -> anyhow::Result<ServerTime, KrakenHttpError> {
453 let response: KrakenResponse<ServerTime> = self
454 .send_request(Method::GET, "/0/public/Time", None, false)
455 .await?;
456
457 response.result.ok_or_else(|| {
458 KrakenHttpError::ParseError("Missing result in server time response".to_string())
459 })
460 }
461
462 pub async fn get_system_status(&self) -> anyhow::Result<SystemStatus, KrakenHttpError> {
464 let response: KrakenResponse<SystemStatus> = self
465 .send_request(Method::GET, "/0/public/SystemStatus", None, false)
466 .await?;
467
468 response.result.ok_or_else(|| {
469 KrakenHttpError::ParseError("Missing result in system status response".to_string())
470 })
471 }
472
473 pub async fn get_asset_pairs(
475 &self,
476 pairs: Option<Vec<String>>,
477 ) -> anyhow::Result<AssetPairsResponse, KrakenHttpError> {
478 let endpoint = if let Some(pairs) = pairs {
479 format!("/0/public/AssetPairs?pair={}", pairs.join(","))
480 } else {
481 "/0/public/AssetPairs".to_string()
482 };
483
484 let response: KrakenResponse<AssetPairsResponse> = self
485 .send_request(Method::GET, &endpoint, None, false)
486 .await?;
487
488 response.result.ok_or_else(|| {
489 KrakenHttpError::ParseError("Missing result in asset pairs response".to_string())
490 })
491 }
492
493 pub async fn get_ticker(
495 &self,
496 pairs: Vec<String>,
497 ) -> anyhow::Result<TickerResponse, KrakenHttpError> {
498 let endpoint = format!("/0/public/Ticker?pair={}", pairs.join(","));
499
500 let response: KrakenResponse<TickerResponse> = self
501 .send_request(Method::GET, &endpoint, None, false)
502 .await?;
503
504 response.result.ok_or_else(|| {
505 KrakenHttpError::ParseError("Missing result in ticker response".to_string())
506 })
507 }
508
509 pub async fn get_ohlc(
511 &self,
512 pair: &str,
513 interval: Option<u32>,
514 since: Option<i64>,
515 ) -> anyhow::Result<OhlcResponse, KrakenHttpError> {
516 let mut endpoint = format!("/0/public/OHLC?pair={pair}");
517
518 if let Some(interval) = interval {
519 endpoint.push_str(&format!("&interval={interval}"));
520 }
521 if let Some(since) = since {
522 endpoint.push_str(&format!("&since={since}"));
523 }
524
525 let response: KrakenResponse<OhlcResponse> = self
526 .send_request(Method::GET, &endpoint, None, false)
527 .await?;
528
529 response.result.ok_or_else(|| {
530 KrakenHttpError::ParseError("Missing result in OHLC response".to_string())
531 })
532 }
533
534 pub async fn get_book_depth(
536 &self,
537 pair: &str,
538 count: Option<u32>,
539 ) -> anyhow::Result<OrderBookResponse, KrakenHttpError> {
540 let mut endpoint = format!("/0/public/Depth?pair={pair}");
541
542 if let Some(count) = count {
543 endpoint.push_str(&format!("&count={count}"));
544 }
545
546 let response: KrakenResponse<OrderBookResponse> = self
547 .send_request(Method::GET, &endpoint, None, false)
548 .await?;
549
550 response.result.ok_or_else(|| {
551 KrakenHttpError::ParseError("Missing result in book depth response".to_string())
552 })
553 }
554
555 pub async fn get_trades(
557 &self,
558 pair: &str,
559 since: Option<String>,
560 ) -> anyhow::Result<TradesResponse, KrakenHttpError> {
561 let mut endpoint = format!("/0/public/Trades?pair={pair}");
562
563 if let Some(since) = since {
564 endpoint.push_str(&format!("&since={since}"));
565 }
566
567 let response: KrakenResponse<TradesResponse> = self
568 .send_request(Method::GET, &endpoint, None, false)
569 .await?;
570
571 response.result.ok_or_else(|| {
572 KrakenHttpError::ParseError("Missing result in trades response".to_string())
573 })
574 }
575
576 pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
578 if self.credential.is_none() {
579 return Err(KrakenHttpError::AuthenticationError(
580 "API credentials required for GetWebSocketsToken".to_string(),
581 ));
582 }
583
584 let response: KrakenResponse<WebSocketToken> = self
585 .send_request(Method::POST, "/0/private/GetWebSocketsToken", None, true)
586 .await?;
587
588 response.result.ok_or_else(|| {
589 KrakenHttpError::ParseError("Missing result in websockets token response".to_string())
590 })
591 }
592
593 pub async fn get_open_orders(
595 &self,
596 trades: Option<bool>,
597 userref: Option<i64>,
598 ) -> anyhow::Result<IndexMap<String, SpotOrder>, KrakenHttpError> {
599 if self.credential.is_none() {
600 return Err(KrakenHttpError::AuthenticationError(
601 "API credentials required for OpenOrders".to_string(),
602 ));
603 }
604
605 let mut params = vec![];
606 if let Some(trades_flag) = trades {
607 params.push(format!("trades={trades_flag}"));
608 }
609 if let Some(userref_val) = userref {
610 params.push(format!("userref={userref_val}"));
611 }
612
613 let body = if params.is_empty() {
614 None
615 } else {
616 Some(params.join("&").into_bytes())
617 };
618
619 let response: KrakenResponse<SpotOpenOrdersResult> = self
620 .send_request(Method::POST, "/0/private/OpenOrders", body, true)
621 .await?;
622
623 let result = response.result.ok_or_else(|| {
624 KrakenHttpError::ParseError("Missing result in open orders response".to_string())
625 })?;
626
627 Ok(result.open)
628 }
629
630 pub async fn get_closed_orders(
632 &self,
633 trades: Option<bool>,
634 userref: Option<i64>,
635 start: Option<i64>,
636 end: Option<i64>,
637 ofs: Option<i32>,
638 closetime: Option<String>,
639 ) -> anyhow::Result<IndexMap<String, SpotOrder>, KrakenHttpError> {
640 if self.credential.is_none() {
641 return Err(KrakenHttpError::AuthenticationError(
642 "API credentials required for ClosedOrders".to_string(),
643 ));
644 }
645
646 let mut params = vec![];
647 if let Some(trades_flag) = trades {
648 params.push(format!("trades={trades_flag}"));
649 }
650 if let Some(userref_val) = userref {
651 params.push(format!("userref={userref_val}"));
652 }
653 if let Some(start_val) = start {
654 params.push(format!("start={start_val}"));
655 }
656 if let Some(end_val) = end {
657 params.push(format!("end={end_val}"));
658 }
659 if let Some(ofs_val) = ofs {
660 params.push(format!("ofs={ofs_val}"));
661 }
662 if let Some(closetime_val) = closetime {
663 params.push(format!("closetime={closetime_val}"));
664 }
665
666 let body = if params.is_empty() {
667 None
668 } else {
669 Some(params.join("&").into_bytes())
670 };
671
672 let response: KrakenResponse<SpotClosedOrdersResult> = self
673 .send_request(Method::POST, "/0/private/ClosedOrders", body, true)
674 .await?;
675
676 let result = response.result.ok_or_else(|| {
677 KrakenHttpError::ParseError("Missing result in closed orders response".to_string())
678 })?;
679
680 Ok(result.closed)
681 }
682
683 pub async fn get_trades_history(
685 &self,
686 trade_type: Option<String>,
687 trades: Option<bool>,
688 start: Option<i64>,
689 end: Option<i64>,
690 ofs: Option<i32>,
691 ) -> anyhow::Result<IndexMap<String, SpotTrade>, KrakenHttpError> {
692 if self.credential.is_none() {
693 return Err(KrakenHttpError::AuthenticationError(
694 "API credentials required for TradesHistory".to_string(),
695 ));
696 }
697
698 let mut params = vec![];
699 if let Some(type_val) = trade_type {
700 params.push(format!("type={type_val}"));
701 }
702 if let Some(trades_flag) = trades {
703 params.push(format!("trades={trades_flag}"));
704 }
705 if let Some(start_val) = start {
706 params.push(format!("start={start_val}"));
707 }
708 if let Some(end_val) = end {
709 params.push(format!("end={end_val}"));
710 }
711 if let Some(ofs_val) = ofs {
712 params.push(format!("ofs={ofs_val}"));
713 }
714
715 let body = if params.is_empty() {
716 None
717 } else {
718 Some(params.join("&").into_bytes())
719 };
720
721 let response: KrakenResponse<SpotTradesHistoryResult> = self
722 .send_request(Method::POST, "/0/private/TradesHistory", body, true)
723 .await?;
724
725 let result = response.result.ok_or_else(|| {
726 KrakenHttpError::ParseError("Missing result in trades history response".to_string())
727 })?;
728
729 Ok(result.trades)
730 }
731
732 pub async fn add_order(
734 &self,
735 params: &KrakenSpotAddOrderParams,
736 ) -> anyhow::Result<SpotAddOrderResponse, KrakenHttpError> {
737 if self.credential.is_none() {
738 return Err(KrakenHttpError::AuthenticationError(
739 "API credentials required for adding orders".to_string(),
740 ));
741 }
742
743 let param_string = serde_urlencoded::to_string(params)
744 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
745 let body = Some(param_string.into_bytes());
746
747 let response: KrakenResponse<SpotAddOrderResponse> = self
748 .send_request(Method::POST, "/0/private/AddOrder", body, true)
749 .await?;
750
751 response
752 .result
753 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
754 }
755
756 pub async fn cancel_order(
758 &self,
759 params: &KrakenSpotCancelOrderParams,
760 ) -> anyhow::Result<SpotCancelOrderResponse, KrakenHttpError> {
761 if self.credential.is_none() {
762 return Err(KrakenHttpError::AuthenticationError(
763 "API credentials required for canceling orders".to_string(),
764 ));
765 }
766
767 let param_string = serde_urlencoded::to_string(params)
768 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
769
770 let body = Some(param_string.into_bytes());
771
772 let response: KrakenResponse<SpotCancelOrderResponse> = self
773 .send_request(Method::POST, "/0/private/CancelOrder", body, true)
774 .await?;
775
776 response
777 .result
778 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
779 }
780
781 pub async fn cancel_order_batch(
783 &self,
784 params: &KrakenSpotCancelOrderBatchParams,
785 ) -> anyhow::Result<SpotCancelOrderBatchResponse, KrakenHttpError> {
786 let credential = self.credential.as_ref().ok_or_else(|| {
787 KrakenHttpError::AuthenticationError(
788 "API credentials required for canceling orders".to_string(),
789 )
790 })?;
791
792 let _guard = self.auth_mutex.lock().await;
794
795 let endpoint = "/0/private/CancelOrderBatch";
796 let nonce = self.generate_nonce();
797
798 let json_body = serde_json::json!({
800 "nonce": nonce.to_string(),
801 "orders": params.orders
802 });
803 let json_str = serde_json::to_string(&json_body)
804 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to serialize: {e}")))?;
805
806 let signature = credential
807 .sign_spot_json(endpoint, nonce, &json_str)
808 .map_err(|e| KrakenHttpError::AuthenticationError(format!("Failed to sign: {e}")))?;
809
810 let mut headers = Self::default_headers();
811 headers.insert("API-Key".to_string(), credential.api_key().to_string());
812 headers.insert("API-Sign".to_string(), signature);
813 headers.insert("Content-Type".to_string(), "application/json".to_string());
814
815 let url = format!("{}{endpoint}", self.base_url);
816 let rate_limit_keys = Self::rate_limit_keys(endpoint);
817
818 let response = self
819 .client
820 .request(
821 Method::POST,
822 url,
823 None,
824 Some(headers),
825 Some(json_str.into_bytes()),
826 None,
827 Some(rate_limit_keys),
828 )
829 .await
830 .map_err(|e| KrakenHttpError::NetworkError(e.to_string()))?;
831
832 if response.status.as_u16() >= 400 {
833 let status = response.status.as_u16();
834 let body = String::from_utf8_lossy(&response.body).to_string();
835 if status == 401 || status == 403 {
836 return Err(KrakenHttpError::AuthenticationError(format!(
837 "HTTP error {status}: {body}"
838 )));
839 }
840 return Err(KrakenHttpError::NetworkError(format!(
841 "HTTP error {status}: {body}"
842 )));
843 }
844
845 let response_text = String::from_utf8(response.body.to_vec())
846 .map_err(|e| KrakenHttpError::ParseError(format!("Invalid UTF-8: {e}")))?;
847
848 let kraken_response: KrakenResponse<SpotCancelOrderBatchResponse> =
849 serde_json::from_str(&response_text).map_err(|e| {
850 KrakenHttpError::ParseError(format!("Failed to parse response: {e}"))
851 })?;
852
853 if !kraken_response.error.is_empty() {
854 return Err(KrakenHttpError::ApiError(kraken_response.error));
855 }
856
857 kraken_response
858 .result
859 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
860 }
861
862 pub async fn cancel_all_orders(
864 &self,
865 ) -> anyhow::Result<SpotCancelOrderResponse, KrakenHttpError> {
866 if self.credential.is_none() {
867 return Err(KrakenHttpError::AuthenticationError(
868 "API credentials required for canceling orders".to_string(),
869 ));
870 }
871
872 let response: KrakenResponse<SpotCancelOrderResponse> = self
873 .send_request(Method::POST, "/0/private/CancelAll", None, true)
874 .await?;
875
876 response
877 .result
878 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
879 }
880
881 pub async fn edit_order(
883 &self,
884 params: &KrakenSpotEditOrderParams,
885 ) -> anyhow::Result<SpotEditOrderResponse, KrakenHttpError> {
886 if self.credential.is_none() {
887 return Err(KrakenHttpError::AuthenticationError(
888 "API credentials required for editing orders".to_string(),
889 ));
890 }
891
892 let param_string = serde_urlencoded::to_string(params)
893 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
894
895 let body = Some(param_string.into_bytes());
896
897 let response: KrakenResponse<SpotEditOrderResponse> = self
898 .send_request(Method::POST, "/0/private/EditOrder", body, true)
899 .await?;
900
901 response
902 .result
903 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
904 }
905
906 pub async fn amend_order(
908 &self,
909 params: &KrakenSpotAmendOrderParams,
910 ) -> anyhow::Result<SpotAmendOrderResponse, KrakenHttpError> {
911 if self.credential.is_none() {
912 return Err(KrakenHttpError::AuthenticationError(
913 "API credentials required for amending orders".to_string(),
914 ));
915 }
916
917 let param_string = serde_urlencoded::to_string(params)
918 .map_err(|e| KrakenHttpError::ParseError(format!("Failed to encode params: {e}")))?;
919
920 let body = Some(param_string.into_bytes());
921
922 let response: KrakenResponse<SpotAmendOrderResponse> = self
923 .send_request(Method::POST, "/0/private/AmendOrder", body, true)
924 .await?;
925
926 response
927 .result
928 .ok_or_else(|| KrakenHttpError::ParseError("Missing result in response".to_string()))
929 }
930
931 pub async fn get_balance(&self) -> anyhow::Result<BalanceResponse, KrakenHttpError> {
933 if self.credential.is_none() {
934 return Err(KrakenHttpError::AuthenticationError(
935 "API credentials required for Balance".to_string(),
936 ));
937 }
938
939 let response: KrakenResponse<BalanceResponse> = self
940 .send_request(Method::POST, "/0/private/Balance", None, true)
941 .await?;
942
943 response.result.ok_or_else(|| {
944 KrakenHttpError::ParseError("Missing result in balance response".to_string())
945 })
946 }
947}
948
/// Higher-level Kraken Spot HTTP client that wraps [`KrakenSpotRawHttpClient`] and keeps an
/// instrument cache.
///
/// Every field sits behind an `Arc`, so clones share the same raw client, cache and
/// settings. Exposed to Python as a `pyclass` when the `python` feature is enabled.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
)]
pub struct KrakenSpotHttpClient {
    /// Shared raw client that performs the actual HTTP requests.
    pub(crate) inner: Arc<KrakenSpotRawHttpClient>,
    /// Instruments keyed by their symbol.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set to `true` once `cache_instrument` or `cache_instruments` has been called.
    cache_initialized: Arc<AtomicBool>,
    /// Flag set through `set_use_spot_position_reports`; starts as `false`.
    // NOTE(review): the code that reads this flag is outside the visible part of the file.
    use_spot_position_reports: Arc<AtomicBool>,
    /// Quote currency set through `set_spot_positions_quote_currency`; starts as "USDT".
    spot_positions_quote_currency: Arc<RwLock<Ustr>>,
}
965
966impl Clone for KrakenSpotHttpClient {
967 fn clone(&self) -> Self {
968 Self {
969 inner: self.inner.clone(),
970 instruments_cache: self.instruments_cache.clone(),
971 cache_initialized: self.cache_initialized.clone(),
972 use_spot_position_reports: self.use_spot_position_reports.clone(),
973 spot_positions_quote_currency: self.spot_positions_quote_currency.clone(),
974 }
975 }
976}
977
978impl Default for KrakenSpotHttpClient {
979 fn default() -> Self {
980 Self::new(
981 KrakenEnvironment::Mainnet,
982 None,
983 Some(60),
984 None,
985 None,
986 None,
987 None,
988 None,
989 )
990 .expect("Failed to create default KrakenSpotHttpClient")
991 }
992}
993
994impl Debug for KrakenSpotHttpClient {
995 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
996 f.debug_struct(stringify!(KrakenSpotHttpClient))
997 .field("inner", &self.inner)
998 .finish()
999 }
1000}
1001
1002impl KrakenSpotHttpClient {
1003 #[allow(clippy::too_many_arguments)]
1005 pub fn new(
1006 environment: KrakenEnvironment,
1007 base_url_override: Option<String>,
1008 timeout_secs: Option<u64>,
1009 max_retries: Option<u32>,
1010 retry_delay_ms: Option<u64>,
1011 retry_delay_max_ms: Option<u64>,
1012 proxy_url: Option<String>,
1013 max_requests_per_second: Option<u32>,
1014 ) -> anyhow::Result<Self> {
1015 Ok(Self {
1016 inner: Arc::new(KrakenSpotRawHttpClient::new(
1017 environment,
1018 base_url_override,
1019 timeout_secs,
1020 max_retries,
1021 retry_delay_ms,
1022 retry_delay_max_ms,
1023 proxy_url,
1024 max_requests_per_second,
1025 )?),
1026 instruments_cache: Arc::new(DashMap::new()),
1027 cache_initialized: Arc::new(AtomicBool::new(false)),
1028 use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1029 spot_positions_quote_currency: Arc::new(RwLock::new(Ustr::from("USDT"))),
1030 })
1031 }
1032
1033 #[allow(clippy::too_many_arguments)]
1035 pub fn with_credentials(
1036 api_key: String,
1037 api_secret: String,
1038 environment: KrakenEnvironment,
1039 base_url_override: Option<String>,
1040 timeout_secs: Option<u64>,
1041 max_retries: Option<u32>,
1042 retry_delay_ms: Option<u64>,
1043 retry_delay_max_ms: Option<u64>,
1044 proxy_url: Option<String>,
1045 max_requests_per_second: Option<u32>,
1046 ) -> anyhow::Result<Self> {
1047 Ok(Self {
1048 inner: Arc::new(KrakenSpotRawHttpClient::with_credentials(
1049 api_key,
1050 api_secret,
1051 environment,
1052 base_url_override,
1053 timeout_secs,
1054 max_retries,
1055 retry_delay_ms,
1056 retry_delay_max_ms,
1057 proxy_url,
1058 max_requests_per_second,
1059 )?),
1060 instruments_cache: Arc::new(DashMap::new()),
1061 cache_initialized: Arc::new(AtomicBool::new(false)),
1062 use_spot_position_reports: Arc::new(AtomicBool::new(false)),
1063 spot_positions_quote_currency: Arc::new(RwLock::new(Ustr::from("USDT"))),
1064 })
1065 }
1066
1067 #[allow(clippy::too_many_arguments)]
1075 pub fn from_env(
1076 environment: KrakenEnvironment,
1077 base_url_override: Option<String>,
1078 timeout_secs: Option<u64>,
1079 max_retries: Option<u32>,
1080 retry_delay_ms: Option<u64>,
1081 retry_delay_max_ms: Option<u64>,
1082 proxy_url: Option<String>,
1083 max_requests_per_second: Option<u32>,
1084 ) -> anyhow::Result<Self> {
1085 if let Some(credential) = KrakenCredential::from_env_spot() {
1086 let (api_key, api_secret) = credential.into_parts();
1087 Self::with_credentials(
1088 api_key,
1089 api_secret,
1090 environment,
1091 base_url_override,
1092 timeout_secs,
1093 max_retries,
1094 retry_delay_ms,
1095 retry_delay_max_ms,
1096 proxy_url,
1097 max_requests_per_second,
1098 )
1099 } else {
1100 Self::new(
1101 environment,
1102 base_url_override,
1103 timeout_secs,
1104 max_retries,
1105 retry_delay_ms,
1106 retry_delay_max_ms,
1107 proxy_url,
1108 max_requests_per_second,
1109 )
1110 }
1111 }
1112
1113 pub fn cancel_all_requests(&self) {
1115 self.inner.cancel_all_requests();
1116 }
1117
1118 pub fn cancellation_token(&self) -> &CancellationToken {
1120 self.inner.cancellation_token()
1121 }
1122
1123 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1125 self.instruments_cache
1126 .insert(instrument.symbol().inner(), instrument);
1127 self.cache_initialized.store(true, Ordering::Release);
1128 }
1129
1130 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1132 for instrument in instruments {
1133 self.instruments_cache
1134 .insert(instrument.symbol().inner(), instrument);
1135 }
1136 self.cache_initialized.store(true, Ordering::Release);
1137 }
1138
1139 pub fn get_cached_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1141 self.instruments_cache
1142 .get(symbol)
1143 .map(|entry| entry.value().clone())
1144 }
1145
1146 fn get_instrument_by_raw_symbol(&self, raw_symbol: &str) -> Option<InstrumentAny> {
1147 self.instruments_cache
1148 .iter()
1149 .find(|entry| entry.value().raw_symbol().as_str() == raw_symbol)
1150 .map(|entry| entry.value().clone())
1151 }
1152
1153 fn generate_ts_init(&self) -> UnixNanos {
1154 get_atomic_clock_realtime().get_time_ns()
1155 }
1156
1157 pub fn set_use_spot_position_reports(&self, value: bool) {
1159 self.use_spot_position_reports
1160 .store(value, Ordering::Relaxed);
1161 }
1162
1163 pub fn set_spot_positions_quote_currency(&self, currency: &str) {
1165 let mut guard = self.spot_positions_quote_currency.write().expect("lock");
1166 *guard = Ustr::from(currency);
1167 }
1168
1169 pub async fn get_websockets_token(&self) -> anyhow::Result<WebSocketToken, KrakenHttpError> {
1171 self.inner.get_websockets_token().await
1172 }
1173
1174 pub async fn request_instruments(
1176 &self,
1177 pairs: Option<Vec<String>>,
1178 ) -> anyhow::Result<Vec<InstrumentAny>, KrakenHttpError> {
1179 let ts_init = self.generate_ts_init();
1180 let asset_pairs = self.inner.get_asset_pairs(pairs).await?;
1181
1182 let instruments: Vec<InstrumentAny> = asset_pairs
1183 .iter()
1184 .filter_map(|(pair_name, definition)| {
1185 match parse_spot_instrument(pair_name, definition, ts_init, ts_init) {
1186 Ok(instrument) => Some(instrument),
1187 Err(e) => {
1188 log::warn!("Failed to parse instrument {pair_name}: {e}");
1189 None
1190 }
1191 }
1192 })
1193 .collect();
1194
1195 Ok(instruments)
1196 }
1197
1198 pub async fn request_trades(
1200 &self,
1201 instrument_id: InstrumentId,
1202 start: Option<DateTime<Utc>>,
1203 end: Option<DateTime<Utc>>,
1204 limit: Option<u64>,
1205 ) -> anyhow::Result<Vec<TradeTick>, KrakenHttpError> {
1206 let instrument = self
1207 .get_cached_instrument(&instrument_id.symbol.inner())
1208 .ok_or_else(|| {
1209 KrakenHttpError::ParseError(format!(
1210 "Instrument not found in cache: {instrument_id}",
1211 ))
1212 })?;
1213
1214 let raw_symbol = instrument.raw_symbol().to_string();
1215 let ts_init = self.generate_ts_init();
1216
1217 let since = start.map(|dt| (dt.timestamp_nanos_opt().unwrap_or(0) as u64).to_string());
1219 let response = self.inner.get_trades(&raw_symbol, since).await?;
1220
1221 let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1222 let mut trades = Vec::new();
1223
1224 for (_pair_name, trade_arrays) in &response.data {
1225 for trade_array in trade_arrays {
1226 match parse_trade_tick_from_array(trade_array, &instrument, ts_init) {
1227 Ok(trade_tick) => {
1228 if let Some(end_nanos) = end_ns
1229 && trade_tick.ts_event.as_u64() > end_nanos
1230 {
1231 continue;
1232 }
1233 trades.push(trade_tick);
1234
1235 if let Some(limit_count) = limit
1236 && trades.len() >= limit_count as usize
1237 {
1238 return Ok(trades);
1239 }
1240 }
1241 Err(e) => {
1242 log::warn!("Failed to parse trade tick: {e}");
1243 }
1244 }
1245 }
1246 }
1247
1248 Ok(trades)
1249 }
1250
1251 pub async fn request_bars(
1253 &self,
1254 bar_type: BarType,
1255 start: Option<DateTime<Utc>>,
1256 end: Option<DateTime<Utc>>,
1257 limit: Option<u64>,
1258 ) -> anyhow::Result<Vec<Bar>, KrakenHttpError> {
1259 let instrument_id = bar_type.instrument_id();
1260 let instrument = self
1261 .get_cached_instrument(&instrument_id.symbol.inner())
1262 .ok_or_else(|| {
1263 KrakenHttpError::ParseError(format!(
1264 "Instrument not found in cache: {instrument_id}"
1265 ))
1266 })?;
1267
1268 let raw_symbol = instrument.raw_symbol().to_string();
1269 let ts_init = self.generate_ts_init();
1270
1271 let interval = Some(
1272 bar_type_to_spot_interval(bar_type)
1273 .map_err(|e| KrakenHttpError::ParseError(e.to_string()))?,
1274 );
1275
1276 let since = start.map(|dt| dt.timestamp());
1278 let end_ns = end.map(|dt| dt.timestamp_nanos_opt().unwrap_or(0) as u64);
1279 let response = self.inner.get_ohlc(&raw_symbol, interval, since).await?;
1280
1281 let mut bars = Vec::new();
1282
1283 for (_pair_name, ohlc_arrays) in &response.data {
1284 for ohlc_array in ohlc_arrays {
1285 if ohlc_array.len() < 8 {
1286 let len = ohlc_array.len();
1287 log::warn!("OHLC array too short: {len}");
1288 continue;
1289 }
1290
1291 let ohlc = OhlcData {
1292 time: ohlc_array[0].as_i64().unwrap_or(0),
1293 open: ohlc_array[1].as_str().unwrap_or("0").to_string(),
1294 high: ohlc_array[2].as_str().unwrap_or("0").to_string(),
1295 low: ohlc_array[3].as_str().unwrap_or("0").to_string(),
1296 close: ohlc_array[4].as_str().unwrap_or("0").to_string(),
1297 vwap: ohlc_array[5].as_str().unwrap_or("0").to_string(),
1298 volume: ohlc_array[6].as_str().unwrap_or("0").to_string(),
1299 count: ohlc_array[7].as_i64().unwrap_or(0),
1300 };
1301
1302 match parse_bar(&ohlc, &instrument, bar_type, ts_init) {
1303 Ok(bar) => {
1304 if let Some(end_nanos) = end_ns
1305 && bar.ts_event.as_u64() > end_nanos
1306 {
1307 continue;
1308 }
1309 bars.push(bar);
1310
1311 if let Some(limit_count) = limit
1312 && bars.len() >= limit_count as usize
1313 {
1314 return Ok(bars);
1315 }
1316 }
1317 Err(e) => {
1318 log::warn!("Failed to parse bar: {e}");
1319 }
1320 }
1321 }
1322 }
1323
1324 Ok(bars)
1325 }
1326
1327 pub async fn request_account_state(
1331 &self,
1332 account_id: AccountId,
1333 ) -> anyhow::Result<AccountState> {
1334 let balances_raw = self.inner.get_balance().await?;
1335 let ts_init = self.generate_ts_init();
1336
1337 let balances: Vec<AccountBalance> = balances_raw
1338 .iter()
1339 .filter_map(|(currency_code, amount_str)| {
1340 let amount = amount_str.parse::<f64>().ok()?;
1341 if amount == 0.0 {
1342 return None;
1343 }
1344
1345 let normalized_code = currency_code
1347 .strip_prefix("X")
1348 .or_else(|| currency_code.strip_prefix("Z"))
1349 .unwrap_or(currency_code);
1350
1351 let currency = Currency::new(
1352 normalized_code,
1353 8, 0,
1355 "0",
1356 CurrencyType::Crypto,
1357 );
1358
1359 let total = Money::new(amount, currency);
1360 let locked = Money::new(0.0, currency);
1361
1362 Some(AccountBalance::new(total, locked, total))
1364 })
1365 .collect();
1366
1367 Ok(AccountState::new(
1368 account_id,
1369 AccountType::Cash,
1370 balances,
1371 vec![], true, UUID4::new(),
1374 ts_init,
1375 ts_init,
1376 None,
1377 ))
1378 }
1379
1380 pub async fn request_order_status_reports(
1382 &self,
1383 account_id: AccountId,
1384 instrument_id: Option<InstrumentId>,
1385 start: Option<DateTime<Utc>>,
1386 end: Option<DateTime<Utc>>,
1387 open_only: bool,
1388 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1389 const PAGE_SIZE: i32 = 50;
1390
1391 let ts_init = self.generate_ts_init();
1392 let mut all_reports = Vec::new();
1393
1394 let open_orders = self.inner.get_open_orders(Some(true), None).await?;
1395
1396 for (order_id, order) in &open_orders {
1397 if let Some(ref target_id) = instrument_id {
1398 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1399 if let Some(inst) = instrument
1400 && inst.raw_symbol().as_str() != order.descr.pair
1401 {
1402 continue;
1403 }
1404 }
1405
1406 if let Some(instrument) = self.get_instrument_by_raw_symbol(order.descr.pair.as_str()) {
1407 match parse_order_status_report(order_id, order, &instrument, account_id, ts_init) {
1408 Ok(report) => all_reports.push(report),
1409 Err(e) => {
1410 log::warn!("Failed to parse order {order_id}: {e}");
1411 }
1412 }
1413 }
1414 }
1415
1416 if open_only {
1417 return Ok(all_reports);
1418 }
1419
1420 let start_ts = start.map(|dt| dt.timestamp());
1422 let end_ts = end.map(|dt| dt.timestamp());
1423
1424 let mut offset = 0;
1425
1426 loop {
1427 let closed_orders = self
1428 .inner
1429 .get_closed_orders(Some(true), None, start_ts, end_ts, Some(offset), None)
1430 .await?;
1431
1432 if closed_orders.is_empty() {
1433 break;
1434 }
1435
1436 for (order_id, order) in &closed_orders {
1437 if let Some(ref target_id) = instrument_id {
1438 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1439 if let Some(inst) = instrument
1440 && inst.raw_symbol().as_str() != order.descr.pair
1441 {
1442 continue;
1443 }
1444 }
1445
1446 if let Some(instrument) =
1447 self.get_instrument_by_raw_symbol(order.descr.pair.as_str())
1448 {
1449 match parse_order_status_report(
1450 order_id,
1451 order,
1452 &instrument,
1453 account_id,
1454 ts_init,
1455 ) {
1456 Ok(report) => all_reports.push(report),
1457 Err(e) => {
1458 log::warn!("Failed to parse order {order_id}: {e}");
1459 }
1460 }
1461 }
1462 }
1463
1464 offset += PAGE_SIZE;
1465 }
1466
1467 Ok(all_reports)
1468 }
1469
1470 pub async fn request_fill_reports(
1472 &self,
1473 account_id: AccountId,
1474 instrument_id: Option<InstrumentId>,
1475 start: Option<DateTime<Utc>>,
1476 end: Option<DateTime<Utc>>,
1477 ) -> anyhow::Result<Vec<FillReport>> {
1478 const PAGE_SIZE: i32 = 50;
1479
1480 let ts_init = self.generate_ts_init();
1481 let mut all_reports = Vec::new();
1482
1483 let start_ts = start.map(|dt| dt.timestamp());
1485 let end_ts = end.map(|dt| dt.timestamp());
1486
1487 let mut offset = 0;
1488
1489 loop {
1490 let trades = self
1491 .inner
1492 .get_trades_history(None, Some(true), start_ts, end_ts, Some(offset))
1493 .await?;
1494
1495 if trades.is_empty() {
1496 break;
1497 }
1498
1499 for (trade_id, trade) in &trades {
1500 if let Some(ref target_id) = instrument_id {
1501 let instrument = self.get_cached_instrument(&target_id.symbol.inner());
1502 if let Some(inst) = instrument
1503 && inst.raw_symbol().as_str() != trade.pair
1504 {
1505 continue;
1506 }
1507 }
1508
1509 if let Some(instrument) = self.get_instrument_by_raw_symbol(trade.pair.as_str()) {
1510 match parse_fill_report(trade_id, trade, &instrument, account_id, ts_init) {
1511 Ok(report) => all_reports.push(report),
1512 Err(e) => {
1513 log::warn!("Failed to parse trade {trade_id}: {e}");
1514 }
1515 }
1516 }
1517 }
1518
1519 offset += PAGE_SIZE;
1520 }
1521
1522 Ok(all_reports)
1523 }
1524
1525 pub async fn request_position_status_reports(
1530 &self,
1531 account_id: AccountId,
1532 instrument_id: Option<InstrumentId>,
1533 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1534 if self.use_spot_position_reports.load(Ordering::Relaxed) {
1535 self.generate_spot_position_reports_from_wallet(account_id, instrument_id)
1536 .await
1537 } else {
1538 Ok(Vec::new())
1539 }
1540 }
1541
1542 async fn generate_spot_position_reports_from_wallet(
1548 &self,
1549 account_id: AccountId,
1550 instrument_id: Option<InstrumentId>,
1551 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1552 let balances_raw = self.inner.get_balance().await?;
1553 let ts_init = self.generate_ts_init();
1554 let mut wallet_by_coin: HashMap<Ustr, f64> = HashMap::new();
1555
1556 for (currency_code, amount_str) in &balances_raw {
1557 let balance = match amount_str.parse::<f64>() {
1558 Ok(b) => b,
1559 Err(_) => continue,
1560 };
1561
1562 if balance == 0.0 {
1563 continue;
1564 }
1565
1566 wallet_by_coin.insert(Ustr::from(normalize_currency_code(currency_code)), balance);
1567 }
1568
1569 let mut reports = Vec::new();
1570
1571 if let Some(instrument_id) = instrument_id {
1572 if let Some(instrument) = self.get_cached_instrument(&instrument_id.symbol.inner()) {
1573 let base_currency = match instrument.base_currency() {
1574 Some(currency) => currency,
1575 None => return Ok(reports),
1576 };
1577
1578 let coin = Ustr::from(normalize_currency_code(base_currency.code.as_str()));
1579 let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(0.0);
1580
1581 let side = if wallet_balance > 0.0 {
1582 PositionSideSpecified::Long
1583 } else {
1584 PositionSideSpecified::Flat
1585 };
1586
1587 let abs_balance = wallet_balance.abs();
1588 let quantity = Quantity::new(abs_balance, instrument.size_precision());
1589
1590 let report = PositionStatusReport::new(
1591 account_id,
1592 instrument_id,
1593 side,
1594 quantity,
1595 ts_init,
1596 ts_init,
1597 None,
1598 None,
1599 None,
1600 );
1601
1602 reports.push(report);
1603 }
1604 } else {
1605 let quote_filter = *self.spot_positions_quote_currency.read().expect("lock");
1606
1607 for entry in self.instruments_cache.iter() {
1608 let instrument = entry.value();
1609
1610 let quote_currency = match instrument.quote_currency() {
1611 currency if currency.code == quote_filter => currency,
1612 _ => continue,
1613 };
1614
1615 let base_currency = match instrument.base_currency() {
1616 Some(currency) => currency,
1617 None => continue,
1618 };
1619
1620 let coin = Ustr::from(normalize_currency_code(base_currency.code.as_str()));
1621 let wallet_balance = wallet_by_coin.get(&coin).copied().unwrap_or(0.0);
1622
1623 if wallet_balance == 0.0 {
1624 continue;
1625 }
1626
1627 let side = PositionSideSpecified::Long;
1628 let quantity = Quantity::new(wallet_balance, instrument.size_precision());
1629
1630 if quantity.is_zero() {
1631 continue;
1632 }
1633
1634 log::debug!(
1635 "Spot position: {} {} (quote: {})",
1636 quantity,
1637 base_currency.code,
1638 quote_currency.code
1639 );
1640
1641 let report = PositionStatusReport::new(
1642 account_id,
1643 instrument.id(),
1644 side,
1645 quantity,
1646 ts_init,
1647 ts_init,
1648 None,
1649 None,
1650 None,
1651 );
1652
1653 reports.push(report);
1654 }
1655 }
1656
1657 Ok(reports)
1658 }
1659
1660 #[allow(clippy::too_many_arguments)]
1673 pub async fn submit_order(
1674 &self,
1675 _account_id: AccountId,
1676 instrument_id: InstrumentId,
1677 client_order_id: ClientOrderId,
1678 order_side: OrderSide,
1679 order_type: OrderType,
1680 quantity: Quantity,
1681 time_in_force: TimeInForce,
1682 expire_time: Option<UnixNanos>,
1683 price: Option<Price>,
1684 trigger_price: Option<Price>,
1685 reduce_only: bool,
1686 post_only: bool,
1687 ) -> anyhow::Result<VenueOrderId> {
1688 let instrument = self
1689 .get_cached_instrument(&instrument_id.symbol.inner())
1690 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1691
1692 let raw_symbol = instrument.raw_symbol().inner();
1693
1694 let kraken_side = match order_side {
1695 OrderSide::Buy => KrakenOrderSide::Buy,
1696 OrderSide::Sell => KrakenOrderSide::Sell,
1697 _ => anyhow::bail!("Invalid order side: {order_side:?}"),
1698 };
1699
1700 let kraken_order_type = match order_type {
1701 OrderType::Market => KrakenOrderType::Market,
1702 OrderType::Limit => KrakenOrderType::Limit,
1703 OrderType::StopMarket => KrakenOrderType::StopLoss,
1704 OrderType::StopLimit => KrakenOrderType::StopLossLimit,
1705 OrderType::MarketIfTouched => KrakenOrderType::TakeProfit,
1706 OrderType::LimitIfTouched => KrakenOrderType::TakeProfitLimit,
1707 _ => anyhow::bail!("Unsupported order type: {order_type:?}"),
1708 };
1709
1710 let mut oflags = Vec::new();
1712 let is_limit_order = matches!(
1713 order_type,
1714 OrderType::Limit | OrderType::StopLimit | OrderType::LimitIfTouched
1715 );
1716
1717 let (timeinforce, expiretm) =
1718 compute_time_in_force(is_limit_order, time_in_force, expire_time)?;
1719
1720 if post_only {
1721 oflags.push("post");
1722 }
1723
1724 if reduce_only {
1725 log::warn!("reduce_only is not supported by Kraken Spot API, ignoring");
1726 }
1727
1728 let mut builder = KrakenSpotAddOrderParamsBuilder::default();
1729 builder
1730 .cl_ord_id(client_order_id.to_string())
1731 .broker(NAUTILUS_KRAKEN_BROKER_ID)
1732 .pair(raw_symbol)
1733 .side(kraken_side)
1734 .volume(quantity.to_string())
1735 .order_type(kraken_order_type);
1736
1737 let is_conditional = matches!(
1743 order_type,
1744 OrderType::StopMarket
1745 | OrderType::StopLimit
1746 | OrderType::MarketIfTouched
1747 | OrderType::LimitIfTouched
1748 );
1749
1750 if is_conditional {
1751 if let Some(trigger) = trigger_price {
1752 builder.price(trigger.to_string());
1753 }
1754 if let Some(limit) = price {
1755 builder.price2(limit.to_string());
1756 }
1757 } else if let Some(limit) = price {
1758 builder.price(limit.to_string());
1759 }
1760
1761 if !oflags.is_empty() {
1762 builder.oflags(oflags.join(","));
1763 }
1764
1765 if let Some(tif) = timeinforce {
1766 builder.timeinforce(tif);
1767 }
1768
1769 if let Some(expire) = expiretm {
1770 builder.expiretm(expire);
1771 }
1772
1773 let params = builder
1774 .build()
1775 .map_err(|e| anyhow::anyhow!("Failed to build order params: {e}"))?;
1776
1777 let response = self.inner.add_order(¶ms).await?;
1778
1779 let venue_order_id = response
1780 .txid
1781 .first()
1782 .ok_or_else(|| anyhow::anyhow!("No transaction ID in order response"))?;
1783
1784 Ok(VenueOrderId::new(venue_order_id))
1785 }
1786
1787 pub async fn modify_order(
1799 &self,
1800 instrument_id: InstrumentId,
1801 client_order_id: Option<ClientOrderId>,
1802 venue_order_id: Option<VenueOrderId>,
1803 quantity: Option<Quantity>,
1804 price: Option<Price>,
1805 trigger_price: Option<Price>,
1806 ) -> anyhow::Result<VenueOrderId> {
1807 let _ = self
1808 .get_cached_instrument(&instrument_id.symbol.inner())
1809 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1810
1811 let txid = venue_order_id.as_ref().map(|id| id.to_string());
1812 let cl_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1813
1814 if txid.is_none() && cl_ord_id.is_none() {
1815 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1816 }
1817
1818 let mut builder = KrakenSpotAmendOrderParamsBuilder::default();
1819
1820 if let Some(ref id) = txid {
1822 builder.txid(id.clone());
1823 } else if let Some(ref id) = cl_ord_id {
1824 builder.cl_ord_id(id.clone());
1825 }
1826
1827 if let Some(qty) = quantity {
1828 builder.order_qty(qty.to_string());
1829 }
1830 if let Some(p) = price {
1831 builder.limit_price(p.to_string());
1832 }
1833 if let Some(tp) = trigger_price {
1834 builder.trigger_price(tp.to_string());
1835 }
1836
1837 let params = builder
1838 .build()
1839 .map_err(|e| anyhow::anyhow!("Failed to build amend order params: {e}"))?;
1840
1841 let _response = self.inner.amend_order(¶ms).await?;
1842
1843 let order_id = venue_order_id
1845 .ok_or_else(|| anyhow::anyhow!("venue_order_id required for amend response"))?;
1846
1847 Ok(order_id)
1848 }
1849
1850 pub async fn cancel_order(
1860 &self,
1861 _account_id: AccountId,
1862 instrument_id: InstrumentId,
1863 client_order_id: Option<ClientOrderId>,
1864 venue_order_id: Option<VenueOrderId>,
1865 ) -> anyhow::Result<()> {
1866 let _ = self
1867 .get_cached_instrument(&instrument_id.symbol.inner())
1868 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1869
1870 let txid = venue_order_id.as_ref().map(|id| id.to_string());
1871 let cl_ord_id = client_order_id.as_ref().map(|id| id.to_string());
1872
1873 if txid.is_none() && cl_ord_id.is_none() {
1874 anyhow::bail!("Either client_order_id or venue_order_id must be provided");
1875 }
1876
1877 let mut builder = KrakenSpotCancelOrderParamsBuilder::default();
1880 if let Some(ref id) = txid {
1881 builder.txid(id.clone());
1882 } else if let Some(ref id) = cl_ord_id {
1883 builder.cl_ord_id(id.clone());
1884 }
1885 let params = builder
1886 .build()
1887 .map_err(|e| anyhow::anyhow!("Failed to build cancel params: {e}"))?;
1888
1889 self.inner.cancel_order(¶ms).await?;
1890
1891 Ok(())
1892 }
1893
1894 pub async fn cancel_orders_batch(
1896 &self,
1897 venue_order_ids: Vec<VenueOrderId>,
1898 ) -> anyhow::Result<i32> {
1899 if venue_order_ids.is_empty() {
1900 return Ok(0);
1901 }
1902
1903 let mut total_cancelled = 0;
1904
1905 for chunk in venue_order_ids.chunks(BATCH_CANCEL_LIMIT) {
1906 let orders: Vec<String> = chunk.iter().map(|id| id.to_string()).collect();
1907 let params = KrakenSpotCancelOrderBatchParams { orders };
1908
1909 let response = self.inner.cancel_order_batch(¶ms).await?;
1910 total_cancelled += response.count;
1911 }
1912
1913 Ok(total_cancelled)
1914 }
1915}
1916
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// A default raw client carries no credential.
    #[rstest]
    fn test_raw_client_creation() {
        let raw_client = KrakenSpotRawHttpClient::default();
        assert!(raw_client.credential.is_none());
    }

    /// A raw client built with an API key and secret stores a credential.
    #[rstest]
    fn test_raw_client_with_credentials() {
        let api_key = "test_key".to_string();
        let api_secret = "test_secret".to_string();

        let raw_client = KrakenSpotRawHttpClient::with_credentials(
            api_key,
            api_secret,
            KrakenEnvironment::Mainnet,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .unwrap();

        assert!(raw_client.credential.is_some());
    }

    /// A default domain client starts with an empty instruments cache.
    #[rstest]
    fn test_client_creation() {
        let http_client = KrakenSpotHttpClient::default();
        assert!(http_client.instruments_cache.is_empty());
    }

    /// A domain client built with credentials also starts with an empty cache.
    #[rstest]
    fn test_client_with_credentials() {
        let api_key = "test_key".to_string();
        let api_secret = "test_secret".to_string();

        let http_client = KrakenSpotHttpClient::with_credentials(
            api_key,
            api_secret,
            KrakenEnvironment::Mainnet,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
        )
        .unwrap();

        assert!(http_client.instruments_cache.is_empty());
    }

    /// Three consecutive nonces are strictly increasing.
    #[rstest]
    fn test_nonce_generation_strictly_increasing() {
        let raw_client = KrakenSpotRawHttpClient::default();

        // `from_fn` fills the array in index order, so the nonces are generated 1, 2, 3.
        let [nonce1, nonce2, nonce3]: [_; 3] =
            std::array::from_fn(|_| raw_client.generate_nonce());

        assert!(
            nonce2 > nonce1,
            "nonce2 ({nonce2}) should be > nonce1 ({nonce1})"
        );
        assert!(
            nonce3 > nonce2,
            "nonce3 ({nonce3}) should be > nonce2 ({nonce2})"
        );
    }

    /// A generated nonce exceeds 1.5e18, a magnitude only a nanosecond timestamp reaches.
    #[rstest]
    fn test_nonce_is_nanosecond_timestamp() {
        let raw_client = KrakenSpotRawHttpClient::default();

        let nonce = raw_client.generate_nonce();

        assert!(
            nonce > 1_500_000_000_000_000_000,
            "Nonce should be nanosecond timestamp"
        );
    }

    /// Supported time-in-force combinations map to the expected
    /// `(timeinforce, expiretm)` pair.
    #[rstest]
    #[case::gtc_limit(true, TimeInForce::Gtc, None, None, None)]
    #[case::ioc_limit(true, TimeInForce::Ioc, None, Some("IOC"), None)]
    #[case::gtd_limit_with_expire(
        true,
        TimeInForce::Gtd,
        Some(1_704_067_200_000_000_000u64),
        Some("GTD"),
        Some("1704067200")
    )]
    #[case::gtc_market(false, TimeInForce::Gtc, None, None, None)]
    #[case::ioc_market(false, TimeInForce::Ioc, None, None, None)]
    fn test_compute_time_in_force_success(
        #[case] is_limit: bool,
        #[case] tif: TimeInForce,
        #[case] expire_nanos: Option<u64>,
        #[case] expected_tif: Option<&str>,
        #[case] expected_expire: Option<&str>,
    ) {
        let expire_time = expire_nanos.map(UnixNanos::from);

        let (actual_tif, actual_expire) =
            compute_time_in_force(is_limit, tif, expire_time).unwrap();

        assert_eq!(actual_tif, expected_tif.map(String::from));
        assert_eq!(actual_expire, expected_expire.map(String::from));
    }

    /// Unsupported or incomplete time-in-force input on a limit order yields an error
    /// whose message mentions the offending item.
    #[rstest]
    #[case::fok_not_supported(TimeInForce::Fok, None, "FOK")]
    #[case::gtd_missing_expire(TimeInForce::Gtd, None, "expire_time")]
    fn test_compute_time_in_force_errors(
        #[case] tif: TimeInForce,
        #[case] expire_nanos: Option<u64>,
        #[case] expected_error: &str,
    ) {
        let expire_time = expire_nanos.map(UnixNanos::from);

        let outcome = compute_time_in_force(true, tif, expire_time);
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains(expected_error));
    }
}