1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::get_runtime;
25use nautilus_model::{
26 data::BarType,
27 enums::BarAggregation,
28 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
29 instruments::InstrumentAny,
30};
31use nautilus_network::{
32 mode::ConnectionMode,
33 websocket::{
34 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35 },
36};
37use tokio_util::sync::CancellationToken;
38use ustr::Ustr;
39
/// Delimiter separating the components of a subscription topic key,
/// e.g. `channel:symbol` or `channel:symbol:interval`.
pub const KRAKEN_SPOT_WS_TOPIC_DELIMITER: char = ':';
44
45use super::{
46 enums::{KrakenWsChannel, KrakenWsMethod},
47 handler::{SpotFeedHandler, SpotHandlerCommand},
48 messages::{KrakenWsParams, KrakenWsRequest, NautilusWsMessage},
49};
50use crate::{
51 common::parse::normalize_spot_symbol, config::KrakenDataClientConfig,
52 http::KrakenSpotHttpClient, websocket::error::KrakenWsError,
53};
54
/// JSON ping payload configured as the heartbeat message of the WebSocket connection.
const WS_PING_MSG: &str = r#"{"method":"ping"}"#;
56
/// WebSocket client for the Kraken Spot API.
///
/// The client forwards requests to a background handler task over a command channel and
/// exposes the messages produced by that task through [`KrakenSpotWebSocketClient::stream`].
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
)]
pub struct KrakenSpotWebSocketClient {
    /// Endpoint URL selected in `new` (private URL when configured, otherwise public).
    url: String,
    /// Client configuration (URLs, credentials, heartbeat and HTTP settings).
    config: KrakenDataClientConfig,
    /// Stop flag: cleared by `connect`, set by `disconnect`, read by `is_active`/`is_closed`.
    signal: Arc<AtomicBool>,
    /// Connection mode; `connect` swaps in the atomic exposed by the underlying WebSocket client.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for handler commands; replaced with a live channel on every `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<SpotHandlerCommand>>>,
    /// Receiver for messages emitted by the handler task; taken by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Join handle of the handler task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Bookkeeping of subscription topics and their reference counts.
    subscriptions: SubscriptionState,
    /// Authentication tracker; marked as failed on `disconnect`.
    auth_tracker: AuthTracker,
    /// Cancellation token supplied by the caller; cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
    /// Counter used to assign an incrementing `req_id` to outgoing requests.
    req_id_counter: Arc<tokio::sync::RwLock<u64>>,
    /// WebSocket token stored by `authenticate`; required for private channels.
    auth_token: Arc<tokio::sync::RwLock<Option<String>>>,
}
77
78impl Clone for KrakenSpotWebSocketClient {
79 fn clone(&self) -> Self {
80 Self {
81 url: self.url.clone(),
82 config: self.config.clone(),
83 signal: Arc::clone(&self.signal),
84 connection_mode: Arc::clone(&self.connection_mode),
85 cmd_tx: Arc::clone(&self.cmd_tx),
86 out_rx: self.out_rx.clone(),
87 task_handle: self.task_handle.clone(),
88 subscriptions: self.subscriptions.clone(),
89 auth_tracker: self.auth_tracker.clone(),
90 cancellation_token: self.cancellation_token.clone(),
91 req_id_counter: self.req_id_counter.clone(),
92 auth_token: self.auth_token.clone(),
93 }
94 }
95}
96
97impl KrakenSpotWebSocketClient {
98 pub fn new(config: KrakenDataClientConfig, cancellation_token: CancellationToken) -> Self {
100 let url = if config.ws_private_url.is_some() {
102 config.ws_private_url()
103 } else {
104 config.ws_public_url()
105 };
106 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
107 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
108 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
109
110 Self {
111 url,
112 config,
113 signal: Arc::new(AtomicBool::new(false)),
114 connection_mode,
115 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
116 out_rx: None,
117 task_handle: None,
118 subscriptions: SubscriptionState::new(KRAKEN_SPOT_WS_TOPIC_DELIMITER),
119 auth_tracker: AuthTracker::new(),
120 cancellation_token,
121 req_id_counter: Arc::new(tokio::sync::RwLock::new(0)),
122 auth_token: Arc::new(tokio::sync::RwLock::new(None)),
123 }
124 }
125
126 async fn get_next_req_id(&self) -> u64 {
127 let mut counter = self.req_id_counter.write().await;
128 *counter += 1;
129 *counter
130 }
131
132 pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
134 log::debug!("Connecting to {}", self.url);
135
136 self.signal.store(false, Ordering::Relaxed);
137
138 let (raw_handler, raw_rx) = channel_message_handler();
139
140 let ws_config = WebSocketConfig {
141 url: self.url.clone(),
142 headers: vec![],
143 heartbeat: self.config.heartbeat_interval_secs,
144 heartbeat_msg: Some(WS_PING_MSG.to_string()),
145 reconnect_timeout_ms: Some(5_000),
146 reconnect_delay_initial_ms: Some(500),
147 reconnect_delay_max_ms: Some(5_000),
148 reconnect_backoff_factor: Some(1.5),
149 reconnect_jitter_ms: Some(250),
150 reconnect_max_attempts: None,
151 };
152
153 let ws_client = WebSocketClient::connect(
154 ws_config,
155 Some(raw_handler),
156 None, None, vec![], None, )
161 .await
162 .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
163
164 self.connection_mode
166 .store(ws_client.connection_mode_atomic());
167
168 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
169 self.out_rx = Some(Arc::new(out_rx));
170
171 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
172 *self.cmd_tx.write().await = cmd_tx.clone();
173
174 if let Err(e) = cmd_tx.send(SpotHandlerCommand::SetClient(ws_client)) {
175 return Err(KrakenWsError::ConnectionError(format!(
176 "Failed to send WebSocketClient to handler: {e}"
177 )));
178 }
179
180 let signal = self.signal.clone();
181 let subscriptions = self.subscriptions.clone();
182 let config_for_reconnect = self.config.clone();
183 let auth_token_for_reconnect = self.auth_token.clone();
184 let req_id_counter_for_reconnect = self.req_id_counter.clone();
185 let cmd_tx_for_reconnect = cmd_tx.clone();
186
187 let stream_handle = get_runtime().spawn(async move {
188 let mut handler =
189 SpotFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
190
191 loop {
192 match handler.next().await {
193 Some(NautilusWsMessage::Reconnected) => {
194 if signal.load(Ordering::Relaxed) {
195 continue;
196 }
197 log::info!("WebSocket reconnected, resubscribing");
198
199 let confirmed_topics = subscriptions.all_topics();
201 for topic in &confirmed_topics {
202 subscriptions.mark_failure(topic);
203 }
204
205 let topics = subscriptions.all_topics();
206 if topics.is_empty() {
207 log::debug!("No subscriptions to restore after reconnection");
208 } else {
209 let had_auth = auth_token_for_reconnect.read().await.is_some();
211
212 if had_auth && config_for_reconnect.has_api_credentials() {
213 log::debug!("Re-authenticating after reconnect");
214
215 match refresh_auth_token(&config_for_reconnect).await {
216 Ok(new_token) => {
217 *auth_token_for_reconnect.write().await = Some(new_token);
218 log::debug!("Re-authentication successful");
219 }
220 Err(e) => {
221 log::error!(
222 "Failed to re-authenticate after reconnect: {e}"
223 );
224 *auth_token_for_reconnect.write().await = None;
226 }
227 }
228 }
229
230 log::info!("Resubscribing after reconnection: count={}", topics.len());
231
232 for topic in &topics {
234 let auth_token = auth_token_for_reconnect.read().await.clone();
235
236 if topic == "executions" {
238 if let Some(ref token) = auth_token {
239 let mut counter =
240 req_id_counter_for_reconnect.write().await;
241 *counter += 1;
242 let req_id = *counter;
243
244 let request = KrakenWsRequest {
245 method: KrakenWsMethod::Subscribe,
246 params: Some(KrakenWsParams {
247 channel: KrakenWsChannel::Executions,
248 symbol: None,
249 snapshot: None,
250 depth: None,
251 interval: None,
252 event_trigger: None,
253 token: Some(token.clone()),
254 snap_orders: Some(true),
255 snap_trades: Some(true),
256 }),
257 req_id: Some(req_id),
258 };
259
260 if let Ok(payload) = serde_json::to_string(&request)
261 && let Err(e) = cmd_tx_for_reconnect
262 .send(SpotHandlerCommand::SendText { payload })
263 {
264 log::error!(
265 "Failed to send executions resubscribe: {e}"
266 );
267 }
268
269 subscriptions.mark_subscribe(topic);
270 } else {
271 log::warn!(
272 "Cannot resubscribe to executions: no auth token"
273 );
274 }
275 continue;
276 }
277
278 let parts: Vec<&str> = topic.splitn(3, ':').collect();
280 if parts.len() < 2 {
281 log::warn!(
282 "Invalid topic format for resubscribe: topic={topic}"
283 );
284 continue;
285 }
286
287 let channel_str = parts[0];
288 let channel = match channel_str {
289 "book" => Some(KrakenWsChannel::Book),
290 "trade" => Some(KrakenWsChannel::Trade),
291 "ticker" => Some(KrakenWsChannel::Ticker),
292 "quotes" => Some(KrakenWsChannel::Ticker),
293 "ohlc" => Some(KrakenWsChannel::Ohlc),
294 _ => None,
295 };
296
297 let Some(channel) = channel else {
298 log::warn!("Unknown channel for resubscribe: topic={topic}");
299 continue;
300 };
301
302 let mut counter = req_id_counter_for_reconnect.write().await;
303 *counter += 1;
304 let req_id = *counter;
305
306 let (symbol_str, interval) = if parts.len() == 3 {
308 (parts[1], parts[2].parse::<u32>().ok())
310 } else {
311 (parts[1], None)
313 };
314
315 let event_trigger = if channel_str == "quotes" {
317 Some("bbo".to_string())
318 } else {
319 None
320 };
321
322 let snapshot = if channel == KrakenWsChannel::Ohlc {
324 Some(false)
325 } else {
326 None
327 };
328
329 let request = KrakenWsRequest {
330 method: KrakenWsMethod::Subscribe,
331 params: Some(KrakenWsParams {
332 channel,
333 symbol: Some(vec![Ustr::from(symbol_str)]),
334 snapshot,
335 depth: None,
336 interval,
337 event_trigger,
338 token: None,
339 snap_orders: None,
340 snap_trades: None,
341 }),
342 req_id: Some(req_id),
343 };
344
345 if let Ok(payload) = serde_json::to_string(&request)
346 && let Err(e) = cmd_tx_for_reconnect
347 .send(SpotHandlerCommand::SendText { payload })
348 {
349 log::error!(
350 "Failed to send resubscribe command: error={e}, \
351 topic={topic}"
352 );
353 }
354
355 subscriptions.mark_subscribe(topic);
356 }
357 }
358
359 if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
360 log::error!("Failed to send message (receiver dropped)");
361 break;
362 }
363 continue;
364 }
365 Some(msg) => {
366 if out_tx.send(msg).is_err() {
367 log::error!("Failed to send message (receiver dropped)");
368 break;
369 }
370 }
371 None => {
372 if handler.is_stopped() {
373 log::debug!("Stop signal received, ending message processing");
374 break;
375 }
376 log::warn!("WebSocket stream ended unexpectedly");
377 break;
378 }
379 }
380 }
381
382 log::debug!("Handler task exiting");
383 });
384
385 self.task_handle = Some(Arc::new(stream_handle));
386
387 log::debug!("WebSocket connected successfully");
388 Ok(())
389 }
390
391 pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
393 log::debug!("Disconnecting WebSocket");
394
395 self.signal.store(true, Ordering::Relaxed);
396
397 if let Err(e) = self
398 .cmd_tx
399 .read()
400 .await
401 .send(SpotHandlerCommand::Disconnect)
402 {
403 log::debug!(
404 "Failed to send disconnect command (handler may already be shut down): {e}"
405 );
406 }
407
408 if let Some(task_handle) = self.task_handle.take() {
409 match Arc::try_unwrap(task_handle) {
410 Ok(handle) => {
411 log::debug!("Waiting for task handle to complete");
412 match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
413 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
414 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
415 Err(_) => {
416 log::warn!(
417 "Timeout waiting for task handle, task may still be running"
418 );
419 }
420 }
421 }
422 Err(arc_handle) => {
423 log::debug!(
424 "Cannot take ownership of task handle - other references exist, aborting task"
425 );
426 arc_handle.abort();
427 }
428 }
429 } else {
430 log::debug!("No task handle to await");
431 }
432
433 self.subscriptions.clear();
434 self.auth_tracker.fail("Disconnected");
435
436 Ok(())
437 }
438
439 pub async fn close(&mut self) -> Result<(), KrakenWsError> {
441 self.disconnect().await
442 }
443
444 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
446 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
447
448 tokio::time::timeout(timeout, async {
449 while !self.is_active() {
450 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
451 }
452 })
453 .await
454 .map_err(|_| {
455 KrakenWsError::ConnectionError(format!(
456 "WebSocket connection timeout after {timeout_secs} seconds"
457 ))
458 })?;
459
460 Ok(())
461 }
462
463 pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
465 if !self.config.has_api_credentials() {
466 return Err(KrakenWsError::AuthenticationError(
467 "API credentials required for authentication".to_string(),
468 ));
469 }
470
471 let api_key = self
472 .config
473 .api_key
474 .clone()
475 .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
476 let api_secret =
477 self.config.api_secret.clone().ok_or_else(|| {
478 KrakenWsError::AuthenticationError("Missing API secret".to_string())
479 })?;
480
481 let http_client = KrakenSpotHttpClient::with_credentials(
482 api_key,
483 api_secret,
484 self.config.environment,
485 Some(self.config.http_base_url()),
486 self.config.timeout_secs,
487 None,
488 None,
489 None,
490 self.config.http_proxy.clone(),
491 self.config.max_requests_per_second,
492 )
493 .map_err(|e| {
494 KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
495 })?;
496
497 let ws_token = http_client.get_websockets_token().await.map_err(|e| {
498 KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
499 })?;
500
501 log::debug!(
502 "WebSocket authentication token received: token_length={}, expires={}",
503 ws_token.token.len(),
504 ws_token.expires
505 );
506
507 let mut auth_token = self.auth_token.write().await;
508 *auth_token = Some(ws_token.token);
509
510 Ok(())
511 }
512
513 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
515 if let Ok(cmd_tx) = self.cmd_tx.try_read()
517 && let Err(e) = cmd_tx.send(SpotHandlerCommand::InitializeInstruments(instruments))
518 {
519 log::debug!("Failed to send instruments to handler: {e}");
520 }
521 }
522
523 pub fn cache_instrument(&self, instrument: InstrumentAny) {
525 if let Ok(cmd_tx) = self.cmd_tx.try_read()
527 && let Err(e) = cmd_tx.send(SpotHandlerCommand::UpdateInstrument(instrument))
528 {
529 log::debug!("Failed to send instrument update to handler: {e}");
530 }
531 }
532
533 pub fn set_account_id(&self, account_id: AccountId) {
538 if let Ok(cmd_tx) = self.cmd_tx.try_read()
539 && let Err(e) = cmd_tx.send(SpotHandlerCommand::SetAccountId(account_id))
540 {
541 log::debug!("Failed to send account ID to handler: {e}");
542 }
543 }
544
545 pub fn cache_client_order(
551 &self,
552 client_order_id: ClientOrderId,
553 instrument_id: InstrumentId,
554 trader_id: TraderId,
555 strategy_id: StrategyId,
556 ) {
557 if let Ok(cmd_tx) = self.cmd_tx.try_read()
558 && let Err(e) = cmd_tx.send(SpotHandlerCommand::CacheClientOrder {
559 client_order_id,
560 instrument_id,
561 trader_id,
562 strategy_id,
563 })
564 {
565 log::debug!("Failed to send cache client order command to handler: {e}");
566 }
567 }
568
569 pub fn cancel_all_requests(&self) {
571 self.cancellation_token.cancel();
572 }
573
574 pub fn cancellation_token(&self) -> &CancellationToken {
576 &self.cancellation_token
577 }
578
579 pub async fn subscribe(
581 &self,
582 channel: KrakenWsChannel,
583 symbols: Vec<Ustr>,
584 depth: Option<u32>,
585 ) -> Result<(), KrakenWsError> {
586 let mut symbols_to_subscribe = Vec::new();
587 let channel_str = channel.as_ref();
588 for symbol in &symbols {
589 let key = format!("{channel_str}:{symbol}");
590 if self.subscriptions.add_reference(&key) {
591 self.subscriptions.mark_subscribe(&key);
592 symbols_to_subscribe.push(*symbol);
593 }
594 }
595
596 if symbols_to_subscribe.is_empty() {
597 return Ok(());
598 }
599
600 let is_private = matches!(
601 channel,
602 KrakenWsChannel::Executions | KrakenWsChannel::Balances
603 );
604 let token = if is_private {
605 Some(self.auth_token.read().await.clone().ok_or_else(|| {
606 KrakenWsError::AuthenticationError(
607 "Authentication token required for private channels. Call authenticate() first"
608 .to_string(),
609 )
610 })?)
611 } else {
612 None
613 };
614
615 let req_id = self.get_next_req_id().await;
616 let request = KrakenWsRequest {
617 method: KrakenWsMethod::Subscribe,
618 params: Some(KrakenWsParams {
619 channel,
620 symbol: Some(symbols_to_subscribe.clone()),
621 snapshot: None,
622 depth,
623 interval: None,
624 event_trigger: None,
625 token,
626 snap_orders: None,
627 snap_trades: None,
628 }),
629 req_id: Some(req_id),
630 };
631
632 self.send_request(&request).await?;
633
634 for symbol in &symbols_to_subscribe {
635 let key = format!("{channel_str}:{symbol}");
636 self.subscriptions.confirm_subscribe(&key);
637 }
638
639 Ok(())
640 }
641
642 async fn subscribe_with_interval(
644 &self,
645 channel: KrakenWsChannel,
646 symbols: Vec<Ustr>,
647 interval: u32,
648 ) -> Result<(), KrakenWsError> {
649 let mut symbols_to_subscribe = Vec::new();
650 let channel_str = channel.as_ref();
651 for symbol in &symbols {
652 let key = format!("{channel_str}:{symbol}:{interval}");
653 if self.subscriptions.add_reference(&key) {
654 self.subscriptions.mark_subscribe(&key);
655 symbols_to_subscribe.push(*symbol);
656 }
657 }
658
659 if symbols_to_subscribe.is_empty() {
660 return Ok(());
661 }
662
663 let req_id = self.get_next_req_id().await;
664 let request = KrakenWsRequest {
665 method: KrakenWsMethod::Subscribe,
666 params: Some(KrakenWsParams {
667 channel,
668 symbol: Some(symbols_to_subscribe.clone()),
669 snapshot: Some(false),
670 depth: None,
671 interval: Some(interval),
672 event_trigger: None,
673 token: None,
674 snap_orders: None,
675 snap_trades: None,
676 }),
677 req_id: Some(req_id),
678 };
679
680 self.send_request(&request).await?;
681
682 for symbol in &symbols_to_subscribe {
683 let key = format!("{channel_str}:{symbol}:{interval}");
684 self.subscriptions.confirm_subscribe(&key);
685 }
686
687 Ok(())
688 }
689
690 async fn unsubscribe_with_interval(
692 &self,
693 channel: KrakenWsChannel,
694 symbols: Vec<Ustr>,
695 interval: u32,
696 ) -> Result<(), KrakenWsError> {
697 let mut symbols_to_unsubscribe = Vec::new();
698 let channel_str = channel.as_ref();
699 for symbol in &symbols {
700 let key = format!("{channel_str}:{symbol}:{interval}");
701 if self.subscriptions.remove_reference(&key) {
702 self.subscriptions.mark_unsubscribe(&key);
703 symbols_to_unsubscribe.push(*symbol);
704 }
705 }
706
707 if symbols_to_unsubscribe.is_empty() {
708 return Ok(());
709 }
710
711 let req_id = self.get_next_req_id().await;
712 let request = KrakenWsRequest {
713 method: KrakenWsMethod::Unsubscribe,
714 params: Some(KrakenWsParams {
715 channel,
716 symbol: Some(symbols_to_unsubscribe.clone()),
717 snapshot: None,
718 depth: None,
719 interval: Some(interval),
720 event_trigger: None,
721 token: None,
722 snap_orders: None,
723 snap_trades: None,
724 }),
725 req_id: Some(req_id),
726 };
727
728 self.send_request(&request).await?;
729
730 for symbol in &symbols_to_unsubscribe {
731 let key = format!("{channel_str}:{symbol}:{interval}");
732 self.subscriptions.confirm_unsubscribe(&key);
733 }
734
735 Ok(())
736 }
737
738 pub async fn unsubscribe(
740 &self,
741 channel: KrakenWsChannel,
742 symbols: Vec<Ustr>,
743 ) -> Result<(), KrakenWsError> {
744 let mut symbols_to_unsubscribe = Vec::new();
745 let channel_str = channel.as_ref();
746 for symbol in &symbols {
747 let key = format!("{channel_str}:{symbol}");
748 if self.subscriptions.remove_reference(&key) {
749 self.subscriptions.mark_unsubscribe(&key);
750 symbols_to_unsubscribe.push(*symbol);
751 } else {
752 log::debug!(
753 "Channel {channel_str} symbol {symbol} still has active subscriptions, not unsubscribing"
754 );
755 }
756 }
757
758 if symbols_to_unsubscribe.is_empty() {
759 return Ok(());
760 }
761
762 let is_private = matches!(
763 channel,
764 KrakenWsChannel::Executions | KrakenWsChannel::Balances
765 );
766 let token = if is_private {
767 Some(self.auth_token.read().await.clone().ok_or_else(|| {
768 KrakenWsError::AuthenticationError(
769 "Authentication token required for private channels. Call authenticate() first"
770 .to_string(),
771 )
772 })?)
773 } else {
774 None
775 };
776
777 let req_id = self.get_next_req_id().await;
778 let request = KrakenWsRequest {
779 method: KrakenWsMethod::Unsubscribe,
780 params: Some(KrakenWsParams {
781 channel,
782 symbol: Some(symbols_to_unsubscribe.clone()),
783 snapshot: None,
784 depth: None,
785 interval: None,
786 event_trigger: None,
787 token,
788 snap_orders: None,
789 snap_trades: None,
790 }),
791 req_id: Some(req_id),
792 };
793
794 self.send_request(&request).await?;
795
796 for symbol in &symbols_to_unsubscribe {
797 let key = format!("{channel_str}:{symbol}");
798 self.subscriptions.confirm_unsubscribe(&key);
799 }
800
801 Ok(())
802 }
803
804 pub async fn send_ping(&self) -> Result<(), KrakenWsError> {
806 let req_id = self.get_next_req_id().await;
807
808 let request = KrakenWsRequest {
809 method: KrakenWsMethod::Ping,
810 params: None,
811 req_id: Some(req_id),
812 };
813
814 self.send_request(&request).await
815 }
816
817 async fn send_request(&self, request: &KrakenWsRequest) -> Result<(), KrakenWsError> {
818 let payload =
819 serde_json::to_string(request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
820
821 log::trace!("Sending message: {payload}");
822
823 self.cmd_tx
824 .read()
825 .await
826 .send(SpotHandlerCommand::SendText { payload })
827 .map_err(|e| KrakenWsError::ConnectionError(format!("Failed to send request: {e}")))?;
828
829 Ok(())
830 }
831
832 pub fn is_connected(&self) -> bool {
834 let connection_mode_arc = self.connection_mode.load();
835 !ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
836 }
837
838 pub fn is_active(&self) -> bool {
840 let connection_mode_arc = self.connection_mode.load();
841 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
842 && !self.signal.load(Ordering::Relaxed)
843 }
844
845 pub fn is_closed(&self) -> bool {
847 let connection_mode_arc = self.connection_mode.load();
848 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
849 || self.signal.load(Ordering::Relaxed)
850 }
851
852 pub fn url(&self) -> &str {
854 &self.url
855 }
856
857 pub fn get_subscriptions(&self) -> Vec<String> {
859 self.subscriptions.all_topics()
860 }
861
862 pub fn stream(
870 &mut self,
871 ) -> Result<impl futures_util::Stream<Item = NautilusWsMessage> + use<>, KrakenWsError> {
872 let rx = self.out_rx.take().ok_or_else(|| {
873 KrakenWsError::ChannelError(
874 "Stream receiver already taken or client not connected".to_string(),
875 )
876 })?;
877 let mut rx = Arc::try_unwrap(rx).map_err(|_| {
878 KrakenWsError::ChannelError(
879 "Cannot take ownership of stream - other client clones still hold references"
880 .to_string(),
881 )
882 })?;
883 Ok(async_stream::stream! {
884 while let Some(msg) = rx.recv().await {
885 yield msg;
886 }
887 })
888 }
889
890 pub async fn subscribe_book(
892 &self,
893 instrument_id: InstrumentId,
894 depth: Option<u32>,
895 ) -> Result<(), KrakenWsError> {
896 let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
897 self.subscribe(KrakenWsChannel::Book, vec![symbol], depth)
898 .await
899 }
900
901 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
906 let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
907 let key = format!("quotes:{symbol}");
908
909 if !self.subscriptions.add_reference(&key) {
910 return Ok(());
911 }
912
913 self.subscriptions.mark_subscribe(&key);
914
915 let req_id = self.get_next_req_id().await;
916 let request = KrakenWsRequest {
917 method: KrakenWsMethod::Subscribe,
918 params: Some(KrakenWsParams {
919 channel: KrakenWsChannel::Ticker,
920 symbol: Some(vec![symbol]),
921 snapshot: None,
922 depth: None,
923 interval: None,
924 event_trigger: Some("bbo".to_string()),
925 token: None,
926 snap_orders: None,
927 snap_trades: None,
928 }),
929 req_id: Some(req_id),
930 };
931
932 self.send_request(&request).await?;
933 self.subscriptions.confirm_subscribe(&key);
934 Ok(())
935 }
936
937 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
939 let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
940 self.subscribe(KrakenWsChannel::Trade, vec![symbol], None)
941 .await
942 }
943
944 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
950 let symbol = to_ws_v2_symbol(bar_type.instrument_id().symbol.inner());
951 let interval = bar_type_to_ws_interval(bar_type)?;
952 self.subscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
953 .await
954 }
955
956 pub async fn subscribe_executions(
960 &self,
961 snap_orders: bool,
962 snap_trades: bool,
963 ) -> Result<(), KrakenWsError> {
964 let req_id = self.get_next_req_id().await;
965
966 let token = self.auth_token.read().await.clone().ok_or_else(|| {
967 KrakenWsError::AuthenticationError(
968 "Authentication token required for executions channel. Call authenticate() first"
969 .to_string(),
970 )
971 })?;
972
973 let request = KrakenWsRequest {
974 method: KrakenWsMethod::Subscribe,
975 params: Some(KrakenWsParams {
976 channel: KrakenWsChannel::Executions,
977 symbol: None,
978 snapshot: None,
979 depth: None,
980 interval: None,
981 event_trigger: None,
982 token: Some(token),
983 snap_orders: Some(snap_orders),
984 snap_trades: Some(snap_trades),
985 }),
986 req_id: Some(req_id),
987 };
988
989 self.send_request(&request).await?;
990
991 let key = "executions";
992 if self.subscriptions.add_reference(key) {
993 self.subscriptions.mark_subscribe(key);
994 self.subscriptions.confirm_subscribe(key);
995 }
996
997 Ok(())
998 }
999
1000 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
1002 let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
1003 self.unsubscribe(KrakenWsChannel::Book, vec![symbol]).await
1004 }
1005
1006 pub async fn unsubscribe_quotes(
1008 &self,
1009 instrument_id: InstrumentId,
1010 ) -> Result<(), KrakenWsError> {
1011 let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
1012 let key = format!("quotes:{symbol}");
1013
1014 if !self.subscriptions.remove_reference(&key) {
1015 return Ok(());
1016 }
1017
1018 self.subscriptions.mark_unsubscribe(&key);
1019
1020 let req_id = self.get_next_req_id().await;
1021 let request = KrakenWsRequest {
1022 method: KrakenWsMethod::Unsubscribe,
1023 params: Some(KrakenWsParams {
1024 channel: KrakenWsChannel::Ticker,
1025 symbol: Some(vec![symbol]),
1026 snapshot: None,
1027 depth: None,
1028 interval: None,
1029 event_trigger: Some("bbo".to_string()),
1030 token: None,
1031 snap_orders: None,
1032 snap_trades: None,
1033 }),
1034 req_id: Some(req_id),
1035 };
1036
1037 self.send_request(&request).await?;
1038 self.subscriptions.confirm_unsubscribe(&key);
1039 Ok(())
1040 }
1041
1042 pub async fn unsubscribe_trades(
1044 &self,
1045 instrument_id: InstrumentId,
1046 ) -> Result<(), KrakenWsError> {
1047 let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
1048 self.unsubscribe(KrakenWsChannel::Trade, vec![symbol]).await
1049 }
1050
1051 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
1057 let symbol = to_ws_v2_symbol(bar_type.instrument_id().symbol.inner());
1058 let interval = bar_type_to_ws_interval(bar_type)?;
1059 self.unsubscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
1060 .await
1061 }
1062}
1063
1064async fn refresh_auth_token(config: &KrakenDataClientConfig) -> Result<String, KrakenWsError> {
1066 let api_key = config
1067 .api_key
1068 .clone()
1069 .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
1070 let api_secret = config
1071 .api_secret
1072 .clone()
1073 .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API secret".to_string()))?;
1074
1075 let http_client = KrakenSpotHttpClient::with_credentials(
1076 api_key,
1077 api_secret,
1078 config.environment,
1079 Some(config.http_base_url()),
1080 config.timeout_secs,
1081 None,
1082 None,
1083 None,
1084 config.http_proxy.clone(),
1085 config.max_requests_per_second,
1086 )
1087 .map_err(|e| {
1088 KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
1089 })?;
1090
1091 let ws_token = http_client.get_websockets_token().await.map_err(|e| {
1092 KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
1093 })?;
1094
1095 log::debug!(
1096 "WebSocket authentication token refreshed: token_length={}, expires={}",
1097 ws_token.token.len(),
1098 ws_token.expires
1099 );
1100
1101 Ok(ws_token.token)
1102}
1103
1104#[inline]
1105fn to_ws_v2_symbol(symbol: Ustr) -> Ustr {
1106 Ustr::from(&normalize_spot_symbol(symbol.as_str()))
1107}
1108
1109fn bar_type_to_ws_interval(bar_type: BarType) -> Result<u32, KrakenWsError> {
1110 const VALID_INTERVALS: [u32; 9] = [1, 5, 15, 30, 60, 240, 1440, 10080, 21600];
1111
1112 let spec = bar_type.spec();
1113 let step = spec.step.get() as u32;
1114
1115 let base_minutes = match spec.aggregation {
1116 BarAggregation::Minute => 1,
1117 BarAggregation::Hour => 60,
1118 BarAggregation::Day => 1440,
1119 BarAggregation::Week => 10080,
1120 other => {
1121 return Err(KrakenWsError::SubscriptionError(format!(
1122 "Unsupported bar aggregation for Kraken OHLC streaming: {other:?}"
1123 )));
1124 }
1125 };
1126
1127 let interval = base_minutes * step;
1128
1129 if !VALID_INTERVALS.contains(&interval) {
1130 return Err(KrakenWsError::SubscriptionError(format!(
1131 "Invalid bar interval {interval} minutes for Kraken OHLC streaming. \
1132 Supported intervals: 1, 5, 15, 30, 60, 240, 1440, 10080, 21600"
1133 )));
1134 }
1135
1136 Ok(interval)
1137}
1138
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Symbols containing `XBT` are expected to be rewritten to `BTC`; all other
    /// symbols are expected to pass through unchanged.
    #[rstest]
    #[case("XBT/EUR", "BTC/EUR")]
    #[case("XBT/USD", "BTC/USD")]
    #[case("XBT/USDT", "BTC/USDT")]
    #[case("ETH/USD", "ETH/USD")]
    #[case("ETH/XBT", "ETH/BTC")]
    #[case("SOL/XBT", "SOL/BTC")]
    #[case("SOL/USD", "SOL/USD")]
    #[case("BTC/USD", "BTC/USD")]
    #[case("ETH/BTC", "ETH/BTC")]
    fn test_to_kraken_ws_v2_symbol(#[case] input: &str, #[case] expected: &str) {
        let normalized = to_ws_v2_symbol(Ustr::from(input));
        assert_eq!(normalized.as_str(), expected);
    }
}