1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::runtime::get_runtime;
25use nautilus_model::{
26 data::BarType,
27 enums::BarAggregation,
28 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
29 instruments::InstrumentAny,
30};
31use nautilus_network::{
32 mode::ConnectionMode,
33 websocket::{
34 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35 },
36};
37use tokio_util::sync::CancellationToken;
38use ustr::Ustr;
39
/// Delimiter character handed to `SubscriptionState::new` for this client's topic keys.
///
/// Topic keys built in this file use the same character between their parts,
/// e.g. `"<channel>:<symbol>"` and `"<channel>:<symbol>:<interval>"`.
pub const KRAKEN_SPOT_WS_TOPIC_DELIMITER: char = ':';
44
45use super::{
46 enums::{KrakenWsChannel, KrakenWsMethod},
47 handler::{SpotFeedHandler, SpotHandlerCommand},
48 messages::{KrakenWsParams, KrakenWsRequest, NautilusWsMessage},
49};
50use crate::{
51 config::KrakenDataClientConfig, http::KrakenSpotHttpClient, websocket::error::KrakenWsError,
52};
53
/// JSON text configured as the heartbeat message of the underlying WebSocket client.
const WS_PING_MSG: &str = r#"{"method":"ping"}"#;
55
/// WebSocket client for the Kraken Spot API.
///
/// The client owns the command channel to a background handler task (spawned by
/// `connect`) and exposes the messages that task produces through `stream`.
/// Most state lives behind `Arc`s, so clones of the client share the same
/// command sender, request-ID counter, stop flag and authentication token.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct KrakenSpotWebSocketClient {
    /// WebSocket URL selected in `new` (private URL when configured, otherwise public).
    url: String,
    /// Data client configuration (URLs, credentials, heartbeat interval, HTTP settings).
    config: KrakenDataClientConfig,
    /// Stop flag: set to `true` by `disconnect`, reset to `false` by `connect`.
    signal: Arc<AtomicBool>,
    /// Swappable handle to the connection-mode atomic; replaced on every `connect`
    /// with the atomic owned by the freshly created `WebSocketClient`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the handler task; replaced on every `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<SpotHandlerCommand>>>,
    /// Receiver for messages forwarded by the handler task; taken by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Join handle of the handler task spawned by `connect`; taken by `disconnect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Tracks subscription topics and their reference counts.
    subscriptions: SubscriptionState,
    /// Authentication tracker; marked as failed on `disconnect`.
    auth_tracker: AuthTracker,
    /// Token supplied by the caller; cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
    /// Counter used to generate request IDs (incremented before each use).
    req_id_counter: Arc<tokio::sync::RwLock<u64>>,
    /// WebSocket authentication token, populated by `authenticate`.
    auth_token: Arc<tokio::sync::RwLock<Option<String>>>,
}
76
77impl Clone for KrakenSpotWebSocketClient {
78 fn clone(&self) -> Self {
79 Self {
80 url: self.url.clone(),
81 config: self.config.clone(),
82 signal: Arc::clone(&self.signal),
83 connection_mode: Arc::clone(&self.connection_mode),
84 cmd_tx: Arc::clone(&self.cmd_tx),
85 out_rx: self.out_rx.clone(),
86 task_handle: self.task_handle.clone(),
87 subscriptions: self.subscriptions.clone(),
88 auth_tracker: self.auth_tracker.clone(),
89 cancellation_token: self.cancellation_token.clone(),
90 req_id_counter: self.req_id_counter.clone(),
91 auth_token: self.auth_token.clone(),
92 }
93 }
94}
95
96impl KrakenSpotWebSocketClient {
97 pub fn new(config: KrakenDataClientConfig, cancellation_token: CancellationToken) -> Self {
99 let url = if config.ws_private_url.is_some() {
101 config.ws_private_url()
102 } else {
103 config.ws_public_url()
104 };
105 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
106 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
107 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
108
109 Self {
110 url,
111 config,
112 signal: Arc::new(AtomicBool::new(false)),
113 connection_mode,
114 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
115 out_rx: None,
116 task_handle: None,
117 subscriptions: SubscriptionState::new(KRAKEN_SPOT_WS_TOPIC_DELIMITER),
118 auth_tracker: AuthTracker::new(),
119 cancellation_token,
120 req_id_counter: Arc::new(tokio::sync::RwLock::new(0)),
121 auth_token: Arc::new(tokio::sync::RwLock::new(None)),
122 }
123 }
124
125 async fn get_next_req_id(&self) -> u64 {
126 let mut counter = self.req_id_counter.write().await;
127 *counter += 1;
128 *counter
129 }
130
    /// Connects to the WebSocket endpoint and spawns the background handler task.
    ///
    /// On success this:
    /// - clears the stop flag,
    /// - installs fresh command and output channels (so `stream` can be called again),
    /// - hands the connected `WebSocketClient` to the handler via `SetClient`,
    /// - spawns a task that forwards handler messages to the output channel and
    ///   re-sends subscriptions whenever the handler reports `Reconnected`.
    ///
    /// # Errors
    ///
    /// Returns `KrakenWsError::ConnectionError` if the WebSocket connection cannot be
    /// established or the client cannot be handed to the handler.
    pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
        tracing::debug!("Connecting to {}", self.url);

        // Clear any stop request left over from a previous `disconnect`.
        self.signal.store(false, Ordering::Relaxed);

        let (raw_handler, raw_rx) = channel_message_handler();

        let ws_config = WebSocketConfig {
            url: self.url.clone(),
            headers: vec![],
            message_handler: Some(raw_handler),
            ping_handler: None,
            heartbeat: self.config.heartbeat_interval_secs,
            heartbeat_msg: Some(WS_PING_MSG.to_string()),
            reconnect_timeout_ms: Some(5_000),
            reconnect_delay_initial_ms: Some(500),
            reconnect_delay_max_ms: Some(5_000),
            reconnect_backoff_factor: Some(1.5),
            reconnect_jitter_ms: Some(250),
            reconnect_max_attempts: None,
        };

        let ws_client = WebSocketClient::connect(
            ws_config,
            None,
            vec![],
            None,
        )
        .await
        .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;

        // Track the new client's connection mode so `is_active`/`is_closed` reflect it.
        self.connection_mode
            .store(ws_client.connection_mode_atomic());

        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
        self.out_rx = Some(Arc::new(out_rx));

        // Replace the command sender shared with all clones of this client.
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
        *self.cmd_tx.write().await = cmd_tx.clone();

        // Queued before the handler exists; it is picked up once the handler starts
        // draining `cmd_rx` inside the spawned task.
        if let Err(e) = cmd_tx.send(SpotHandlerCommand::SetClient(ws_client)) {
            return Err(KrakenWsError::ConnectionError(format!(
                "Failed to send WebSocketClient to handler: {e}"
            )));
        }

        // Everything below is moved into the spawned task.
        let signal = self.signal.clone();
        let subscriptions = self.subscriptions.clone();
        let config_for_reconnect = self.config.clone();
        let auth_token_for_reconnect = self.auth_token.clone();
        let req_id_counter_for_reconnect = self.req_id_counter.clone();
        let cmd_tx_for_reconnect = cmd_tx.clone();

        let stream_handle = get_runtime().spawn(async move {
            let mut handler =
                SpotFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());

            loop {
                match handler.next().await {
                    Some(NautilusWsMessage::Reconnected) => {
                        // A stop was requested; skip resubscription entirely.
                        if signal.load(Ordering::Relaxed) {
                            continue;
                        }
                        tracing::info!("WebSocket reconnected, resubscribing");

                        // Flag every known topic as failed before re-sending.
                        // NOTE(review): presumably this moves confirmed topics back to a
                        // pending state — confirm against `SubscriptionState::mark_failure`.
                        let confirmed_topics = subscriptions.all_topics();
                        for topic in &confirmed_topics {
                            subscriptions.mark_failure(topic);
                        }

                        let topics = subscriptions.all_topics();
                        if topics.is_empty() {
                            tracing::debug!("No subscriptions to restore after reconnection");
                        } else {
                            // Only refresh the token if the client had authenticated before.
                            let had_auth = auth_token_for_reconnect.read().await.is_some();

                            if had_auth && config_for_reconnect.has_api_credentials() {
                                tracing::debug!("Re-authenticating after reconnect");

                                match refresh_auth_token(&config_for_reconnect).await {
                                    Ok(new_token) => {
                                        *auth_token_for_reconnect.write().await = Some(new_token);
                                        tracing::debug!("Re-authentication successful");
                                    }
                                    Err(e) => {
                                        tracing::error!(
                                            error = %e,
                                            "Failed to re-authenticate after reconnect"
                                        );
                                        // Drop the stale token so private resubscribes are skipped.
                                        *auth_token_for_reconnect.write().await = None;
                                    }
                                }
                            }

                            tracing::info!(
                                count = topics.len(),
                                "Resubscribing after reconnection"
                            );

                            for topic in &topics {
                                let auth_token = auth_token_for_reconnect.read().await.clone();

                                // The executions topic has no symbol part and needs the token.
                                if topic == "executions" {
                                    if let Some(ref token) = auth_token {
                                        let mut counter =
                                            req_id_counter_for_reconnect.write().await;
                                        *counter += 1;
                                        let req_id = *counter;

                                        // Snapshots are always requested on resubscribe,
                                        // regardless of what the original call asked for.
                                        let request = KrakenWsRequest {
                                            method: KrakenWsMethod::Subscribe,
                                            params: Some(KrakenWsParams {
                                                channel: KrakenWsChannel::Executions,
                                                symbol: None,
                                                snapshot: None,
                                                depth: None,
                                                interval: None,
                                                token: Some(token.clone()),
                                                snap_orders: Some(true),
                                                snap_trades: Some(true),
                                            }),
                                            req_id: Some(req_id),
                                        };

                                        // A serialization failure is silently skipped here.
                                        if let Ok(payload) = serde_json::to_string(&request)
                                            && let Err(e) = cmd_tx_for_reconnect
                                                .send(SpotHandlerCommand::SendText { payload })
                                        {
                                            tracing::error!(
                                                error = %e,
                                                "Failed to send executions resubscribe"
                                            );
                                        }

                                        subscriptions.mark_subscribe(topic);
                                    } else {
                                        tracing::warn!(
                                            "Cannot resubscribe to executions: no auth token"
                                        );
                                    }
                                    continue;
                                }

                                // Remaining topics look like `<channel>:<symbol>` or
                                // `<channel>:<symbol>:<interval>`.
                                let parts: Vec<&str> = topic.splitn(3, ':').collect();
                                if parts.len() < 2 {
                                    tracing::warn!(topic, "Invalid topic format for resubscribe");
                                    continue;
                                }

                                // NOTE(review): "Book", "book" and "quotes" all map to the book
                                // channel, so a symbol registered under several of these keys
                                // gets one book subscribe request per key — confirm this is intended.
                                let channel_str = parts[0];
                                let channel = match channel_str {
                                    "Book" => Some(KrakenWsChannel::Book),
                                    "Trade" => Some(KrakenWsChannel::Trade),
                                    "Ticker" => Some(KrakenWsChannel::Ticker),
                                    "Ohlc" => Some(KrakenWsChannel::Ohlc),
                                    "book" => Some(KrakenWsChannel::Book),
                                    "quotes" => Some(KrakenWsChannel::Book),
                                    _ => None,
                                };

                                let Some(channel) = channel else {
                                    tracing::warn!(topic, "Unknown channel for resubscribe");
                                    continue;
                                };

                                // This write guard stays alive until the end of the loop iteration.
                                let mut counter = req_id_counter_for_reconnect.write().await;
                                *counter += 1;
                                let req_id = *counter;

                                // Quote topics are re-sent as depth-10 book subscriptions;
                                // other topics are re-sent without an explicit depth.
                                let depth = if channel_str == "quotes" {
                                    Some(10)
                                } else {
                                    None
                                };

                                // A third part, when present, is parsed as the interval;
                                // a part that fails to parse results in no interval.
                                let (symbol_str, interval) = if parts.len() == 3 {
                                    (parts[1], parts[2].parse::<u32>().ok())
                                } else {
                                    (parts[1], None)
                                };

                                let request = KrakenWsRequest {
                                    method: KrakenWsMethod::Subscribe,
                                    params: Some(KrakenWsParams {
                                        channel,
                                        symbol: Some(vec![Ustr::from(symbol_str)]),
                                        snapshot: None,
                                        depth,
                                        interval,
                                        token: None,
                                        snap_orders: None,
                                        snap_trades: None,
                                    }),
                                    req_id: Some(req_id),
                                };

                                // A serialization failure is silently skipped here.
                                if let Ok(payload) = serde_json::to_string(&request)
                                    && let Err(e) = cmd_tx_for_reconnect
                                        .send(SpotHandlerCommand::SendText { payload })
                                {
                                    tracing::error!(
                                        error = %e,
                                        topic,
                                        "Failed to send resubscribe command"
                                    );
                                }

                                subscriptions.mark_subscribe(topic);
                            }
                        }

                        // Let stream consumers know a reconnection happened.
                        if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
                            tracing::error!("Failed to send message (receiver dropped)");
                            break;
                        }
                        continue;
                    }
                    Some(msg) => {
                        // Forward every other message unchanged.
                        if out_tx.send(msg).is_err() {
                            tracing::error!("Failed to send message (receiver dropped)");
                            break;
                        }
                    }
                    None => {
                        // The handler produced no more messages: either a requested stop
                        // or an unexpected end of the stream. Both end the task.
                        if handler.is_stopped() {
                            tracing::debug!("Stop signal received, ending message processing");
                            break;
                        }
                        tracing::warn!("WebSocket stream ended unexpectedly");
                        break;
                    }
                }
            }

            tracing::debug!("Handler task exiting");
        });

        self.task_handle = Some(Arc::new(stream_handle));

        tracing::debug!("WebSocket connected successfully");
        Ok(())
    }
384
385 pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
387 tracing::debug!("Disconnecting WebSocket");
388
389 self.signal.store(true, Ordering::Relaxed);
390
391 if let Err(e) = self
392 .cmd_tx
393 .read()
394 .await
395 .send(SpotHandlerCommand::Disconnect)
396 {
397 tracing::debug!(
398 "Failed to send disconnect command (handler may already be shut down): {e}"
399 );
400 }
401
402 if let Some(task_handle) = self.task_handle.take() {
403 match Arc::try_unwrap(task_handle) {
404 Ok(handle) => {
405 tracing::debug!("Waiting for task handle to complete");
406 match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
407 Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
408 Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
409 Err(_) => {
410 tracing::warn!(
411 "Timeout waiting for task handle, task may still be running"
412 );
413 }
414 }
415 }
416 Err(arc_handle) => {
417 tracing::debug!(
418 "Cannot take ownership of task handle - other references exist, aborting task"
419 );
420 arc_handle.abort();
421 }
422 }
423 } else {
424 tracing::debug!("No task handle to await");
425 }
426
427 self.subscriptions.clear();
428 self.auth_tracker.fail("Disconnected");
429
430 Ok(())
431 }
432
433 pub async fn close(&mut self) -> Result<(), KrakenWsError> {
435 self.disconnect().await
436 }
437
438 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
440 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
441
442 tokio::time::timeout(timeout, async {
443 while !self.is_active() {
444 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
445 }
446 })
447 .await
448 .map_err(|_| {
449 KrakenWsError::ConnectionError(format!(
450 "WebSocket connection timeout after {timeout_secs} seconds"
451 ))
452 })?;
453
454 Ok(())
455 }
456
457 pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
459 if !self.config.has_api_credentials() {
460 return Err(KrakenWsError::AuthenticationError(
461 "API credentials required for authentication".to_string(),
462 ));
463 }
464
465 let api_key = self
466 .config
467 .api_key
468 .clone()
469 .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
470 let api_secret =
471 self.config.api_secret.clone().ok_or_else(|| {
472 KrakenWsError::AuthenticationError("Missing API secret".to_string())
473 })?;
474
475 let http_client = KrakenSpotHttpClient::with_credentials(
476 api_key,
477 api_secret,
478 self.config.environment,
479 Some(self.config.http_base_url()),
480 self.config.timeout_secs,
481 None,
482 None,
483 None,
484 self.config.http_proxy.clone(),
485 self.config.max_requests_per_second,
486 )
487 .map_err(|e| {
488 KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
489 })?;
490
491 let ws_token = http_client.get_websockets_token().await.map_err(|e| {
492 KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
493 })?;
494
495 tracing::debug!(
496 token_length = ws_token.token.len(),
497 expires = ws_token.expires,
498 "WebSocket authentication token received"
499 );
500
501 let mut auth_token = self.auth_token.write().await;
502 *auth_token = Some(ws_token.token);
503
504 Ok(())
505 }
506
507 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
509 if let Ok(cmd_tx) = self.cmd_tx.try_read()
511 && let Err(e) = cmd_tx.send(SpotHandlerCommand::InitializeInstruments(instruments))
512 {
513 tracing::debug!("Failed to send instruments to handler: {e}");
514 }
515 }
516
517 pub fn cache_instrument(&self, instrument: InstrumentAny) {
519 if let Ok(cmd_tx) = self.cmd_tx.try_read()
521 && let Err(e) = cmd_tx.send(SpotHandlerCommand::UpdateInstrument(instrument))
522 {
523 tracing::debug!("Failed to send instrument update to handler: {e}");
524 }
525 }
526
527 pub fn set_account_id(&self, account_id: AccountId) {
532 if let Ok(cmd_tx) = self.cmd_tx.try_read()
533 && let Err(e) = cmd_tx.send(SpotHandlerCommand::SetAccountId(account_id))
534 {
535 tracing::debug!("Failed to send account ID to handler: {e}");
536 }
537 }
538
539 pub fn cache_client_order(
545 &self,
546 client_order_id: ClientOrderId,
547 instrument_id: InstrumentId,
548 trader_id: TraderId,
549 strategy_id: StrategyId,
550 ) {
551 if let Ok(cmd_tx) = self.cmd_tx.try_read()
552 && let Err(e) = cmd_tx.send(SpotHandlerCommand::CacheClientOrder {
553 client_order_id,
554 instrument_id,
555 trader_id,
556 strategy_id,
557 })
558 {
559 tracing::debug!("Failed to send cache client order command to handler: {e}");
560 }
561 }
562
563 pub fn cancel_all_requests(&self) {
565 self.cancellation_token.cancel();
566 }
567
568 pub fn cancellation_token(&self) -> &CancellationToken {
570 &self.cancellation_token
571 }
572
573 pub async fn subscribe(
575 &self,
576 channel: KrakenWsChannel,
577 symbols: Vec<Ustr>,
578 depth: Option<u32>,
579 ) -> Result<(), KrakenWsError> {
580 let mut symbols_to_subscribe = Vec::new();
581 for symbol in &symbols {
582 let key = format!("{:?}:{}", channel, symbol);
583 if self.subscriptions.add_reference(&key) {
584 self.subscriptions.mark_subscribe(&key);
585 symbols_to_subscribe.push(*symbol);
586 }
587 }
588
589 if symbols_to_subscribe.is_empty() {
590 return Ok(());
591 }
592
593 let is_private = matches!(
594 channel,
595 KrakenWsChannel::Executions | KrakenWsChannel::Balances
596 );
597 let token = if is_private {
598 Some(self.auth_token.read().await.clone().ok_or_else(|| {
599 KrakenWsError::AuthenticationError(
600 "Authentication token required for private channels. Call authenticate() first"
601 .to_string(),
602 )
603 })?)
604 } else {
605 None
606 };
607
608 let req_id = self.get_next_req_id().await;
609 let request = KrakenWsRequest {
610 method: KrakenWsMethod::Subscribe,
611 params: Some(KrakenWsParams {
612 channel,
613 symbol: Some(symbols_to_subscribe.clone()),
614 snapshot: None,
615 depth,
616 interval: None,
617 token,
618 snap_orders: None,
619 snap_trades: None,
620 }),
621 req_id: Some(req_id),
622 };
623
624 self.send_request(&request).await?;
625
626 for symbol in &symbols_to_subscribe {
627 let key = format!("{:?}:{}", channel, symbol);
628 self.subscriptions.confirm_subscribe(&key);
629 }
630
631 Ok(())
632 }
633
634 async fn subscribe_with_interval(
636 &self,
637 channel: KrakenWsChannel,
638 symbols: Vec<Ustr>,
639 interval: u32,
640 ) -> Result<(), KrakenWsError> {
641 let mut symbols_to_subscribe = Vec::new();
642 for symbol in &symbols {
643 let key = format!("{channel:?}:{symbol}:{interval}");
644 if self.subscriptions.add_reference(&key) {
645 self.subscriptions.mark_subscribe(&key);
646 symbols_to_subscribe.push(*symbol);
647 }
648 }
649
650 if symbols_to_subscribe.is_empty() {
651 return Ok(());
652 }
653
654 let req_id = self.get_next_req_id().await;
655 let request = KrakenWsRequest {
656 method: KrakenWsMethod::Subscribe,
657 params: Some(KrakenWsParams {
658 channel,
659 symbol: Some(symbols_to_subscribe.clone()),
660 snapshot: None,
661 depth: None,
662 interval: Some(interval),
663 token: None,
664 snap_orders: None,
665 snap_trades: None,
666 }),
667 req_id: Some(req_id),
668 };
669
670 self.send_request(&request).await?;
671
672 for symbol in &symbols_to_subscribe {
673 let key = format!("{channel:?}:{symbol}:{interval}");
674 self.subscriptions.confirm_subscribe(&key);
675 }
676
677 Ok(())
678 }
679
680 async fn unsubscribe_with_interval(
682 &self,
683 channel: KrakenWsChannel,
684 symbols: Vec<Ustr>,
685 interval: u32,
686 ) -> Result<(), KrakenWsError> {
687 let mut symbols_to_unsubscribe = Vec::new();
688 for symbol in &symbols {
689 let key = format!("{channel:?}:{symbol}:{interval}");
690 if self.subscriptions.remove_reference(&key) {
691 self.subscriptions.mark_unsubscribe(&key);
692 symbols_to_unsubscribe.push(*symbol);
693 }
694 }
695
696 if symbols_to_unsubscribe.is_empty() {
697 return Ok(());
698 }
699
700 let req_id = self.get_next_req_id().await;
701 let request = KrakenWsRequest {
702 method: KrakenWsMethod::Unsubscribe,
703 params: Some(KrakenWsParams {
704 channel,
705 symbol: Some(symbols_to_unsubscribe.clone()),
706 snapshot: None,
707 depth: None,
708 interval: Some(interval),
709 token: None,
710 snap_orders: None,
711 snap_trades: None,
712 }),
713 req_id: Some(req_id),
714 };
715
716 self.send_request(&request).await?;
717
718 for symbol in &symbols_to_unsubscribe {
719 let key = format!("{channel:?}:{symbol}:{interval}");
720 self.subscriptions.confirm_unsubscribe(&key);
721 }
722
723 Ok(())
724 }
725
726 pub async fn unsubscribe(
728 &self,
729 channel: KrakenWsChannel,
730 symbols: Vec<Ustr>,
731 ) -> Result<(), KrakenWsError> {
732 let mut symbols_to_unsubscribe = Vec::new();
733 for symbol in &symbols {
734 let key = format!("{:?}:{}", channel, symbol);
735 if self.subscriptions.remove_reference(&key) {
736 self.subscriptions.mark_unsubscribe(&key);
737 symbols_to_unsubscribe.push(*symbol);
738 } else {
739 tracing::debug!(
740 "Channel {:?} symbol {} still has active subscriptions, not unsubscribing",
741 channel,
742 symbol
743 );
744 }
745 }
746
747 if symbols_to_unsubscribe.is_empty() {
748 return Ok(());
749 }
750
751 let is_private = matches!(
752 channel,
753 KrakenWsChannel::Executions | KrakenWsChannel::Balances
754 );
755 let token = if is_private {
756 Some(self.auth_token.read().await.clone().ok_or_else(|| {
757 KrakenWsError::AuthenticationError(
758 "Authentication token required for private channels. Call authenticate() first"
759 .to_string(),
760 )
761 })?)
762 } else {
763 None
764 };
765
766 let req_id = self.get_next_req_id().await;
767 let request = KrakenWsRequest {
768 method: KrakenWsMethod::Unsubscribe,
769 params: Some(KrakenWsParams {
770 channel,
771 symbol: Some(symbols_to_unsubscribe.clone()),
772 snapshot: None,
773 depth: None,
774 interval: None,
775 token,
776 snap_orders: None,
777 snap_trades: None,
778 }),
779 req_id: Some(req_id),
780 };
781
782 self.send_request(&request).await?;
783
784 for symbol in &symbols_to_unsubscribe {
785 let key = format!("{:?}:{}", channel, symbol);
786 self.subscriptions.confirm_unsubscribe(&key);
787 }
788
789 Ok(())
790 }
791
792 pub async fn send_ping(&self) -> Result<(), KrakenWsError> {
794 let req_id = self.get_next_req_id().await;
795
796 let request = KrakenWsRequest {
797 method: KrakenWsMethod::Ping,
798 params: None,
799 req_id: Some(req_id),
800 };
801
802 self.send_request(&request).await
803 }
804
805 async fn send_request(&self, request: &KrakenWsRequest) -> Result<(), KrakenWsError> {
806 let payload =
807 serde_json::to_string(request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
808
809 tracing::trace!("Sending message: {payload}");
810
811 self.cmd_tx
812 .read()
813 .await
814 .send(SpotHandlerCommand::SendText { payload })
815 .map_err(|e| KrakenWsError::ConnectionError(format!("Failed to send request: {e}")))?;
816
817 Ok(())
818 }
819
820 pub fn is_connected(&self) -> bool {
822 let connection_mode_arc = self.connection_mode.load();
823 !ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
824 }
825
826 pub fn is_active(&self) -> bool {
828 let connection_mode_arc = self.connection_mode.load();
829 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
830 && !self.signal.load(Ordering::Relaxed)
831 }
832
833 pub fn is_closed(&self) -> bool {
835 let connection_mode_arc = self.connection_mode.load();
836 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
837 || self.signal.load(Ordering::Relaxed)
838 }
839
840 pub fn url(&self) -> &str {
842 &self.url
843 }
844
845 pub fn get_subscriptions(&self) -> Vec<String> {
847 self.subscriptions.all_topics()
848 }
849
850 pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
852 let rx = self
853 .out_rx
854 .take()
855 .expect("Stream receiver already taken or client not connected");
856 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
857 async_stream::stream! {
858 while let Some(msg) = rx.recv().await {
859 yield msg;
860 }
861 }
862 }
863
864 pub async fn subscribe_book(
866 &self,
867 instrument_id: InstrumentId,
868 depth: Option<u32>,
869 ) -> Result<(), KrakenWsError> {
870 let symbol = instrument_id.symbol.inner();
872 let book_key = format!("book:{symbol}");
873
874 if !self.subscriptions.add_reference(&book_key) {
875 return Ok(());
876 }
877
878 self.subscriptions.mark_subscribe(&book_key);
879 self.subscriptions.confirm_subscribe(&book_key);
880
881 self.subscribe(KrakenWsChannel::Book, vec![symbol], depth)
882 .await
883 }
884
885 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
890 let symbol = instrument_id.symbol.inner();
891 let quotes_key = format!("quotes:{symbol}");
892
893 if !self.subscriptions.add_reference("es_key) {
894 return Ok(());
895 }
896
897 self.subscriptions.mark_subscribe("es_key);
898 self.subscriptions.confirm_subscribe("es_key);
899 self.ensure_book_subscribed(symbol).await
900 }
901
902 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
904 let symbol = instrument_id.symbol.inner();
905 self.subscribe(KrakenWsChannel::Trade, vec![symbol], None)
906 .await
907 }
908
909 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
915 let symbol = bar_type.instrument_id().symbol.inner();
916 let interval = bar_type_to_ws_interval(bar_type)?;
917 self.subscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
918 .await
919 }
920
921 pub async fn subscribe_executions(
925 &self,
926 snap_orders: bool,
927 snap_trades: bool,
928 ) -> Result<(), KrakenWsError> {
929 let req_id = self.get_next_req_id().await;
930
931 let token = self.auth_token.read().await.clone().ok_or_else(|| {
932 KrakenWsError::AuthenticationError(
933 "Authentication token required for executions channel. Call authenticate() first"
934 .to_string(),
935 )
936 })?;
937
938 let request = KrakenWsRequest {
939 method: KrakenWsMethod::Subscribe,
940 params: Some(KrakenWsParams {
941 channel: KrakenWsChannel::Executions,
942 symbol: None,
943 snapshot: None,
944 depth: None,
945 interval: None,
946 token: Some(token),
947 snap_orders: Some(snap_orders),
948 snap_trades: Some(snap_trades),
949 }),
950 req_id: Some(req_id),
951 };
952
953 self.send_request(&request).await?;
954
955 let key = "executions";
956 if self.subscriptions.add_reference(key) {
957 self.subscriptions.mark_subscribe(key);
958 self.subscriptions.confirm_subscribe(key);
959 }
960
961 Ok(())
962 }
963
964 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
968 let symbol = instrument_id.symbol.inner();
969 let book_key = format!("book:{symbol}");
970
971 if !self.subscriptions.remove_reference(&book_key) {
972 return Ok(());
973 }
974
975 self.subscriptions.mark_unsubscribe(&book_key);
976 self.subscriptions.confirm_unsubscribe(&book_key);
977 self.maybe_unsubscribe_book(symbol).await
978 }
979
980 pub async fn unsubscribe_quotes(
982 &self,
983 instrument_id: InstrumentId,
984 ) -> Result<(), KrakenWsError> {
985 let symbol = instrument_id.symbol.inner();
986 let quotes_key = format!("quotes:{symbol}");
987
988 if !self.subscriptions.remove_reference("es_key) {
989 return Ok(());
990 }
991
992 self.subscriptions.mark_unsubscribe("es_key);
993 self.subscriptions.confirm_unsubscribe("es_key);
994 self.maybe_unsubscribe_book(symbol).await
995 }
996
997 pub async fn unsubscribe_trades(
999 &self,
1000 instrument_id: InstrumentId,
1001 ) -> Result<(), KrakenWsError> {
1002 let symbol = instrument_id.symbol.inner();
1003 self.unsubscribe(KrakenWsChannel::Trade, vec![symbol]).await
1004 }
1005
1006 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
1012 let symbol = bar_type.instrument_id().symbol.inner();
1013 let interval = bar_type_to_ws_interval(bar_type)?;
1014 self.unsubscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
1015 .await
1016 }
1017
1018 async fn ensure_book_subscribed(&self, symbol: Ustr) -> Result<(), KrakenWsError> {
1022 self.subscribe(KrakenWsChannel::Book, vec![symbol], Some(10))
1023 .await
1024 }
1025
1026 async fn maybe_unsubscribe_book(&self, symbol: Ustr) -> Result<(), KrakenWsError> {
1030 self.unsubscribe(KrakenWsChannel::Book, vec![symbol]).await
1031 }
1032}
1033
1034async fn refresh_auth_token(config: &KrakenDataClientConfig) -> Result<String, KrakenWsError> {
1036 let api_key = config
1037 .api_key
1038 .clone()
1039 .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
1040 let api_secret = config
1041 .api_secret
1042 .clone()
1043 .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API secret".to_string()))?;
1044
1045 let http_client = KrakenSpotHttpClient::with_credentials(
1046 api_key,
1047 api_secret,
1048 config.environment,
1049 Some(config.http_base_url()),
1050 config.timeout_secs,
1051 None,
1052 None,
1053 None,
1054 config.http_proxy.clone(),
1055 config.max_requests_per_second,
1056 )
1057 .map_err(|e| {
1058 KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
1059 })?;
1060
1061 let ws_token = http_client.get_websockets_token().await.map_err(|e| {
1062 KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
1063 })?;
1064
1065 tracing::debug!(
1066 token_length = ws_token.token.len(),
1067 expires = ws_token.expires,
1068 "WebSocket authentication token refreshed"
1069 );
1070
1071 Ok(ws_token.token)
1072}
1073
1074fn bar_type_to_ws_interval(bar_type: BarType) -> Result<u32, KrakenWsError> {
1079 let spec = bar_type.spec();
1080 let step = spec.step.get() as u32;
1081
1082 let base_minutes = match spec.aggregation {
1083 BarAggregation::Minute => 1,
1084 BarAggregation::Hour => 60,
1085 BarAggregation::Day => 1440,
1086 BarAggregation::Week => 10080,
1087 other => {
1088 return Err(KrakenWsError::SubscriptionError(format!(
1089 "Unsupported bar aggregation for Kraken OHLC streaming: {other:?}"
1090 )));
1091 }
1092 };
1093
1094 let interval = base_minutes * step;
1095
1096 const VALID_INTERVALS: [u32; 9] = [1, 5, 15, 30, 60, 240, 1440, 10080, 21600];
1097 if !VALID_INTERVALS.contains(&interval) {
1098 return Err(KrakenWsError::SubscriptionError(format!(
1099 "Invalid bar interval {interval} minutes for Kraken OHLC streaming. \
1100 Supported intervals: 1, 5, 15, 30, 60, 240, 1440, 10080, 21600"
1101 )));
1102 }
1103
1104 Ok(interval)
1105}