nautilus_kraken/websocket/spot_v2/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for the Kraken v2 streaming API.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::get_runtime;
25use nautilus_model::{
26    data::BarType,
27    enums::BarAggregation,
28    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
29    instruments::InstrumentAny,
30};
31use nautilus_network::{
32    mode::ConnectionMode,
33    websocket::{
34        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35    },
36};
37use tokio_util::sync::CancellationToken;
38use ustr::Ustr;
39
40/// Topic delimiter for Kraken Spot v2 WebSocket subscriptions.
41///
42/// Topics use colon format: `channel:symbol` (e.g., `Trade:ETH/USD`).
43pub const KRAKEN_SPOT_WS_TOPIC_DELIMITER: char = ':';
44
45use super::{
46    enums::{KrakenWsChannel, KrakenWsMethod},
47    handler::{SpotFeedHandler, SpotHandlerCommand},
48    messages::{KrakenWsParams, KrakenWsRequest, NautilusWsMessage},
49};
50use crate::{
51    config::KrakenDataClientConfig, http::KrakenSpotHttpClient, websocket::error::KrakenWsError,
52};
53
54const WS_PING_MSG: &str = r#"{"method":"ping"}"#;
55
56/// WebSocket client for the Kraken Spot v2 streaming API.
57#[derive(Debug)]
58#[cfg_attr(
59    feature = "python",
60    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
61)]
62pub struct KrakenSpotWebSocketClient {
63    url: String,
64    config: KrakenDataClientConfig,
65    signal: Arc<AtomicBool>,
66    connection_mode: Arc<ArcSwap<AtomicU8>>,
67    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<SpotHandlerCommand>>>,
68    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
69    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
70    subscriptions: SubscriptionState,
71    auth_tracker: AuthTracker,
72    cancellation_token: CancellationToken,
73    req_id_counter: Arc<tokio::sync::RwLock<u64>>,
74    auth_token: Arc<tokio::sync::RwLock<Option<String>>>,
75}
76
77impl Clone for KrakenSpotWebSocketClient {
78    fn clone(&self) -> Self {
79        Self {
80            url: self.url.clone(),
81            config: self.config.clone(),
82            signal: Arc::clone(&self.signal),
83            connection_mode: Arc::clone(&self.connection_mode),
84            cmd_tx: Arc::clone(&self.cmd_tx),
85            out_rx: self.out_rx.clone(),
86            task_handle: self.task_handle.clone(),
87            subscriptions: self.subscriptions.clone(),
88            auth_tracker: self.auth_tracker.clone(),
89            cancellation_token: self.cancellation_token.clone(),
90            req_id_counter: self.req_id_counter.clone(),
91            auth_token: self.auth_token.clone(),
92        }
93    }
94}
95
96impl KrakenSpotWebSocketClient {
97    /// Creates a new client with the given configuration.
98    pub fn new(config: KrakenDataClientConfig, cancellation_token: CancellationToken) -> Self {
99        // Prefer private URL if explicitly set (for authenticated endpoints)
100        let url = if config.ws_private_url.is_some() {
101            config.ws_private_url()
102        } else {
103            config.ws_public_url()
104        };
105        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
106        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
107        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
108
109        Self {
110            url,
111            config,
112            signal: Arc::new(AtomicBool::new(false)),
113            connection_mode,
114            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
115            out_rx: None,
116            task_handle: None,
117            subscriptions: SubscriptionState::new(KRAKEN_SPOT_WS_TOPIC_DELIMITER),
118            auth_tracker: AuthTracker::new(),
119            cancellation_token,
120            req_id_counter: Arc::new(tokio::sync::RwLock::new(0)),
121            auth_token: Arc::new(tokio::sync::RwLock::new(None)),
122        }
123    }
124
125    async fn get_next_req_id(&self) -> u64 {
126        let mut counter = self.req_id_counter.write().await;
127        *counter += 1;
128        *counter
129    }
130
131    /// Connects to the WebSocket server.
132    pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
133        log::debug!("Connecting to {}", self.url);
134
135        self.signal.store(false, Ordering::Relaxed);
136
137        let (raw_handler, raw_rx) = channel_message_handler();
138
139        let ws_config = WebSocketConfig {
140            url: self.url.clone(),
141            headers: vec![],
142            heartbeat: self.config.heartbeat_interval_secs,
143            heartbeat_msg: Some(WS_PING_MSG.to_string()),
144            reconnect_timeout_ms: Some(5_000),
145            reconnect_delay_initial_ms: Some(500),
146            reconnect_delay_max_ms: Some(5_000),
147            reconnect_backoff_factor: Some(1.5),
148            reconnect_jitter_ms: Some(250),
149            reconnect_max_attempts: None,
150        };
151
152        let ws_client = WebSocketClient::connect(
153            ws_config,
154            Some(raw_handler),
155            None,   // ping_handler
156            None,   // post_reconnection
157            vec![], // keyed_quotas
158            None,   // default_quota
159        )
160        .await
161        .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
162
163        // Share connection state across clones via ArcSwap
164        self.connection_mode
165            .store(ws_client.connection_mode_atomic());
166
167        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
168        self.out_rx = Some(Arc::new(out_rx));
169
170        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
171        *self.cmd_tx.write().await = cmd_tx.clone();
172
173        if let Err(e) = cmd_tx.send(SpotHandlerCommand::SetClient(ws_client)) {
174            return Err(KrakenWsError::ConnectionError(format!(
175                "Failed to send WebSocketClient to handler: {e}"
176            )));
177        }
178
179        let signal = self.signal.clone();
180        let subscriptions = self.subscriptions.clone();
181        let config_for_reconnect = self.config.clone();
182        let auth_token_for_reconnect = self.auth_token.clone();
183        let req_id_counter_for_reconnect = self.req_id_counter.clone();
184        let cmd_tx_for_reconnect = cmd_tx.clone();
185
186        let stream_handle = get_runtime().spawn(async move {
187            let mut handler =
188                SpotFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
189
190            loop {
191                match handler.next().await {
192                    Some(NautilusWsMessage::Reconnected) => {
193                        if signal.load(Ordering::Relaxed) {
194                            continue;
195                        }
196                        log::info!("WebSocket reconnected, resubscribing");
197
198                        // Mark all confirmed subscriptions as failed to transition to pending
199                        let confirmed_topics = subscriptions.all_topics();
200                        for topic in &confirmed_topics {
201                            subscriptions.mark_failure(topic);
202                        }
203
204                        let topics = subscriptions.all_topics();
205                        if topics.is_empty() {
206                            log::debug!("No subscriptions to restore after reconnection");
207                        } else {
208                            // Check if we need to re-authenticate (had a token before)
209                            let had_auth = auth_token_for_reconnect.read().await.is_some();
210
211                            if had_auth && config_for_reconnect.has_api_credentials() {
212                                log::debug!("Re-authenticating after reconnect");
213
214                                match refresh_auth_token(&config_for_reconnect).await {
215                                    Ok(new_token) => {
216                                        *auth_token_for_reconnect.write().await = Some(new_token);
217                                        log::debug!("Re-authentication successful");
218                                    }
219                                    Err(e) => {
220                                        log::error!(
221                                            "Failed to re-authenticate after reconnect: {e}"
222                                        );
223                                        // Clear auth token since it's invalid
224                                        *auth_token_for_reconnect.write().await = None;
225                                    }
226                                }
227                            }
228
229                            log::info!("Resubscribing after reconnection: count={}", topics.len());
230
231                            // Replay subscriptions
232                            for topic in &topics {
233                                let auth_token = auth_token_for_reconnect.read().await.clone();
234
235                                // Handle special "executions" topic first
236                                if topic == "executions" {
237                                    if let Some(ref token) = auth_token {
238                                        let mut counter =
239                                            req_id_counter_for_reconnect.write().await;
240                                        *counter += 1;
241                                        let req_id = *counter;
242
243                                        let request = KrakenWsRequest {
244                                            method: KrakenWsMethod::Subscribe,
245                                            params: Some(KrakenWsParams {
246                                                channel: KrakenWsChannel::Executions,
247                                                symbol: None,
248                                                snapshot: None,
249                                                depth: None,
250                                                interval: None,
251                                                token: Some(token.clone()),
252                                                snap_orders: Some(true),
253                                                snap_trades: Some(true),
254                                            }),
255                                            req_id: Some(req_id),
256                                        };
257
258                                        if let Ok(payload) = serde_json::to_string(&request)
259                                            && let Err(e) = cmd_tx_for_reconnect
260                                                .send(SpotHandlerCommand::SendText { payload })
261                                        {
262                                            log::error!(
263                                                "Failed to send executions resubscribe: {e}"
264                                            );
265                                        }
266
267                                        subscriptions.mark_subscribe(topic);
268                                    } else {
269                                        log::warn!(
270                                            "Cannot resubscribe to executions: no auth token"
271                                        );
272                                    }
273                                    continue;
274                                }
275
276                                // Parse topic format: "Channel:symbol" or "Channel:symbol:interval"
277                                let parts: Vec<&str> = topic.splitn(3, ':').collect();
278                                if parts.len() < 2 {
279                                    log::warn!(
280                                        "Invalid topic format for resubscribe: topic={topic}"
281                                    );
282                                    continue;
283                                }
284
285                                let channel_str = parts[0];
286                                let channel = match channel_str {
287                                    "Book" => Some(KrakenWsChannel::Book),
288                                    "Trade" => Some(KrakenWsChannel::Trade),
289                                    "Ticker" => Some(KrakenWsChannel::Ticker),
290                                    "Ohlc" => Some(KrakenWsChannel::Ohlc),
291                                    "book" => Some(KrakenWsChannel::Book),
292                                    "quotes" => Some(KrakenWsChannel::Book),
293                                    _ => None,
294                                };
295
296                                let Some(channel) = channel else {
297                                    log::warn!("Unknown channel for resubscribe: topic={topic}");
298                                    continue;
299                                };
300
301                                let mut counter = req_id_counter_for_reconnect.write().await;
302                                *counter += 1;
303                                let req_id = *counter;
304
305                                let depth = if channel_str == "quotes" {
306                                    Some(10)
307                                } else {
308                                    None
309                                };
310
311                                // Extract symbol and optional interval
312                                let (symbol_str, interval) = if parts.len() == 3 {
313                                    // Format: "Ohlc:BTC/USD:1" -> symbol="BTC/USD", interval=1
314                                    (parts[1], parts[2].parse::<u32>().ok())
315                                } else {
316                                    // Format: "Book:BTC/USD" -> symbol="BTC/USD", interval=None
317                                    (parts[1], None)
318                                };
319
320                                let request = KrakenWsRequest {
321                                    method: KrakenWsMethod::Subscribe,
322                                    params: Some(KrakenWsParams {
323                                        channel,
324                                        symbol: Some(vec![Ustr::from(symbol_str)]),
325                                        snapshot: None,
326                                        depth,
327                                        interval,
328                                        token: None,
329                                        snap_orders: None,
330                                        snap_trades: None,
331                                    }),
332                                    req_id: Some(req_id),
333                                };
334
335                                if let Ok(payload) = serde_json::to_string(&request)
336                                    && let Err(e) = cmd_tx_for_reconnect
337                                        .send(SpotHandlerCommand::SendText { payload })
338                                {
339                                    log::error!(
340                                        "Failed to send resubscribe command: error={e}, \
341                                        topic={topic}"
342                                    );
343                                }
344
345                                subscriptions.mark_subscribe(topic);
346                            }
347                        }
348
349                        if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
350                            log::error!("Failed to send message (receiver dropped)");
351                            break;
352                        }
353                        continue;
354                    }
355                    Some(msg) => {
356                        if out_tx.send(msg).is_err() {
357                            log::error!("Failed to send message (receiver dropped)");
358                            break;
359                        }
360                    }
361                    None => {
362                        if handler.is_stopped() {
363                            log::debug!("Stop signal received, ending message processing");
364                            break;
365                        }
366                        log::warn!("WebSocket stream ended unexpectedly");
367                        break;
368                    }
369                }
370            }
371
372            log::debug!("Handler task exiting");
373        });
374
375        self.task_handle = Some(Arc::new(stream_handle));
376
377        log::debug!("WebSocket connected successfully");
378        Ok(())
379    }
380
381    /// Disconnects from the WebSocket server.
382    pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
383        log::debug!("Disconnecting WebSocket");
384
385        self.signal.store(true, Ordering::Relaxed);
386
387        if let Err(e) = self
388            .cmd_tx
389            .read()
390            .await
391            .send(SpotHandlerCommand::Disconnect)
392        {
393            log::debug!(
394                "Failed to send disconnect command (handler may already be shut down): {e}"
395            );
396        }
397
398        if let Some(task_handle) = self.task_handle.take() {
399            match Arc::try_unwrap(task_handle) {
400                Ok(handle) => {
401                    log::debug!("Waiting for task handle to complete");
402                    match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
403                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
404                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
405                        Err(_) => {
406                            log::warn!(
407                                "Timeout waiting for task handle, task may still be running"
408                            );
409                        }
410                    }
411                }
412                Err(arc_handle) => {
413                    log::debug!(
414                        "Cannot take ownership of task handle - other references exist, aborting task"
415                    );
416                    arc_handle.abort();
417                }
418            }
419        } else {
420            log::debug!("No task handle to await");
421        }
422
423        self.subscriptions.clear();
424        self.auth_tracker.fail("Disconnected");
425
426        Ok(())
427    }
428
429    /// Closes the WebSocket connection.
430    pub async fn close(&mut self) -> Result<(), KrakenWsError> {
431        self.disconnect().await
432    }
433
434    /// Waits until the connection is active or timeout.
435    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
436        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
437
438        tokio::time::timeout(timeout, async {
439            while !self.is_active() {
440                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
441            }
442        })
443        .await
444        .map_err(|_| {
445            KrakenWsError::ConnectionError(format!(
446                "WebSocket connection timeout after {timeout_secs} seconds"
447            ))
448        })?;
449
450        Ok(())
451    }
452
453    /// Authenticates with the Kraken API to enable private subscriptions.
454    pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
455        if !self.config.has_api_credentials() {
456            return Err(KrakenWsError::AuthenticationError(
457                "API credentials required for authentication".to_string(),
458            ));
459        }
460
461        let api_key = self
462            .config
463            .api_key
464            .clone()
465            .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
466        let api_secret =
467            self.config.api_secret.clone().ok_or_else(|| {
468                KrakenWsError::AuthenticationError("Missing API secret".to_string())
469            })?;
470
471        let http_client = KrakenSpotHttpClient::with_credentials(
472            api_key,
473            api_secret,
474            self.config.environment,
475            Some(self.config.http_base_url()),
476            self.config.timeout_secs,
477            None,
478            None,
479            None,
480            self.config.http_proxy.clone(),
481            self.config.max_requests_per_second,
482        )
483        .map_err(|e| {
484            KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
485        })?;
486
487        let ws_token = http_client.get_websockets_token().await.map_err(|e| {
488            KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
489        })?;
490
491        log::debug!(
492            "WebSocket authentication token received: token_length={}, expires={}",
493            ws_token.token.len(),
494            ws_token.expires
495        );
496
497        let mut auth_token = self.auth_token.write().await;
498        *auth_token = Some(ws_token.token);
499
500        Ok(())
501    }
502
503    /// Caches multiple instruments for symbol lookup.
504    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
505        // Before connect() the handler isn't running; this send will fail and that's expected
506        if let Ok(cmd_tx) = self.cmd_tx.try_read()
507            && let Err(e) = cmd_tx.send(SpotHandlerCommand::InitializeInstruments(instruments))
508        {
509            log::debug!("Failed to send instruments to handler: {e}");
510        }
511    }
512
513    /// Caches a single instrument for symbol lookup.
514    pub fn cache_instrument(&self, instrument: InstrumentAny) {
515        // Before connect() the handler isn't running; this send will fail and that's expected
516        if let Ok(cmd_tx) = self.cmd_tx.try_read()
517            && let Err(e) = cmd_tx.send(SpotHandlerCommand::UpdateInstrument(instrument))
518        {
519            log::debug!("Failed to send instrument update to handler: {e}");
520        }
521    }
522
523    /// Sets the account ID for execution reports.
524    ///
525    /// Must be called before subscribing to executions to properly generate
526    /// OrderStatusReport and FillReport objects.
527    pub fn set_account_id(&self, account_id: AccountId) {
528        if let Ok(cmd_tx) = self.cmd_tx.try_read()
529            && let Err(e) = cmd_tx.send(SpotHandlerCommand::SetAccountId(account_id))
530        {
531            log::debug!("Failed to send account ID to handler: {e}");
532        }
533    }
534
535    /// Caches order info for order tracking.
536    ///
537    /// This should be called BEFORE submitting an order via HTTP to handle the
538    /// race condition where WebSocket execution messages arrive before the
539    /// HTTP response (which contains the venue_order_id).
540    pub fn cache_client_order(
541        &self,
542        client_order_id: ClientOrderId,
543        instrument_id: InstrumentId,
544        trader_id: TraderId,
545        strategy_id: StrategyId,
546    ) {
547        if let Ok(cmd_tx) = self.cmd_tx.try_read()
548            && let Err(e) = cmd_tx.send(SpotHandlerCommand::CacheClientOrder {
549                client_order_id,
550                instrument_id,
551                trader_id,
552                strategy_id,
553            })
554        {
555            log::debug!("Failed to send cache client order command to handler: {e}");
556        }
557    }
558
559    /// Cancels all pending requests.
560    pub fn cancel_all_requests(&self) {
561        self.cancellation_token.cancel();
562    }
563
564    /// Returns the cancellation token for this client.
565    pub fn cancellation_token(&self) -> &CancellationToken {
566        &self.cancellation_token
567    }
568
569    /// Subscribes to a channel for the given symbols.
570    pub async fn subscribe(
571        &self,
572        channel: KrakenWsChannel,
573        symbols: Vec<Ustr>,
574        depth: Option<u32>,
575    ) -> Result<(), KrakenWsError> {
576        let mut symbols_to_subscribe = Vec::new();
577        for symbol in &symbols {
578            let key = format!("{channel:?}:{symbol}");
579            if self.subscriptions.add_reference(&key) {
580                self.subscriptions.mark_subscribe(&key);
581                symbols_to_subscribe.push(*symbol);
582            }
583        }
584
585        if symbols_to_subscribe.is_empty() {
586            return Ok(());
587        }
588
589        let is_private = matches!(
590            channel,
591            KrakenWsChannel::Executions | KrakenWsChannel::Balances
592        );
593        let token = if is_private {
594            Some(self.auth_token.read().await.clone().ok_or_else(|| {
595                KrakenWsError::AuthenticationError(
596                    "Authentication token required for private channels. Call authenticate() first"
597                        .to_string(),
598                )
599            })?)
600        } else {
601            None
602        };
603
604        let req_id = self.get_next_req_id().await;
605        let request = KrakenWsRequest {
606            method: KrakenWsMethod::Subscribe,
607            params: Some(KrakenWsParams {
608                channel,
609                symbol: Some(symbols_to_subscribe.clone()),
610                snapshot: None,
611                depth,
612                interval: None,
613                token,
614                snap_orders: None,
615                snap_trades: None,
616            }),
617            req_id: Some(req_id),
618        };
619
620        self.send_request(&request).await?;
621
622        for symbol in &symbols_to_subscribe {
623            let key = format!("{channel:?}:{symbol}");
624            self.subscriptions.confirm_subscribe(&key);
625        }
626
627        Ok(())
628    }
629
630    /// Subscribes to a channel with a specific interval (for OHLC).
631    async fn subscribe_with_interval(
632        &self,
633        channel: KrakenWsChannel,
634        symbols: Vec<Ustr>,
635        interval: u32,
636    ) -> Result<(), KrakenWsError> {
637        let mut symbols_to_subscribe = Vec::new();
638        for symbol in &symbols {
639            let key = format!("{channel:?}:{symbol}:{interval}");
640            if self.subscriptions.add_reference(&key) {
641                self.subscriptions.mark_subscribe(&key);
642                symbols_to_subscribe.push(*symbol);
643            }
644        }
645
646        if symbols_to_subscribe.is_empty() {
647            return Ok(());
648        }
649
650        let req_id = self.get_next_req_id().await;
651        let request = KrakenWsRequest {
652            method: KrakenWsMethod::Subscribe,
653            params: Some(KrakenWsParams {
654                channel,
655                symbol: Some(symbols_to_subscribe.clone()),
656                snapshot: None,
657                depth: None,
658                interval: Some(interval),
659                token: None,
660                snap_orders: None,
661                snap_trades: None,
662            }),
663            req_id: Some(req_id),
664        };
665
666        self.send_request(&request).await?;
667
668        for symbol in &symbols_to_subscribe {
669            let key = format!("{channel:?}:{symbol}:{interval}");
670            self.subscriptions.confirm_subscribe(&key);
671        }
672
673        Ok(())
674    }
675
676    /// Unsubscribes from a channel with a specific interval (for OHLC).
677    async fn unsubscribe_with_interval(
678        &self,
679        channel: KrakenWsChannel,
680        symbols: Vec<Ustr>,
681        interval: u32,
682    ) -> Result<(), KrakenWsError> {
683        let mut symbols_to_unsubscribe = Vec::new();
684        for symbol in &symbols {
685            let key = format!("{channel:?}:{symbol}:{interval}");
686            if self.subscriptions.remove_reference(&key) {
687                self.subscriptions.mark_unsubscribe(&key);
688                symbols_to_unsubscribe.push(*symbol);
689            }
690        }
691
692        if symbols_to_unsubscribe.is_empty() {
693            return Ok(());
694        }
695
696        let req_id = self.get_next_req_id().await;
697        let request = KrakenWsRequest {
698            method: KrakenWsMethod::Unsubscribe,
699            params: Some(KrakenWsParams {
700                channel,
701                symbol: Some(symbols_to_unsubscribe.clone()),
702                snapshot: None,
703                depth: None,
704                interval: Some(interval),
705                token: None,
706                snap_orders: None,
707                snap_trades: None,
708            }),
709            req_id: Some(req_id),
710        };
711
712        self.send_request(&request).await?;
713
714        for symbol in &symbols_to_unsubscribe {
715            let key = format!("{channel:?}:{symbol}:{interval}");
716            self.subscriptions.confirm_unsubscribe(&key);
717        }
718
719        Ok(())
720    }
721
722    /// Unsubscribes from a channel for the given symbols.
723    pub async fn unsubscribe(
724        &self,
725        channel: KrakenWsChannel,
726        symbols: Vec<Ustr>,
727    ) -> Result<(), KrakenWsError> {
728        let mut symbols_to_unsubscribe = Vec::new();
729        for symbol in &symbols {
730            let key = format!("{channel:?}:{symbol}");
731            if self.subscriptions.remove_reference(&key) {
732                self.subscriptions.mark_unsubscribe(&key);
733                symbols_to_unsubscribe.push(*symbol);
734            } else {
735                log::debug!(
736                    "Channel {channel:?} symbol {symbol} still has active subscriptions, not unsubscribing"
737                );
738            }
739        }
740
741        if symbols_to_unsubscribe.is_empty() {
742            return Ok(());
743        }
744
745        let is_private = matches!(
746            channel,
747            KrakenWsChannel::Executions | KrakenWsChannel::Balances
748        );
749        let token = if is_private {
750            Some(self.auth_token.read().await.clone().ok_or_else(|| {
751                KrakenWsError::AuthenticationError(
752                    "Authentication token required for private channels. Call authenticate() first"
753                        .to_string(),
754                )
755            })?)
756        } else {
757            None
758        };
759
760        let req_id = self.get_next_req_id().await;
761        let request = KrakenWsRequest {
762            method: KrakenWsMethod::Unsubscribe,
763            params: Some(KrakenWsParams {
764                channel,
765                symbol: Some(symbols_to_unsubscribe.clone()),
766                snapshot: None,
767                depth: None,
768                interval: None,
769                token,
770                snap_orders: None,
771                snap_trades: None,
772            }),
773            req_id: Some(req_id),
774        };
775
776        self.send_request(&request).await?;
777
778        for symbol in &symbols_to_unsubscribe {
779            let key = format!("{channel:?}:{symbol}");
780            self.subscriptions.confirm_unsubscribe(&key);
781        }
782
783        Ok(())
784    }
785
786    /// Sends a ping message to keep the connection alive.
787    pub async fn send_ping(&self) -> Result<(), KrakenWsError> {
788        let req_id = self.get_next_req_id().await;
789
790        let request = KrakenWsRequest {
791            method: KrakenWsMethod::Ping,
792            params: None,
793            req_id: Some(req_id),
794        };
795
796        self.send_request(&request).await
797    }
798
799    async fn send_request(&self, request: &KrakenWsRequest) -> Result<(), KrakenWsError> {
800        let payload =
801            serde_json::to_string(request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
802
803        log::trace!("Sending message: {payload}");
804
805        self.cmd_tx
806            .read()
807            .await
808            .send(SpotHandlerCommand::SendText { payload })
809            .map_err(|e| KrakenWsError::ConnectionError(format!("Failed to send request: {e}")))?;
810
811        Ok(())
812    }
813
814    /// Returns true if connected (not closed).
815    pub fn is_connected(&self) -> bool {
816        let connection_mode_arc = self.connection_mode.load();
817        !ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
818    }
819
820    /// Returns true if the connection is active.
821    pub fn is_active(&self) -> bool {
822        let connection_mode_arc = self.connection_mode.load();
823        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
824            && !self.signal.load(Ordering::Relaxed)
825    }
826
827    /// Returns true if the connection is closed.
828    pub fn is_closed(&self) -> bool {
829        let connection_mode_arc = self.connection_mode.load();
830        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
831            || self.signal.load(Ordering::Relaxed)
832    }
833
834    /// Returns the WebSocket URL.
835    pub fn url(&self) -> &str {
836        &self.url
837    }
838
839    /// Returns all active subscriptions.
840    pub fn get_subscriptions(&self) -> Vec<String> {
841        self.subscriptions.all_topics()
842    }
843
844    /// Returns a stream of WebSocket messages.
845    pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
846        let rx = self
847            .out_rx
848            .take()
849            .expect("Stream receiver already taken or client not connected");
850        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
851        async_stream::stream! {
852            while let Some(msg) = rx.recv().await {
853                yield msg;
854            }
855        }
856    }
857
858    /// Subscribes to order book updates for the given instrument.
859    pub async fn subscribe_book(
860        &self,
861        instrument_id: InstrumentId,
862        depth: Option<u32>,
863    ) -> Result<(), KrakenWsError> {
864        // Kraken v2 WebSocket expects ISO 4217-A3 format (e.g., "ETH/USD")
865        let symbol = instrument_id.symbol.inner();
866        let book_key = format!("book:{symbol}");
867
868        if !self.subscriptions.add_reference(&book_key) {
869            return Ok(());
870        }
871
872        self.subscriptions.mark_subscribe(&book_key);
873        self.subscriptions.confirm_subscribe(&book_key);
874
875        self.subscribe(KrakenWsChannel::Book, vec![symbol], depth)
876            .await
877    }
878
879    /// Subscribes to quote updates for the given instrument.
880    ///
881    /// Uses the order book channel with depth 10 for low-latency top-of-book quotes
882    /// instead of the throttled ticker feed.
883    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
884        let symbol = instrument_id.symbol.inner();
885        let quotes_key = format!("quotes:{symbol}");
886
887        if !self.subscriptions.add_reference(&quotes_key) {
888            return Ok(());
889        }
890
891        self.subscriptions.mark_subscribe(&quotes_key);
892        self.subscriptions.confirm_subscribe(&quotes_key);
893        self.ensure_book_subscribed(symbol).await
894    }
895
896    /// Subscribes to trade updates for the given instrument.
897    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
898        let symbol = instrument_id.symbol.inner();
899        self.subscribe(KrakenWsChannel::Trade, vec![symbol], None)
900            .await
901    }
902
903    /// Subscribes to bar/OHLC updates for the given bar type.
904    ///
905    /// # Errors
906    ///
907    /// Returns an error if the bar aggregation is not supported by Kraken.
908    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
909        let symbol = bar_type.instrument_id().symbol.inner();
910        let interval = bar_type_to_ws_interval(bar_type)?;
911        self.subscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
912            .await
913    }
914
915    /// Subscribes to execution updates (order and fill events).
916    ///
917    /// Requires authentication - call `authenticate()` first.
918    pub async fn subscribe_executions(
919        &self,
920        snap_orders: bool,
921        snap_trades: bool,
922    ) -> Result<(), KrakenWsError> {
923        let req_id = self.get_next_req_id().await;
924
925        let token = self.auth_token.read().await.clone().ok_or_else(|| {
926            KrakenWsError::AuthenticationError(
927                "Authentication token required for executions channel. Call authenticate() first"
928                    .to_string(),
929            )
930        })?;
931
932        let request = KrakenWsRequest {
933            method: KrakenWsMethod::Subscribe,
934            params: Some(KrakenWsParams {
935                channel: KrakenWsChannel::Executions,
936                symbol: None,
937                snapshot: None,
938                depth: None,
939                interval: None,
940                token: Some(token),
941                snap_orders: Some(snap_orders),
942                snap_trades: Some(snap_trades),
943            }),
944            req_id: Some(req_id),
945        };
946
947        self.send_request(&request).await?;
948
949        let key = "executions";
950        if self.subscriptions.add_reference(key) {
951            self.subscriptions.mark_subscribe(key);
952            self.subscriptions.confirm_subscribe(key);
953        }
954
955        Ok(())
956    }
957
958    /// Unsubscribes from order book updates for the given instrument.
959    ///
960    /// Note: Will only actually unsubscribe if quotes are not also subscribed.
961    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
962        let symbol = instrument_id.symbol.inner();
963        let book_key = format!("book:{symbol}");
964
965        if !self.subscriptions.remove_reference(&book_key) {
966            return Ok(());
967        }
968
969        self.subscriptions.mark_unsubscribe(&book_key);
970        self.subscriptions.confirm_unsubscribe(&book_key);
971        self.maybe_unsubscribe_book(symbol).await
972    }
973
974    /// Unsubscribes from quote updates for the given instrument.
975    pub async fn unsubscribe_quotes(
976        &self,
977        instrument_id: InstrumentId,
978    ) -> Result<(), KrakenWsError> {
979        let symbol = instrument_id.symbol.inner();
980        let quotes_key = format!("quotes:{symbol}");
981
982        if !self.subscriptions.remove_reference(&quotes_key) {
983            return Ok(());
984        }
985
986        self.subscriptions.mark_unsubscribe(&quotes_key);
987        self.subscriptions.confirm_unsubscribe(&quotes_key);
988        self.maybe_unsubscribe_book(symbol).await
989    }
990
991    /// Unsubscribes from trade updates for the given instrument.
992    pub async fn unsubscribe_trades(
993        &self,
994        instrument_id: InstrumentId,
995    ) -> Result<(), KrakenWsError> {
996        let symbol = instrument_id.symbol.inner();
997        self.unsubscribe(KrakenWsChannel::Trade, vec![symbol]).await
998    }
999
1000    /// Unsubscribes from bar/OHLC updates for the given bar type.
1001    ///
1002    /// # Errors
1003    ///
1004    /// Returns an error if the bar aggregation is not supported by Kraken.
1005    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
1006        let symbol = bar_type.instrument_id().symbol.inner();
1007        let interval = bar_type_to_ws_interval(bar_type)?;
1008        self.unsubscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
1009            .await
1010    }
1011
1012    /// Ensures book channel is subscribed for the given symbol (used internally by quotes).
1013    ///
1014    /// Reference counting is handled by `subscribe` method.
1015    async fn ensure_book_subscribed(&self, symbol: Ustr) -> Result<(), KrakenWsError> {
1016        self.subscribe(KrakenWsChannel::Book, vec![symbol], Some(10))
1017            .await
1018    }
1019
1020    /// Unsubscribes from book channel if no more dependent subscriptions.
1021    ///
1022    /// Reference counting is handled by `unsubscribe` method.
1023    async fn maybe_unsubscribe_book(&self, symbol: Ustr) -> Result<(), KrakenWsError> {
1024        self.unsubscribe(KrakenWsChannel::Book, vec![symbol]).await
1025    }
1026}
1027
1028/// Helper function to refresh authentication token via HTTP API.
1029async fn refresh_auth_token(config: &KrakenDataClientConfig) -> Result<String, KrakenWsError> {
1030    let api_key = config
1031        .api_key
1032        .clone()
1033        .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
1034    let api_secret = config
1035        .api_secret
1036        .clone()
1037        .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API secret".to_string()))?;
1038
1039    let http_client = KrakenSpotHttpClient::with_credentials(
1040        api_key,
1041        api_secret,
1042        config.environment,
1043        Some(config.http_base_url()),
1044        config.timeout_secs,
1045        None,
1046        None,
1047        None,
1048        config.http_proxy.clone(),
1049        config.max_requests_per_second,
1050    )
1051    .map_err(|e| {
1052        KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
1053    })?;
1054
1055    let ws_token = http_client.get_websockets_token().await.map_err(|e| {
1056        KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
1057    })?;
1058
1059    log::debug!(
1060        "WebSocket authentication token refreshed: token_length={}, expires={}",
1061        ws_token.token.len(),
1062        ws_token.expires
1063    );
1064
1065    Ok(ws_token.token)
1066}
1067
1068/// Converts a Nautilus BarType to Kraken WebSocket OHLC interval (in minutes).
1069///
1070/// Supported intervals: 1, 5, 15, 30, 60, 240, 1440, 10080, 21600
1071/// (1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w, 2w).
1072fn bar_type_to_ws_interval(bar_type: BarType) -> Result<u32, KrakenWsError> {
1073    const VALID_INTERVALS: [u32; 9] = [1, 5, 15, 30, 60, 240, 1440, 10080, 21600];
1074
1075    let spec = bar_type.spec();
1076    let step = spec.step.get() as u32;
1077
1078    let base_minutes = match spec.aggregation {
1079        BarAggregation::Minute => 1,
1080        BarAggregation::Hour => 60,
1081        BarAggregation::Day => 1440,
1082        BarAggregation::Week => 10080,
1083        other => {
1084            return Err(KrakenWsError::SubscriptionError(format!(
1085                "Unsupported bar aggregation for Kraken OHLC streaming: {other:?}"
1086            )));
1087        }
1088    };
1089
1090    let interval = base_minutes * step;
1091
1092    if !VALID_INTERVALS.contains(&interval) {
1093        return Err(KrakenWsError::SubscriptionError(format!(
1094            "Invalid bar interval {interval} minutes for Kraken OHLC streaming. \
1095             Supported intervals: 1, 5, 15, 30, 60, 240, 1440, 10080, 21600"
1096        )));
1097    }
1098
1099    Ok(interval)
1100}