Skip to main content

nautilus_kraken/websocket/spot_v2/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for the Kraken v2 streaming API.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::get_runtime;
25use nautilus_model::{
26    data::BarType,
27    enums::BarAggregation,
28    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
29    instruments::InstrumentAny,
30};
31use nautilus_network::{
32    mode::ConnectionMode,
33    websocket::{
34        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35    },
36};
37use tokio_util::sync::CancellationToken;
38use ustr::Ustr;
39
40/// Topic delimiter for Kraken Spot v2 WebSocket subscriptions.
41///
42/// Topics use colon format: `channel:symbol` (e.g., `Trade:ETH/USD`).
43pub const KRAKEN_SPOT_WS_TOPIC_DELIMITER: char = ':';
44
45use super::{
46    enums::{KrakenWsChannel, KrakenWsMethod},
47    handler::{SpotFeedHandler, SpotHandlerCommand},
48    messages::{KrakenWsParams, KrakenWsRequest, NautilusWsMessage},
49};
50use crate::{
51    common::parse::normalize_spot_symbol, config::KrakenDataClientConfig,
52    http::KrakenSpotHttpClient, websocket::error::KrakenWsError,
53};
54
55const WS_PING_MSG: &str = r#"{"method":"ping"}"#;
56
57/// WebSocket client for the Kraken Spot v2 streaming API.
58#[derive(Debug)]
59#[cfg_attr(
60    feature = "python",
61    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
62)]
63pub struct KrakenSpotWebSocketClient {
64    url: String,
65    config: KrakenDataClientConfig,
66    signal: Arc<AtomicBool>,
67    connection_mode: Arc<ArcSwap<AtomicU8>>,
68    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<SpotHandlerCommand>>>,
69    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
70    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
71    subscriptions: SubscriptionState,
72    auth_tracker: AuthTracker,
73    cancellation_token: CancellationToken,
74    req_id_counter: Arc<tokio::sync::RwLock<u64>>,
75    auth_token: Arc<tokio::sync::RwLock<Option<String>>>,
76}
77
78impl Clone for KrakenSpotWebSocketClient {
79    fn clone(&self) -> Self {
80        Self {
81            url: self.url.clone(),
82            config: self.config.clone(),
83            signal: Arc::clone(&self.signal),
84            connection_mode: Arc::clone(&self.connection_mode),
85            cmd_tx: Arc::clone(&self.cmd_tx),
86            out_rx: self.out_rx.clone(),
87            task_handle: self.task_handle.clone(),
88            subscriptions: self.subscriptions.clone(),
89            auth_tracker: self.auth_tracker.clone(),
90            cancellation_token: self.cancellation_token.clone(),
91            req_id_counter: self.req_id_counter.clone(),
92            auth_token: self.auth_token.clone(),
93        }
94    }
95}
96
97impl KrakenSpotWebSocketClient {
98    /// Creates a new client with the given configuration.
99    pub fn new(config: KrakenDataClientConfig, cancellation_token: CancellationToken) -> Self {
100        // Prefer private URL if explicitly set (for authenticated endpoints)
101        let url = if config.ws_private_url.is_some() {
102            config.ws_private_url()
103        } else {
104            config.ws_public_url()
105        };
106        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
107        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
108        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
109
110        Self {
111            url,
112            config,
113            signal: Arc::new(AtomicBool::new(false)),
114            connection_mode,
115            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
116            out_rx: None,
117            task_handle: None,
118            subscriptions: SubscriptionState::new(KRAKEN_SPOT_WS_TOPIC_DELIMITER),
119            auth_tracker: AuthTracker::new(),
120            cancellation_token,
121            req_id_counter: Arc::new(tokio::sync::RwLock::new(0)),
122            auth_token: Arc::new(tokio::sync::RwLock::new(None)),
123        }
124    }
125
126    async fn get_next_req_id(&self) -> u64 {
127        let mut counter = self.req_id_counter.write().await;
128        *counter += 1;
129        *counter
130    }
131
132    /// Connects to the WebSocket server.
133    pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
134        log::debug!("Connecting to {}", self.url);
135
136        self.signal.store(false, Ordering::Relaxed);
137
138        let (raw_handler, raw_rx) = channel_message_handler();
139
140        let ws_config = WebSocketConfig {
141            url: self.url.clone(),
142            headers: vec![],
143            heartbeat: self.config.heartbeat_interval_secs,
144            heartbeat_msg: Some(WS_PING_MSG.to_string()),
145            reconnect_timeout_ms: Some(5_000),
146            reconnect_delay_initial_ms: Some(500),
147            reconnect_delay_max_ms: Some(5_000),
148            reconnect_backoff_factor: Some(1.5),
149            reconnect_jitter_ms: Some(250),
150            reconnect_max_attempts: None,
151        };
152
153        let ws_client = WebSocketClient::connect(
154            ws_config,
155            Some(raw_handler),
156            None,   // ping_handler
157            None,   // post_reconnection
158            vec![], // keyed_quotas
159            None,   // default_quota
160        )
161        .await
162        .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
163
164        // Share connection state across clones via ArcSwap
165        self.connection_mode
166            .store(ws_client.connection_mode_atomic());
167
168        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
169        self.out_rx = Some(Arc::new(out_rx));
170
171        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
172        *self.cmd_tx.write().await = cmd_tx.clone();
173
174        if let Err(e) = cmd_tx.send(SpotHandlerCommand::SetClient(ws_client)) {
175            return Err(KrakenWsError::ConnectionError(format!(
176                "Failed to send WebSocketClient to handler: {e}"
177            )));
178        }
179
180        let signal = self.signal.clone();
181        let subscriptions = self.subscriptions.clone();
182        let config_for_reconnect = self.config.clone();
183        let auth_token_for_reconnect = self.auth_token.clone();
184        let req_id_counter_for_reconnect = self.req_id_counter.clone();
185        let cmd_tx_for_reconnect = cmd_tx.clone();
186
187        let stream_handle = get_runtime().spawn(async move {
188            let mut handler =
189                SpotFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
190
191            loop {
192                match handler.next().await {
193                    Some(NautilusWsMessage::Reconnected) => {
194                        if signal.load(Ordering::Relaxed) {
195                            continue;
196                        }
197                        log::info!("WebSocket reconnected, resubscribing");
198
199                        // Mark all confirmed subscriptions as failed to transition to pending
200                        let confirmed_topics = subscriptions.all_topics();
201                        for topic in &confirmed_topics {
202                            subscriptions.mark_failure(topic);
203                        }
204
205                        let topics = subscriptions.all_topics();
206                        if topics.is_empty() {
207                            log::debug!("No subscriptions to restore after reconnection");
208                        } else {
209                            // Check if we need to re-authenticate (had a token before)
210                            let had_auth = auth_token_for_reconnect.read().await.is_some();
211
212                            if had_auth && config_for_reconnect.has_api_credentials() {
213                                log::debug!("Re-authenticating after reconnect");
214
215                                match refresh_auth_token(&config_for_reconnect).await {
216                                    Ok(new_token) => {
217                                        *auth_token_for_reconnect.write().await = Some(new_token);
218                                        log::debug!("Re-authentication successful");
219                                    }
220                                    Err(e) => {
221                                        log::error!(
222                                            "Failed to re-authenticate after reconnect: {e}"
223                                        );
224                                        // Clear auth token since it's invalid
225                                        *auth_token_for_reconnect.write().await = None;
226                                    }
227                                }
228                            }
229
230                            log::info!("Resubscribing after reconnection: count={}", topics.len());
231
232                            // Replay subscriptions
233                            for topic in &topics {
234                                let auth_token = auth_token_for_reconnect.read().await.clone();
235
236                                // Handle special "executions" topic first
237                                if topic == "executions" {
238                                    if let Some(ref token) = auth_token {
239                                        let mut counter =
240                                            req_id_counter_for_reconnect.write().await;
241                                        *counter += 1;
242                                        let req_id = *counter;
243
244                                        let request = KrakenWsRequest {
245                                            method: KrakenWsMethod::Subscribe,
246                                            params: Some(KrakenWsParams {
247                                                channel: KrakenWsChannel::Executions,
248                                                symbol: None,
249                                                snapshot: None,
250                                                depth: None,
251                                                interval: None,
252                                                event_trigger: None,
253                                                token: Some(token.clone()),
254                                                snap_orders: Some(true),
255                                                snap_trades: Some(true),
256                                            }),
257                                            req_id: Some(req_id),
258                                        };
259
260                                        if let Ok(payload) = serde_json::to_string(&request)
261                                            && let Err(e) = cmd_tx_for_reconnect
262                                                .send(SpotHandlerCommand::SendText { payload })
263                                        {
264                                            log::error!(
265                                                "Failed to send executions resubscribe: {e}"
266                                            );
267                                        }
268
269                                        subscriptions.mark_subscribe(topic);
270                                    } else {
271                                        log::warn!(
272                                            "Cannot resubscribe to executions: no auth token"
273                                        );
274                                    }
275                                    continue;
276                                }
277
278                                // Parse topic format: "channel:symbol" or "channel:symbol:interval"
279                                let parts: Vec<&str> = topic.splitn(3, ':').collect();
280                                if parts.len() < 2 {
281                                    log::warn!(
282                                        "Invalid topic format for resubscribe: topic={topic}"
283                                    );
284                                    continue;
285                                }
286
287                                let channel_str = parts[0];
288                                let channel = match channel_str {
289                                    "book" => Some(KrakenWsChannel::Book),
290                                    "trade" => Some(KrakenWsChannel::Trade),
291                                    "ticker" => Some(KrakenWsChannel::Ticker),
292                                    "quotes" => Some(KrakenWsChannel::Ticker),
293                                    "ohlc" => Some(KrakenWsChannel::Ohlc),
294                                    _ => None,
295                                };
296
297                                let Some(channel) = channel else {
298                                    log::warn!("Unknown channel for resubscribe: topic={topic}");
299                                    continue;
300                                };
301
302                                let mut counter = req_id_counter_for_reconnect.write().await;
303                                *counter += 1;
304                                let req_id = *counter;
305
306                                // Extract symbol and optional interval
307                                let (symbol_str, interval) = if parts.len() == 3 {
308                                    // Format: "ohlc:BTC/USD:1" -> symbol="BTC/USD", interval=1
309                                    (parts[1], parts[2].parse::<u32>().ok())
310                                } else {
311                                    // Format: "book:BTC/USD" -> symbol="BTC/USD", interval=None
312                                    (parts[1], None)
313                                };
314
315                                // Quotes use event_trigger: "bbo", raw ticker does not
316                                let event_trigger = if channel_str == "quotes" {
317                                    Some("bbo".to_string())
318                                } else {
319                                    None
320                                };
321
322                                // Disable snapshots for OHLC to avoid historical bar flood
323                                let snapshot = if channel == KrakenWsChannel::Ohlc {
324                                    Some(false)
325                                } else {
326                                    None
327                                };
328
329                                let request = KrakenWsRequest {
330                                    method: KrakenWsMethod::Subscribe,
331                                    params: Some(KrakenWsParams {
332                                        channel,
333                                        symbol: Some(vec![Ustr::from(symbol_str)]),
334                                        snapshot,
335                                        depth: None,
336                                        interval,
337                                        event_trigger,
338                                        token: None,
339                                        snap_orders: None,
340                                        snap_trades: None,
341                                    }),
342                                    req_id: Some(req_id),
343                                };
344
345                                if let Ok(payload) = serde_json::to_string(&request)
346                                    && let Err(e) = cmd_tx_for_reconnect
347                                        .send(SpotHandlerCommand::SendText { payload })
348                                {
349                                    log::error!(
350                                        "Failed to send resubscribe command: error={e}, \
351                                        topic={topic}"
352                                    );
353                                }
354
355                                subscriptions.mark_subscribe(topic);
356                            }
357                        }
358
359                        if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
360                            log::error!("Failed to send message (receiver dropped)");
361                            break;
362                        }
363                        continue;
364                    }
365                    Some(msg) => {
366                        if out_tx.send(msg).is_err() {
367                            log::error!("Failed to send message (receiver dropped)");
368                            break;
369                        }
370                    }
371                    None => {
372                        if handler.is_stopped() {
373                            log::debug!("Stop signal received, ending message processing");
374                            break;
375                        }
376                        log::warn!("WebSocket stream ended unexpectedly");
377                        break;
378                    }
379                }
380            }
381
382            log::debug!("Handler task exiting");
383        });
384
385        self.task_handle = Some(Arc::new(stream_handle));
386
387        log::debug!("WebSocket connected successfully");
388        Ok(())
389    }
390
391    /// Disconnects from the WebSocket server.
392    pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
393        log::debug!("Disconnecting WebSocket");
394
395        self.signal.store(true, Ordering::Relaxed);
396
397        if let Err(e) = self
398            .cmd_tx
399            .read()
400            .await
401            .send(SpotHandlerCommand::Disconnect)
402        {
403            log::debug!(
404                "Failed to send disconnect command (handler may already be shut down): {e}"
405            );
406        }
407
408        if let Some(task_handle) = self.task_handle.take() {
409            match Arc::try_unwrap(task_handle) {
410                Ok(handle) => {
411                    log::debug!("Waiting for task handle to complete");
412                    match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
413                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
414                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
415                        Err(_) => {
416                            log::warn!(
417                                "Timeout waiting for task handle, task may still be running"
418                            );
419                        }
420                    }
421                }
422                Err(arc_handle) => {
423                    log::debug!(
424                        "Cannot take ownership of task handle - other references exist, aborting task"
425                    );
426                    arc_handle.abort();
427                }
428            }
429        } else {
430            log::debug!("No task handle to await");
431        }
432
433        self.subscriptions.clear();
434        self.auth_tracker.fail("Disconnected");
435
436        Ok(())
437    }
438
439    /// Closes the WebSocket connection.
440    pub async fn close(&mut self) -> Result<(), KrakenWsError> {
441        self.disconnect().await
442    }
443
444    /// Waits until the connection is active or timeout.
445    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
446        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
447
448        tokio::time::timeout(timeout, async {
449            while !self.is_active() {
450                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
451            }
452        })
453        .await
454        .map_err(|_| {
455            KrakenWsError::ConnectionError(format!(
456                "WebSocket connection timeout after {timeout_secs} seconds"
457            ))
458        })?;
459
460        Ok(())
461    }
462
463    /// Authenticates with the Kraken API to enable private subscriptions.
464    pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
465        if !self.config.has_api_credentials() {
466            return Err(KrakenWsError::AuthenticationError(
467                "API credentials required for authentication".to_string(),
468            ));
469        }
470
471        let api_key = self
472            .config
473            .api_key
474            .clone()
475            .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
476        let api_secret =
477            self.config.api_secret.clone().ok_or_else(|| {
478                KrakenWsError::AuthenticationError("Missing API secret".to_string())
479            })?;
480
481        let http_client = KrakenSpotHttpClient::with_credentials(
482            api_key,
483            api_secret,
484            self.config.environment,
485            Some(self.config.http_base_url()),
486            self.config.timeout_secs,
487            None,
488            None,
489            None,
490            self.config.http_proxy.clone(),
491            self.config.max_requests_per_second,
492        )
493        .map_err(|e| {
494            KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
495        })?;
496
497        let ws_token = http_client.get_websockets_token().await.map_err(|e| {
498            KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
499        })?;
500
501        log::debug!(
502            "WebSocket authentication token received: token_length={}, expires={}",
503            ws_token.token.len(),
504            ws_token.expires
505        );
506
507        let mut auth_token = self.auth_token.write().await;
508        *auth_token = Some(ws_token.token);
509
510        Ok(())
511    }
512
513    /// Caches multiple instruments for symbol lookup.
514    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
515        // Before connect() the handler isn't running; this send will fail and that's expected
516        if let Ok(cmd_tx) = self.cmd_tx.try_read()
517            && let Err(e) = cmd_tx.send(SpotHandlerCommand::InitializeInstruments(instruments))
518        {
519            log::debug!("Failed to send instruments to handler: {e}");
520        }
521    }
522
523    /// Caches a single instrument for symbol lookup.
524    pub fn cache_instrument(&self, instrument: InstrumentAny) {
525        // Before connect() the handler isn't running; this send will fail and that's expected
526        if let Ok(cmd_tx) = self.cmd_tx.try_read()
527            && let Err(e) = cmd_tx.send(SpotHandlerCommand::UpdateInstrument(instrument))
528        {
529            log::debug!("Failed to send instrument update to handler: {e}");
530        }
531    }
532
533    /// Sets the account ID for execution reports.
534    ///
535    /// Must be called before subscribing to executions to properly generate
536    /// OrderStatusReport and FillReport objects.
537    pub fn set_account_id(&self, account_id: AccountId) {
538        if let Ok(cmd_tx) = self.cmd_tx.try_read()
539            && let Err(e) = cmd_tx.send(SpotHandlerCommand::SetAccountId(account_id))
540        {
541            log::debug!("Failed to send account ID to handler: {e}");
542        }
543    }
544
545    /// Caches order info for order tracking.
546    ///
547    /// This should be called BEFORE submitting an order via HTTP to handle the
548    /// race condition where WebSocket execution messages arrive before the
549    /// HTTP response (which contains the venue_order_id).
550    pub fn cache_client_order(
551        &self,
552        client_order_id: ClientOrderId,
553        instrument_id: InstrumentId,
554        trader_id: TraderId,
555        strategy_id: StrategyId,
556    ) {
557        if let Ok(cmd_tx) = self.cmd_tx.try_read()
558            && let Err(e) = cmd_tx.send(SpotHandlerCommand::CacheClientOrder {
559                client_order_id,
560                instrument_id,
561                trader_id,
562                strategy_id,
563            })
564        {
565            log::debug!("Failed to send cache client order command to handler: {e}");
566        }
567    }
568
569    /// Cancels all pending requests.
570    pub fn cancel_all_requests(&self) {
571        self.cancellation_token.cancel();
572    }
573
574    /// Returns the cancellation token for this client.
575    pub fn cancellation_token(&self) -> &CancellationToken {
576        &self.cancellation_token
577    }
578
579    /// Subscribes to a channel for the given symbols.
580    pub async fn subscribe(
581        &self,
582        channel: KrakenWsChannel,
583        symbols: Vec<Ustr>,
584        depth: Option<u32>,
585    ) -> Result<(), KrakenWsError> {
586        let mut symbols_to_subscribe = Vec::new();
587        let channel_str = channel.as_ref();
588        for symbol in &symbols {
589            let key = format!("{channel_str}:{symbol}");
590            if self.subscriptions.add_reference(&key) {
591                self.subscriptions.mark_subscribe(&key);
592                symbols_to_subscribe.push(*symbol);
593            }
594        }
595
596        if symbols_to_subscribe.is_empty() {
597            return Ok(());
598        }
599
600        let is_private = matches!(
601            channel,
602            KrakenWsChannel::Executions | KrakenWsChannel::Balances
603        );
604        let token = if is_private {
605            Some(self.auth_token.read().await.clone().ok_or_else(|| {
606                KrakenWsError::AuthenticationError(
607                    "Authentication token required for private channels. Call authenticate() first"
608                        .to_string(),
609                )
610            })?)
611        } else {
612            None
613        };
614
615        let req_id = self.get_next_req_id().await;
616        let request = KrakenWsRequest {
617            method: KrakenWsMethod::Subscribe,
618            params: Some(KrakenWsParams {
619                channel,
620                symbol: Some(symbols_to_subscribe.clone()),
621                snapshot: None,
622                depth,
623                interval: None,
624                event_trigger: None,
625                token,
626                snap_orders: None,
627                snap_trades: None,
628            }),
629            req_id: Some(req_id),
630        };
631
632        self.send_request(&request).await?;
633
634        for symbol in &symbols_to_subscribe {
635            let key = format!("{channel_str}:{symbol}");
636            self.subscriptions.confirm_subscribe(&key);
637        }
638
639        Ok(())
640    }
641
642    /// Subscribes to a channel with a specific interval (for OHLC).
643    async fn subscribe_with_interval(
644        &self,
645        channel: KrakenWsChannel,
646        symbols: Vec<Ustr>,
647        interval: u32,
648    ) -> Result<(), KrakenWsError> {
649        let mut symbols_to_subscribe = Vec::new();
650        let channel_str = channel.as_ref();
651        for symbol in &symbols {
652            let key = format!("{channel_str}:{symbol}:{interval}");
653            if self.subscriptions.add_reference(&key) {
654                self.subscriptions.mark_subscribe(&key);
655                symbols_to_subscribe.push(*symbol);
656            }
657        }
658
659        if symbols_to_subscribe.is_empty() {
660            return Ok(());
661        }
662
663        let req_id = self.get_next_req_id().await;
664        let request = KrakenWsRequest {
665            method: KrakenWsMethod::Subscribe,
666            params: Some(KrakenWsParams {
667                channel,
668                symbol: Some(symbols_to_subscribe.clone()),
669                snapshot: Some(false),
670                depth: None,
671                interval: Some(interval),
672                event_trigger: None,
673                token: None,
674                snap_orders: None,
675                snap_trades: None,
676            }),
677            req_id: Some(req_id),
678        };
679
680        self.send_request(&request).await?;
681
682        for symbol in &symbols_to_subscribe {
683            let key = format!("{channel_str}:{symbol}:{interval}");
684            self.subscriptions.confirm_subscribe(&key);
685        }
686
687        Ok(())
688    }
689
690    /// Unsubscribes from a channel with a specific interval (for OHLC).
691    async fn unsubscribe_with_interval(
692        &self,
693        channel: KrakenWsChannel,
694        symbols: Vec<Ustr>,
695        interval: u32,
696    ) -> Result<(), KrakenWsError> {
697        let mut symbols_to_unsubscribe = Vec::new();
698        let channel_str = channel.as_ref();
699        for symbol in &symbols {
700            let key = format!("{channel_str}:{symbol}:{interval}");
701            if self.subscriptions.remove_reference(&key) {
702                self.subscriptions.mark_unsubscribe(&key);
703                symbols_to_unsubscribe.push(*symbol);
704            }
705        }
706
707        if symbols_to_unsubscribe.is_empty() {
708            return Ok(());
709        }
710
711        let req_id = self.get_next_req_id().await;
712        let request = KrakenWsRequest {
713            method: KrakenWsMethod::Unsubscribe,
714            params: Some(KrakenWsParams {
715                channel,
716                symbol: Some(symbols_to_unsubscribe.clone()),
717                snapshot: None,
718                depth: None,
719                interval: Some(interval),
720                event_trigger: None,
721                token: None,
722                snap_orders: None,
723                snap_trades: None,
724            }),
725            req_id: Some(req_id),
726        };
727
728        self.send_request(&request).await?;
729
730        for symbol in &symbols_to_unsubscribe {
731            let key = format!("{channel_str}:{symbol}:{interval}");
732            self.subscriptions.confirm_unsubscribe(&key);
733        }
734
735        Ok(())
736    }
737
738    /// Unsubscribes from a channel for the given symbols.
739    pub async fn unsubscribe(
740        &self,
741        channel: KrakenWsChannel,
742        symbols: Vec<Ustr>,
743    ) -> Result<(), KrakenWsError> {
744        let mut symbols_to_unsubscribe = Vec::new();
745        let channel_str = channel.as_ref();
746        for symbol in &symbols {
747            let key = format!("{channel_str}:{symbol}");
748            if self.subscriptions.remove_reference(&key) {
749                self.subscriptions.mark_unsubscribe(&key);
750                symbols_to_unsubscribe.push(*symbol);
751            } else {
752                log::debug!(
753                    "Channel {channel_str} symbol {symbol} still has active subscriptions, not unsubscribing"
754                );
755            }
756        }
757
758        if symbols_to_unsubscribe.is_empty() {
759            return Ok(());
760        }
761
762        let is_private = matches!(
763            channel,
764            KrakenWsChannel::Executions | KrakenWsChannel::Balances
765        );
766        let token = if is_private {
767            Some(self.auth_token.read().await.clone().ok_or_else(|| {
768                KrakenWsError::AuthenticationError(
769                    "Authentication token required for private channels. Call authenticate() first"
770                        .to_string(),
771                )
772            })?)
773        } else {
774            None
775        };
776
777        let req_id = self.get_next_req_id().await;
778        let request = KrakenWsRequest {
779            method: KrakenWsMethod::Unsubscribe,
780            params: Some(KrakenWsParams {
781                channel,
782                symbol: Some(symbols_to_unsubscribe.clone()),
783                snapshot: None,
784                depth: None,
785                interval: None,
786                event_trigger: None,
787                token,
788                snap_orders: None,
789                snap_trades: None,
790            }),
791            req_id: Some(req_id),
792        };
793
794        self.send_request(&request).await?;
795
796        for symbol in &symbols_to_unsubscribe {
797            let key = format!("{channel_str}:{symbol}");
798            self.subscriptions.confirm_unsubscribe(&key);
799        }
800
801        Ok(())
802    }
803
804    /// Sends a ping message to keep the connection alive.
805    pub async fn send_ping(&self) -> Result<(), KrakenWsError> {
806        let req_id = self.get_next_req_id().await;
807
808        let request = KrakenWsRequest {
809            method: KrakenWsMethod::Ping,
810            params: None,
811            req_id: Some(req_id),
812        };
813
814        self.send_request(&request).await
815    }
816
817    async fn send_request(&self, request: &KrakenWsRequest) -> Result<(), KrakenWsError> {
818        let payload =
819            serde_json::to_string(request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
820
821        log::trace!("Sending message: {payload}");
822
823        self.cmd_tx
824            .read()
825            .await
826            .send(SpotHandlerCommand::SendText { payload })
827            .map_err(|e| KrakenWsError::ConnectionError(format!("Failed to send request: {e}")))?;
828
829        Ok(())
830    }
831
832    /// Returns true if connected (not closed).
833    pub fn is_connected(&self) -> bool {
834        let connection_mode_arc = self.connection_mode.load();
835        !ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
836    }
837
838    /// Returns true if the connection is active.
839    pub fn is_active(&self) -> bool {
840        let connection_mode_arc = self.connection_mode.load();
841        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
842            && !self.signal.load(Ordering::Relaxed)
843    }
844
845    /// Returns true if the connection is closed.
846    pub fn is_closed(&self) -> bool {
847        let connection_mode_arc = self.connection_mode.load();
848        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
849            || self.signal.load(Ordering::Relaxed)
850    }
851
852    /// Returns the WebSocket URL.
853    pub fn url(&self) -> &str {
854        &self.url
855    }
856
857    /// Returns all active subscriptions.
858    pub fn get_subscriptions(&self) -> Vec<String> {
859        self.subscriptions.all_topics()
860    }
861
862    /// Returns a stream of WebSocket messages.
863    ///
864    /// # Errors
865    ///
866    /// Returns an error if:
867    /// - The stream receiver has already been taken
868    /// - Other clones of this client still hold references to the receiver
869    pub fn stream(
870        &mut self,
871    ) -> Result<impl futures_util::Stream<Item = NautilusWsMessage> + use<>, KrakenWsError> {
872        let rx = self.out_rx.take().ok_or_else(|| {
873            KrakenWsError::ChannelError(
874                "Stream receiver already taken or client not connected".to_string(),
875            )
876        })?;
877        let mut rx = Arc::try_unwrap(rx).map_err(|_| {
878            KrakenWsError::ChannelError(
879                "Cannot take ownership of stream - other client clones still hold references"
880                    .to_string(),
881            )
882        })?;
883        Ok(async_stream::stream! {
884            while let Some(msg) = rx.recv().await {
885                yield msg;
886            }
887        })
888    }
889
890    /// Subscribes to order book updates for the given instrument.
891    pub async fn subscribe_book(
892        &self,
893        instrument_id: InstrumentId,
894        depth: Option<u32>,
895    ) -> Result<(), KrakenWsError> {
896        let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
897        self.subscribe(KrakenWsChannel::Book, vec![symbol], depth)
898            .await
899    }
900
901    /// Subscribes to quote updates for the given instrument.
902    ///
903    /// Uses the Ticker channel with `event_trigger: "bbo"` for updates only on
904    /// best bid/offer changes.
905    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
906        let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
907        let key = format!("quotes:{symbol}");
908
909        if !self.subscriptions.add_reference(&key) {
910            return Ok(());
911        }
912
913        self.subscriptions.mark_subscribe(&key);
914
915        let req_id = self.get_next_req_id().await;
916        let request = KrakenWsRequest {
917            method: KrakenWsMethod::Subscribe,
918            params: Some(KrakenWsParams {
919                channel: KrakenWsChannel::Ticker,
920                symbol: Some(vec![symbol]),
921                snapshot: None,
922                depth: None,
923                interval: None,
924                event_trigger: Some("bbo".to_string()),
925                token: None,
926                snap_orders: None,
927                snap_trades: None,
928            }),
929            req_id: Some(req_id),
930        };
931
932        self.send_request(&request).await?;
933        self.subscriptions.confirm_subscribe(&key);
934        Ok(())
935    }
936
937    /// Subscribes to trade updates for the given instrument.
938    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
939        let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
940        self.subscribe(KrakenWsChannel::Trade, vec![symbol], None)
941            .await
942    }
943
944    /// Subscribes to bar/OHLC updates for the given bar type.
945    ///
946    /// # Errors
947    ///
948    /// Returns an error if the bar aggregation is not supported by Kraken.
949    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
950        let symbol = to_ws_v2_symbol(bar_type.instrument_id().symbol.inner());
951        let interval = bar_type_to_ws_interval(bar_type)?;
952        self.subscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
953            .await
954    }
955
956    /// Subscribes to execution updates (order and fill events).
957    ///
958    /// Requires authentication - call `authenticate()` first.
959    pub async fn subscribe_executions(
960        &self,
961        snap_orders: bool,
962        snap_trades: bool,
963    ) -> Result<(), KrakenWsError> {
964        let req_id = self.get_next_req_id().await;
965
966        let token = self.auth_token.read().await.clone().ok_or_else(|| {
967            KrakenWsError::AuthenticationError(
968                "Authentication token required for executions channel. Call authenticate() first"
969                    .to_string(),
970            )
971        })?;
972
973        let request = KrakenWsRequest {
974            method: KrakenWsMethod::Subscribe,
975            params: Some(KrakenWsParams {
976                channel: KrakenWsChannel::Executions,
977                symbol: None,
978                snapshot: None,
979                depth: None,
980                interval: None,
981                event_trigger: None,
982                token: Some(token),
983                snap_orders: Some(snap_orders),
984                snap_trades: Some(snap_trades),
985            }),
986            req_id: Some(req_id),
987        };
988
989        self.send_request(&request).await?;
990
991        let key = "executions";
992        if self.subscriptions.add_reference(key) {
993            self.subscriptions.mark_subscribe(key);
994            self.subscriptions.confirm_subscribe(key);
995        }
996
997        Ok(())
998    }
999
1000    /// Unsubscribes from order book updates for the given instrument.
1001    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
1002        let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
1003        self.unsubscribe(KrakenWsChannel::Book, vec![symbol]).await
1004    }
1005
1006    /// Unsubscribes from quote updates for the given instrument.
1007    pub async fn unsubscribe_quotes(
1008        &self,
1009        instrument_id: InstrumentId,
1010    ) -> Result<(), KrakenWsError> {
1011        let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
1012        let key = format!("quotes:{symbol}");
1013
1014        if !self.subscriptions.remove_reference(&key) {
1015            return Ok(());
1016        }
1017
1018        self.subscriptions.mark_unsubscribe(&key);
1019
1020        let req_id = self.get_next_req_id().await;
1021        let request = KrakenWsRequest {
1022            method: KrakenWsMethod::Unsubscribe,
1023            params: Some(KrakenWsParams {
1024                channel: KrakenWsChannel::Ticker,
1025                symbol: Some(vec![symbol]),
1026                snapshot: None,
1027                depth: None,
1028                interval: None,
1029                event_trigger: Some("bbo".to_string()),
1030                token: None,
1031                snap_orders: None,
1032                snap_trades: None,
1033            }),
1034            req_id: Some(req_id),
1035        };
1036
1037        self.send_request(&request).await?;
1038        self.subscriptions.confirm_unsubscribe(&key);
1039        Ok(())
1040    }
1041
1042    /// Unsubscribes from trade updates for the given instrument.
1043    pub async fn unsubscribe_trades(
1044        &self,
1045        instrument_id: InstrumentId,
1046    ) -> Result<(), KrakenWsError> {
1047        let symbol = to_ws_v2_symbol(instrument_id.symbol.inner());
1048        self.unsubscribe(KrakenWsChannel::Trade, vec![symbol]).await
1049    }
1050
1051    /// Unsubscribes from bar/OHLC updates for the given bar type.
1052    ///
1053    /// # Errors
1054    ///
1055    /// Returns an error if the bar aggregation is not supported by Kraken.
1056    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
1057        let symbol = to_ws_v2_symbol(bar_type.instrument_id().symbol.inner());
1058        let interval = bar_type_to_ws_interval(bar_type)?;
1059        self.unsubscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
1060            .await
1061    }
1062}
1063
1064/// Helper function to refresh authentication token via HTTP API.
1065async fn refresh_auth_token(config: &KrakenDataClientConfig) -> Result<String, KrakenWsError> {
1066    let api_key = config
1067        .api_key
1068        .clone()
1069        .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
1070    let api_secret = config
1071        .api_secret
1072        .clone()
1073        .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API secret".to_string()))?;
1074
1075    let http_client = KrakenSpotHttpClient::with_credentials(
1076        api_key,
1077        api_secret,
1078        config.environment,
1079        Some(config.http_base_url()),
1080        config.timeout_secs,
1081        None,
1082        None,
1083        None,
1084        config.http_proxy.clone(),
1085        config.max_requests_per_second,
1086    )
1087    .map_err(|e| {
1088        KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
1089    })?;
1090
1091    let ws_token = http_client.get_websockets_token().await.map_err(|e| {
1092        KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
1093    })?;
1094
1095    log::debug!(
1096        "WebSocket authentication token refreshed: token_length={}, expires={}",
1097        ws_token.token.len(),
1098        ws_token.expires
1099    );
1100
1101    Ok(ws_token.token)
1102}
1103
1104#[inline]
1105fn to_ws_v2_symbol(symbol: Ustr) -> Ustr {
1106    Ustr::from(&normalize_spot_symbol(symbol.as_str()))
1107}
1108
1109fn bar_type_to_ws_interval(bar_type: BarType) -> Result<u32, KrakenWsError> {
1110    const VALID_INTERVALS: [u32; 9] = [1, 5, 15, 30, 60, 240, 1440, 10080, 21600];
1111
1112    let spec = bar_type.spec();
1113    let step = spec.step.get() as u32;
1114
1115    let base_minutes = match spec.aggregation {
1116        BarAggregation::Minute => 1,
1117        BarAggregation::Hour => 60,
1118        BarAggregation::Day => 1440,
1119        BarAggregation::Week => 10080,
1120        other => {
1121            return Err(KrakenWsError::SubscriptionError(format!(
1122                "Unsupported bar aggregation for Kraken OHLC streaming: {other:?}"
1123            )));
1124        }
1125    };
1126
1127    let interval = base_minutes * step;
1128
1129    if !VALID_INTERVALS.contains(&interval) {
1130        return Err(KrakenWsError::SubscriptionError(format!(
1131            "Invalid bar interval {interval} minutes for Kraken OHLC streaming. \
1132             Supported intervals: 1, 5, 15, 30, 60, 240, 1440, 10080, 21600"
1133        )));
1134    }
1135
1136    Ok(interval)
1137}
1138
1139#[cfg(test)]
1140mod tests {
1141    use rstest::rstest;
1142
1143    use super::*;
1144
1145    #[rstest]
1146    #[case("XBT/EUR", "BTC/EUR")]
1147    #[case("XBT/USD", "BTC/USD")]
1148    #[case("XBT/USDT", "BTC/USDT")]
1149    #[case("ETH/USD", "ETH/USD")]
1150    #[case("ETH/XBT", "ETH/BTC")]
1151    #[case("SOL/XBT", "SOL/BTC")]
1152    #[case("SOL/USD", "SOL/USD")]
1153    #[case("BTC/USD", "BTC/USD")]
1154    #[case("ETH/BTC", "ETH/BTC")]
1155    fn test_to_kraken_ws_v2_symbol(#[case] input: &str, #[case] expected: &str) {
1156        let symbol = Ustr::from(input);
1157        let result = to_ws_v2_symbol(symbol);
1158        assert_eq!(result.as_str(), expected);
1159    }
1160}