1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::get_runtime;
25use nautilus_model::{
26 data::BarType,
27 enums::BarAggregation,
28 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
29 instruments::InstrumentAny,
30};
31use nautilus_network::{
32 mode::ConnectionMode,
33 websocket::{
34 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35 },
36};
37use tokio_util::sync::CancellationToken;
38use ustr::Ustr;
39
/// Delimiter handed to the subscription state tracker for Kraken spot WebSocket topic keys
/// (topic keys in this module take the form `channel:symbol` or `channel:symbol:interval`).
pub const KRAKEN_SPOT_WS_TOPIC_DELIMITER: char = ':';
44
45use super::{
46 enums::{KrakenWsChannel, KrakenWsMethod},
47 handler::{SpotFeedHandler, SpotHandlerCommand},
48 messages::{KrakenWsParams, KrakenWsRequest, NautilusWsMessage},
49};
50use crate::{
51 config::KrakenDataClientConfig, http::KrakenSpotHttpClient, websocket::error::KrakenWsError,
52};
53
/// JSON ping request configured as the heartbeat message of the underlying WebSocket client.
const WS_PING_MSG: &str = r#"{"method":"ping"}"#;
55
/// WebSocket client for the Kraken spot streaming API.
///
/// Cloning yields a handle that shares the stop signal, connection mode, command channel,
/// request-id counter and authentication token with the original.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
)]
pub struct KrakenSpotWebSocketClient {
    /// Endpoint URL used by `connect`, chosen from the configuration in `new`.
    url: String,
    /// Client configuration (endpoints, credentials, heartbeat and HTTP settings).
    config: KrakenDataClientConfig,
    /// Stop flag: set by `disconnect`, cleared by `connect`.
    signal: Arc<AtomicBool>,
    /// Connection mode of the underlying WebSocket client; swapped in by `connect`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the feed handler; replaced on every `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<SpotHandlerCommand>>>,
    /// Receiver for handler output; populated by `connect` and taken by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Handle of the task spawned by `connect`, taken by `disconnect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Tracks subscription topics by key; cleared by `disconnect`.
    subscriptions: SubscriptionState,
    /// Authentication tracker; marked as failed by `disconnect`.
    auth_tracker: AuthTracker,
    /// Token cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
    /// Last request id handed out; incremented for each outgoing request.
    req_id_counter: Arc<tokio::sync::RwLock<u64>>,
    /// WebSocket authentication token, stored by a successful `authenticate`.
    auth_token: Arc<tokio::sync::RwLock<Option<String>>>,
}
76
77impl Clone for KrakenSpotWebSocketClient {
78 fn clone(&self) -> Self {
79 Self {
80 url: self.url.clone(),
81 config: self.config.clone(),
82 signal: Arc::clone(&self.signal),
83 connection_mode: Arc::clone(&self.connection_mode),
84 cmd_tx: Arc::clone(&self.cmd_tx),
85 out_rx: self.out_rx.clone(),
86 task_handle: self.task_handle.clone(),
87 subscriptions: self.subscriptions.clone(),
88 auth_tracker: self.auth_tracker.clone(),
89 cancellation_token: self.cancellation_token.clone(),
90 req_id_counter: self.req_id_counter.clone(),
91 auth_token: self.auth_token.clone(),
92 }
93 }
94}
95
96impl KrakenSpotWebSocketClient {
97 pub fn new(config: KrakenDataClientConfig, cancellation_token: CancellationToken) -> Self {
99 let url = if config.ws_private_url.is_some() {
101 config.ws_private_url()
102 } else {
103 config.ws_public_url()
104 };
105 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
106 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
107 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
108
109 Self {
110 url,
111 config,
112 signal: Arc::new(AtomicBool::new(false)),
113 connection_mode,
114 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
115 out_rx: None,
116 task_handle: None,
117 subscriptions: SubscriptionState::new(KRAKEN_SPOT_WS_TOPIC_DELIMITER),
118 auth_tracker: AuthTracker::new(),
119 cancellation_token,
120 req_id_counter: Arc::new(tokio::sync::RwLock::new(0)),
121 auth_token: Arc::new(tokio::sync::RwLock::new(None)),
122 }
123 }
124
125 async fn get_next_req_id(&self) -> u64 {
126 let mut counter = self.req_id_counter.write().await;
127 *counter += 1;
128 *counter
129 }
130
131 pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
133 log::debug!("Connecting to {}", self.url);
134
135 self.signal.store(false, Ordering::Relaxed);
136
137 let (raw_handler, raw_rx) = channel_message_handler();
138
139 let ws_config = WebSocketConfig {
140 url: self.url.clone(),
141 headers: vec![],
142 heartbeat: self.config.heartbeat_interval_secs,
143 heartbeat_msg: Some(WS_PING_MSG.to_string()),
144 reconnect_timeout_ms: Some(5_000),
145 reconnect_delay_initial_ms: Some(500),
146 reconnect_delay_max_ms: Some(5_000),
147 reconnect_backoff_factor: Some(1.5),
148 reconnect_jitter_ms: Some(250),
149 reconnect_max_attempts: None,
150 };
151
152 let ws_client = WebSocketClient::connect(
153 ws_config,
154 Some(raw_handler),
155 None, None, vec![], None, )
160 .await
161 .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
162
163 self.connection_mode
165 .store(ws_client.connection_mode_atomic());
166
167 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
168 self.out_rx = Some(Arc::new(out_rx));
169
170 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
171 *self.cmd_tx.write().await = cmd_tx.clone();
172
173 if let Err(e) = cmd_tx.send(SpotHandlerCommand::SetClient(ws_client)) {
174 return Err(KrakenWsError::ConnectionError(format!(
175 "Failed to send WebSocketClient to handler: {e}"
176 )));
177 }
178
179 let signal = self.signal.clone();
180 let subscriptions = self.subscriptions.clone();
181 let config_for_reconnect = self.config.clone();
182 let auth_token_for_reconnect = self.auth_token.clone();
183 let req_id_counter_for_reconnect = self.req_id_counter.clone();
184 let cmd_tx_for_reconnect = cmd_tx.clone();
185
186 let stream_handle = get_runtime().spawn(async move {
187 let mut handler =
188 SpotFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
189
190 loop {
191 match handler.next().await {
192 Some(NautilusWsMessage::Reconnected) => {
193 if signal.load(Ordering::Relaxed) {
194 continue;
195 }
196 log::info!("WebSocket reconnected, resubscribing");
197
198 let confirmed_topics = subscriptions.all_topics();
200 for topic in &confirmed_topics {
201 subscriptions.mark_failure(topic);
202 }
203
204 let topics = subscriptions.all_topics();
205 if topics.is_empty() {
206 log::debug!("No subscriptions to restore after reconnection");
207 } else {
208 let had_auth = auth_token_for_reconnect.read().await.is_some();
210
211 if had_auth && config_for_reconnect.has_api_credentials() {
212 log::debug!("Re-authenticating after reconnect");
213
214 match refresh_auth_token(&config_for_reconnect).await {
215 Ok(new_token) => {
216 *auth_token_for_reconnect.write().await = Some(new_token);
217 log::debug!("Re-authentication successful");
218 }
219 Err(e) => {
220 log::error!(
221 "Failed to re-authenticate after reconnect: {e}"
222 );
223 *auth_token_for_reconnect.write().await = None;
225 }
226 }
227 }
228
229 log::info!("Resubscribing after reconnection: count={}", topics.len());
230
231 for topic in &topics {
233 let auth_token = auth_token_for_reconnect.read().await.clone();
234
235 if topic == "executions" {
237 if let Some(ref token) = auth_token {
238 let mut counter =
239 req_id_counter_for_reconnect.write().await;
240 *counter += 1;
241 let req_id = *counter;
242
243 let request = KrakenWsRequest {
244 method: KrakenWsMethod::Subscribe,
245 params: Some(KrakenWsParams {
246 channel: KrakenWsChannel::Executions,
247 symbol: None,
248 snapshot: None,
249 depth: None,
250 interval: None,
251 token: Some(token.clone()),
252 snap_orders: Some(true),
253 snap_trades: Some(true),
254 }),
255 req_id: Some(req_id),
256 };
257
258 if let Ok(payload) = serde_json::to_string(&request)
259 && let Err(e) = cmd_tx_for_reconnect
260 .send(SpotHandlerCommand::SendText { payload })
261 {
262 log::error!(
263 "Failed to send executions resubscribe: {e}"
264 );
265 }
266
267 subscriptions.mark_subscribe(topic);
268 } else {
269 log::warn!(
270 "Cannot resubscribe to executions: no auth token"
271 );
272 }
273 continue;
274 }
275
276 let parts: Vec<&str> = topic.splitn(3, ':').collect();
278 if parts.len() < 2 {
279 log::warn!(
280 "Invalid topic format for resubscribe: topic={topic}"
281 );
282 continue;
283 }
284
285 let channel_str = parts[0];
286 let channel = match channel_str {
287 "Book" => Some(KrakenWsChannel::Book),
288 "Trade" => Some(KrakenWsChannel::Trade),
289 "Ticker" => Some(KrakenWsChannel::Ticker),
290 "Ohlc" => Some(KrakenWsChannel::Ohlc),
291 "book" => Some(KrakenWsChannel::Book),
292 "quotes" => Some(KrakenWsChannel::Book),
293 _ => None,
294 };
295
296 let Some(channel) = channel else {
297 log::warn!("Unknown channel for resubscribe: topic={topic}");
298 continue;
299 };
300
301 let mut counter = req_id_counter_for_reconnect.write().await;
302 *counter += 1;
303 let req_id = *counter;
304
305 let depth = if channel_str == "quotes" {
306 Some(10)
307 } else {
308 None
309 };
310
311 let (symbol_str, interval) = if parts.len() == 3 {
313 (parts[1], parts[2].parse::<u32>().ok())
315 } else {
316 (parts[1], None)
318 };
319
320 let request = KrakenWsRequest {
321 method: KrakenWsMethod::Subscribe,
322 params: Some(KrakenWsParams {
323 channel,
324 symbol: Some(vec![Ustr::from(symbol_str)]),
325 snapshot: None,
326 depth,
327 interval,
328 token: None,
329 snap_orders: None,
330 snap_trades: None,
331 }),
332 req_id: Some(req_id),
333 };
334
335 if let Ok(payload) = serde_json::to_string(&request)
336 && let Err(e) = cmd_tx_for_reconnect
337 .send(SpotHandlerCommand::SendText { payload })
338 {
339 log::error!(
340 "Failed to send resubscribe command: error={e}, \
341 topic={topic}"
342 );
343 }
344
345 subscriptions.mark_subscribe(topic);
346 }
347 }
348
349 if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
350 log::error!("Failed to send message (receiver dropped)");
351 break;
352 }
353 continue;
354 }
355 Some(msg) => {
356 if out_tx.send(msg).is_err() {
357 log::error!("Failed to send message (receiver dropped)");
358 break;
359 }
360 }
361 None => {
362 if handler.is_stopped() {
363 log::debug!("Stop signal received, ending message processing");
364 break;
365 }
366 log::warn!("WebSocket stream ended unexpectedly");
367 break;
368 }
369 }
370 }
371
372 log::debug!("Handler task exiting");
373 });
374
375 self.task_handle = Some(Arc::new(stream_handle));
376
377 log::debug!("WebSocket connected successfully");
378 Ok(())
379 }
380
381 pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
383 log::debug!("Disconnecting WebSocket");
384
385 self.signal.store(true, Ordering::Relaxed);
386
387 if let Err(e) = self
388 .cmd_tx
389 .read()
390 .await
391 .send(SpotHandlerCommand::Disconnect)
392 {
393 log::debug!(
394 "Failed to send disconnect command (handler may already be shut down): {e}"
395 );
396 }
397
398 if let Some(task_handle) = self.task_handle.take() {
399 match Arc::try_unwrap(task_handle) {
400 Ok(handle) => {
401 log::debug!("Waiting for task handle to complete");
402 match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
403 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
404 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
405 Err(_) => {
406 log::warn!(
407 "Timeout waiting for task handle, task may still be running"
408 );
409 }
410 }
411 }
412 Err(arc_handle) => {
413 log::debug!(
414 "Cannot take ownership of task handle - other references exist, aborting task"
415 );
416 arc_handle.abort();
417 }
418 }
419 } else {
420 log::debug!("No task handle to await");
421 }
422
423 self.subscriptions.clear();
424 self.auth_tracker.fail("Disconnected");
425
426 Ok(())
427 }
428
429 pub async fn close(&mut self) -> Result<(), KrakenWsError> {
431 self.disconnect().await
432 }
433
434 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
436 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
437
438 tokio::time::timeout(timeout, async {
439 while !self.is_active() {
440 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
441 }
442 })
443 .await
444 .map_err(|_| {
445 KrakenWsError::ConnectionError(format!(
446 "WebSocket connection timeout after {timeout_secs} seconds"
447 ))
448 })?;
449
450 Ok(())
451 }
452
453 pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
455 if !self.config.has_api_credentials() {
456 return Err(KrakenWsError::AuthenticationError(
457 "API credentials required for authentication".to_string(),
458 ));
459 }
460
461 let api_key = self
462 .config
463 .api_key
464 .clone()
465 .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
466 let api_secret =
467 self.config.api_secret.clone().ok_or_else(|| {
468 KrakenWsError::AuthenticationError("Missing API secret".to_string())
469 })?;
470
471 let http_client = KrakenSpotHttpClient::with_credentials(
472 api_key,
473 api_secret,
474 self.config.environment,
475 Some(self.config.http_base_url()),
476 self.config.timeout_secs,
477 None,
478 None,
479 None,
480 self.config.http_proxy.clone(),
481 self.config.max_requests_per_second,
482 )
483 .map_err(|e| {
484 KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
485 })?;
486
487 let ws_token = http_client.get_websockets_token().await.map_err(|e| {
488 KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
489 })?;
490
491 log::debug!(
492 "WebSocket authentication token received: token_length={}, expires={}",
493 ws_token.token.len(),
494 ws_token.expires
495 );
496
497 let mut auth_token = self.auth_token.write().await;
498 *auth_token = Some(ws_token.token);
499
500 Ok(())
501 }
502
503 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
505 if let Ok(cmd_tx) = self.cmd_tx.try_read()
507 && let Err(e) = cmd_tx.send(SpotHandlerCommand::InitializeInstruments(instruments))
508 {
509 log::debug!("Failed to send instruments to handler: {e}");
510 }
511 }
512
513 pub fn cache_instrument(&self, instrument: InstrumentAny) {
515 if let Ok(cmd_tx) = self.cmd_tx.try_read()
517 && let Err(e) = cmd_tx.send(SpotHandlerCommand::UpdateInstrument(instrument))
518 {
519 log::debug!("Failed to send instrument update to handler: {e}");
520 }
521 }
522
523 pub fn set_account_id(&self, account_id: AccountId) {
528 if let Ok(cmd_tx) = self.cmd_tx.try_read()
529 && let Err(e) = cmd_tx.send(SpotHandlerCommand::SetAccountId(account_id))
530 {
531 log::debug!("Failed to send account ID to handler: {e}");
532 }
533 }
534
535 pub fn cache_client_order(
541 &self,
542 client_order_id: ClientOrderId,
543 instrument_id: InstrumentId,
544 trader_id: TraderId,
545 strategy_id: StrategyId,
546 ) {
547 if let Ok(cmd_tx) = self.cmd_tx.try_read()
548 && let Err(e) = cmd_tx.send(SpotHandlerCommand::CacheClientOrder {
549 client_order_id,
550 instrument_id,
551 trader_id,
552 strategy_id,
553 })
554 {
555 log::debug!("Failed to send cache client order command to handler: {e}");
556 }
557 }
558
559 pub fn cancel_all_requests(&self) {
561 self.cancellation_token.cancel();
562 }
563
564 pub fn cancellation_token(&self) -> &CancellationToken {
566 &self.cancellation_token
567 }
568
569 pub async fn subscribe(
571 &self,
572 channel: KrakenWsChannel,
573 symbols: Vec<Ustr>,
574 depth: Option<u32>,
575 ) -> Result<(), KrakenWsError> {
576 let mut symbols_to_subscribe = Vec::new();
577 for symbol in &symbols {
578 let key = format!("{channel:?}:{symbol}");
579 if self.subscriptions.add_reference(&key) {
580 self.subscriptions.mark_subscribe(&key);
581 symbols_to_subscribe.push(*symbol);
582 }
583 }
584
585 if symbols_to_subscribe.is_empty() {
586 return Ok(());
587 }
588
589 let is_private = matches!(
590 channel,
591 KrakenWsChannel::Executions | KrakenWsChannel::Balances
592 );
593 let token = if is_private {
594 Some(self.auth_token.read().await.clone().ok_or_else(|| {
595 KrakenWsError::AuthenticationError(
596 "Authentication token required for private channels. Call authenticate() first"
597 .to_string(),
598 )
599 })?)
600 } else {
601 None
602 };
603
604 let req_id = self.get_next_req_id().await;
605 let request = KrakenWsRequest {
606 method: KrakenWsMethod::Subscribe,
607 params: Some(KrakenWsParams {
608 channel,
609 symbol: Some(symbols_to_subscribe.clone()),
610 snapshot: None,
611 depth,
612 interval: None,
613 token,
614 snap_orders: None,
615 snap_trades: None,
616 }),
617 req_id: Some(req_id),
618 };
619
620 self.send_request(&request).await?;
621
622 for symbol in &symbols_to_subscribe {
623 let key = format!("{channel:?}:{symbol}");
624 self.subscriptions.confirm_subscribe(&key);
625 }
626
627 Ok(())
628 }
629
630 async fn subscribe_with_interval(
632 &self,
633 channel: KrakenWsChannel,
634 symbols: Vec<Ustr>,
635 interval: u32,
636 ) -> Result<(), KrakenWsError> {
637 let mut symbols_to_subscribe = Vec::new();
638 for symbol in &symbols {
639 let key = format!("{channel:?}:{symbol}:{interval}");
640 if self.subscriptions.add_reference(&key) {
641 self.subscriptions.mark_subscribe(&key);
642 symbols_to_subscribe.push(*symbol);
643 }
644 }
645
646 if symbols_to_subscribe.is_empty() {
647 return Ok(());
648 }
649
650 let req_id = self.get_next_req_id().await;
651 let request = KrakenWsRequest {
652 method: KrakenWsMethod::Subscribe,
653 params: Some(KrakenWsParams {
654 channel,
655 symbol: Some(symbols_to_subscribe.clone()),
656 snapshot: None,
657 depth: None,
658 interval: Some(interval),
659 token: None,
660 snap_orders: None,
661 snap_trades: None,
662 }),
663 req_id: Some(req_id),
664 };
665
666 self.send_request(&request).await?;
667
668 for symbol in &symbols_to_subscribe {
669 let key = format!("{channel:?}:{symbol}:{interval}");
670 self.subscriptions.confirm_subscribe(&key);
671 }
672
673 Ok(())
674 }
675
676 async fn unsubscribe_with_interval(
678 &self,
679 channel: KrakenWsChannel,
680 symbols: Vec<Ustr>,
681 interval: u32,
682 ) -> Result<(), KrakenWsError> {
683 let mut symbols_to_unsubscribe = Vec::new();
684 for symbol in &symbols {
685 let key = format!("{channel:?}:{symbol}:{interval}");
686 if self.subscriptions.remove_reference(&key) {
687 self.subscriptions.mark_unsubscribe(&key);
688 symbols_to_unsubscribe.push(*symbol);
689 }
690 }
691
692 if symbols_to_unsubscribe.is_empty() {
693 return Ok(());
694 }
695
696 let req_id = self.get_next_req_id().await;
697 let request = KrakenWsRequest {
698 method: KrakenWsMethod::Unsubscribe,
699 params: Some(KrakenWsParams {
700 channel,
701 symbol: Some(symbols_to_unsubscribe.clone()),
702 snapshot: None,
703 depth: None,
704 interval: Some(interval),
705 token: None,
706 snap_orders: None,
707 snap_trades: None,
708 }),
709 req_id: Some(req_id),
710 };
711
712 self.send_request(&request).await?;
713
714 for symbol in &symbols_to_unsubscribe {
715 let key = format!("{channel:?}:{symbol}:{interval}");
716 self.subscriptions.confirm_unsubscribe(&key);
717 }
718
719 Ok(())
720 }
721
722 pub async fn unsubscribe(
724 &self,
725 channel: KrakenWsChannel,
726 symbols: Vec<Ustr>,
727 ) -> Result<(), KrakenWsError> {
728 let mut symbols_to_unsubscribe = Vec::new();
729 for symbol in &symbols {
730 let key = format!("{channel:?}:{symbol}");
731 if self.subscriptions.remove_reference(&key) {
732 self.subscriptions.mark_unsubscribe(&key);
733 symbols_to_unsubscribe.push(*symbol);
734 } else {
735 log::debug!(
736 "Channel {channel:?} symbol {symbol} still has active subscriptions, not unsubscribing"
737 );
738 }
739 }
740
741 if symbols_to_unsubscribe.is_empty() {
742 return Ok(());
743 }
744
745 let is_private = matches!(
746 channel,
747 KrakenWsChannel::Executions | KrakenWsChannel::Balances
748 );
749 let token = if is_private {
750 Some(self.auth_token.read().await.clone().ok_or_else(|| {
751 KrakenWsError::AuthenticationError(
752 "Authentication token required for private channels. Call authenticate() first"
753 .to_string(),
754 )
755 })?)
756 } else {
757 None
758 };
759
760 let req_id = self.get_next_req_id().await;
761 let request = KrakenWsRequest {
762 method: KrakenWsMethod::Unsubscribe,
763 params: Some(KrakenWsParams {
764 channel,
765 symbol: Some(symbols_to_unsubscribe.clone()),
766 snapshot: None,
767 depth: None,
768 interval: None,
769 token,
770 snap_orders: None,
771 snap_trades: None,
772 }),
773 req_id: Some(req_id),
774 };
775
776 self.send_request(&request).await?;
777
778 for symbol in &symbols_to_unsubscribe {
779 let key = format!("{channel:?}:{symbol}");
780 self.subscriptions.confirm_unsubscribe(&key);
781 }
782
783 Ok(())
784 }
785
786 pub async fn send_ping(&self) -> Result<(), KrakenWsError> {
788 let req_id = self.get_next_req_id().await;
789
790 let request = KrakenWsRequest {
791 method: KrakenWsMethod::Ping,
792 params: None,
793 req_id: Some(req_id),
794 };
795
796 self.send_request(&request).await
797 }
798
799 async fn send_request(&self, request: &KrakenWsRequest) -> Result<(), KrakenWsError> {
800 let payload =
801 serde_json::to_string(request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
802
803 log::trace!("Sending message: {payload}");
804
805 self.cmd_tx
806 .read()
807 .await
808 .send(SpotHandlerCommand::SendText { payload })
809 .map_err(|e| KrakenWsError::ConnectionError(format!("Failed to send request: {e}")))?;
810
811 Ok(())
812 }
813
814 pub fn is_connected(&self) -> bool {
816 let connection_mode_arc = self.connection_mode.load();
817 !ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
818 }
819
820 pub fn is_active(&self) -> bool {
822 let connection_mode_arc = self.connection_mode.load();
823 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
824 && !self.signal.load(Ordering::Relaxed)
825 }
826
827 pub fn is_closed(&self) -> bool {
829 let connection_mode_arc = self.connection_mode.load();
830 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
831 || self.signal.load(Ordering::Relaxed)
832 }
833
834 pub fn url(&self) -> &str {
836 &self.url
837 }
838
839 pub fn get_subscriptions(&self) -> Vec<String> {
841 self.subscriptions.all_topics()
842 }
843
844 pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
846 let rx = self
847 .out_rx
848 .take()
849 .expect("Stream receiver already taken or client not connected");
850 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
851 async_stream::stream! {
852 while let Some(msg) = rx.recv().await {
853 yield msg;
854 }
855 }
856 }
857
858 pub async fn subscribe_book(
860 &self,
861 instrument_id: InstrumentId,
862 depth: Option<u32>,
863 ) -> Result<(), KrakenWsError> {
864 let symbol = instrument_id.symbol.inner();
866 let book_key = format!("book:{symbol}");
867
868 if !self.subscriptions.add_reference(&book_key) {
869 return Ok(());
870 }
871
872 self.subscriptions.mark_subscribe(&book_key);
873 self.subscriptions.confirm_subscribe(&book_key);
874
875 self.subscribe(KrakenWsChannel::Book, vec![symbol], depth)
876 .await
877 }
878
879 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
884 let symbol = instrument_id.symbol.inner();
885 let quotes_key = format!("quotes:{symbol}");
886
887 if !self.subscriptions.add_reference("es_key) {
888 return Ok(());
889 }
890
891 self.subscriptions.mark_subscribe("es_key);
892 self.subscriptions.confirm_subscribe("es_key);
893 self.ensure_book_subscribed(symbol).await
894 }
895
896 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
898 let symbol = instrument_id.symbol.inner();
899 self.subscribe(KrakenWsChannel::Trade, vec![symbol], None)
900 .await
901 }
902
903 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
909 let symbol = bar_type.instrument_id().symbol.inner();
910 let interval = bar_type_to_ws_interval(bar_type)?;
911 self.subscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
912 .await
913 }
914
915 pub async fn subscribe_executions(
919 &self,
920 snap_orders: bool,
921 snap_trades: bool,
922 ) -> Result<(), KrakenWsError> {
923 let req_id = self.get_next_req_id().await;
924
925 let token = self.auth_token.read().await.clone().ok_or_else(|| {
926 KrakenWsError::AuthenticationError(
927 "Authentication token required for executions channel. Call authenticate() first"
928 .to_string(),
929 )
930 })?;
931
932 let request = KrakenWsRequest {
933 method: KrakenWsMethod::Subscribe,
934 params: Some(KrakenWsParams {
935 channel: KrakenWsChannel::Executions,
936 symbol: None,
937 snapshot: None,
938 depth: None,
939 interval: None,
940 token: Some(token),
941 snap_orders: Some(snap_orders),
942 snap_trades: Some(snap_trades),
943 }),
944 req_id: Some(req_id),
945 };
946
947 self.send_request(&request).await?;
948
949 let key = "executions";
950 if self.subscriptions.add_reference(key) {
951 self.subscriptions.mark_subscribe(key);
952 self.subscriptions.confirm_subscribe(key);
953 }
954
955 Ok(())
956 }
957
958 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
962 let symbol = instrument_id.symbol.inner();
963 let book_key = format!("book:{symbol}");
964
965 if !self.subscriptions.remove_reference(&book_key) {
966 return Ok(());
967 }
968
969 self.subscriptions.mark_unsubscribe(&book_key);
970 self.subscriptions.confirm_unsubscribe(&book_key);
971 self.maybe_unsubscribe_book(symbol).await
972 }
973
974 pub async fn unsubscribe_quotes(
976 &self,
977 instrument_id: InstrumentId,
978 ) -> Result<(), KrakenWsError> {
979 let symbol = instrument_id.symbol.inner();
980 let quotes_key = format!("quotes:{symbol}");
981
982 if !self.subscriptions.remove_reference("es_key) {
983 return Ok(());
984 }
985
986 self.subscriptions.mark_unsubscribe("es_key);
987 self.subscriptions.confirm_unsubscribe("es_key);
988 self.maybe_unsubscribe_book(symbol).await
989 }
990
991 pub async fn unsubscribe_trades(
993 &self,
994 instrument_id: InstrumentId,
995 ) -> Result<(), KrakenWsError> {
996 let symbol = instrument_id.symbol.inner();
997 self.unsubscribe(KrakenWsChannel::Trade, vec![symbol]).await
998 }
999
1000 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
1006 let symbol = bar_type.instrument_id().symbol.inner();
1007 let interval = bar_type_to_ws_interval(bar_type)?;
1008 self.unsubscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
1009 .await
1010 }
1011
1012 async fn ensure_book_subscribed(&self, symbol: Ustr) -> Result<(), KrakenWsError> {
1016 self.subscribe(KrakenWsChannel::Book, vec![symbol], Some(10))
1017 .await
1018 }
1019
1020 async fn maybe_unsubscribe_book(&self, symbol: Ustr) -> Result<(), KrakenWsError> {
1024 self.unsubscribe(KrakenWsChannel::Book, vec![symbol]).await
1025 }
1026}
1027
1028async fn refresh_auth_token(config: &KrakenDataClientConfig) -> Result<String, KrakenWsError> {
1030 let api_key = config
1031 .api_key
1032 .clone()
1033 .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
1034 let api_secret = config
1035 .api_secret
1036 .clone()
1037 .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API secret".to_string()))?;
1038
1039 let http_client = KrakenSpotHttpClient::with_credentials(
1040 api_key,
1041 api_secret,
1042 config.environment,
1043 Some(config.http_base_url()),
1044 config.timeout_secs,
1045 None,
1046 None,
1047 None,
1048 config.http_proxy.clone(),
1049 config.max_requests_per_second,
1050 )
1051 .map_err(|e| {
1052 KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
1053 })?;
1054
1055 let ws_token = http_client.get_websockets_token().await.map_err(|e| {
1056 KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
1057 })?;
1058
1059 log::debug!(
1060 "WebSocket authentication token refreshed: token_length={}, expires={}",
1061 ws_token.token.len(),
1062 ws_token.expires
1063 );
1064
1065 Ok(ws_token.token)
1066}
1067
1068fn bar_type_to_ws_interval(bar_type: BarType) -> Result<u32, KrakenWsError> {
1073 const VALID_INTERVALS: [u32; 9] = [1, 5, 15, 30, 60, 240, 1440, 10080, 21600];
1074
1075 let spec = bar_type.spec();
1076 let step = spec.step.get() as u32;
1077
1078 let base_minutes = match spec.aggregation {
1079 BarAggregation::Minute => 1,
1080 BarAggregation::Hour => 60,
1081 BarAggregation::Day => 1440,
1082 BarAggregation::Week => 10080,
1083 other => {
1084 return Err(KrakenWsError::SubscriptionError(format!(
1085 "Unsupported bar aggregation for Kraken OHLC streaming: {other:?}"
1086 )));
1087 }
1088 };
1089
1090 let interval = base_minutes * step;
1091
1092 if !VALID_INTERVALS.contains(&interval) {
1093 return Err(KrakenWsError::SubscriptionError(format!(
1094 "Invalid bar interval {interval} minutes for Kraken OHLC streaming. \
1095 Supported intervals: 1, 5, 15, 30, 60, 240, 1440, 10080, 21600"
1096 )));
1097 }
1098
1099 Ok(interval)
1100}