nautilus_kraken/websocket/spot_v2/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for the Kraken v2 streaming API.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::runtime::get_runtime;
25use nautilus_model::{
26    data::BarType,
27    enums::BarAggregation,
28    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId},
29    instruments::InstrumentAny,
30};
31use nautilus_network::{
32    mode::ConnectionMode,
33    websocket::{
34        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35    },
36};
37use tokio_util::sync::CancellationToken;
38use ustr::Ustr;
39
/// Topic delimiter for Kraken Spot v2 WebSocket subscriptions.
///
/// Topics use colon format: `channel:symbol` (e.g., `Trade:ETH/USD`).
/// Interval-based subscriptions append a third component,
/// `channel:symbol:interval` (e.g., `Ohlc:BTC/USD:1`). The reconnect logic in
/// this file also recognizes the bare topic `executions`, which carries no
/// delimiter.
pub const KRAKEN_SPOT_WS_TOPIC_DELIMITER: char = ':';
44
45use super::{
46    enums::{KrakenWsChannel, KrakenWsMethod},
47    handler::{SpotFeedHandler, SpotHandlerCommand},
48    messages::{KrakenWsParams, KrakenWsRequest, NautilusWsMessage},
49};
50use crate::{
51    config::KrakenDataClientConfig, http::KrakenSpotHttpClient, websocket::error::KrakenWsError,
52};
53
/// JSON ping payload passed to the WebSocket layer as its heartbeat message.
const WS_PING_MSG: &str = r#"{"method":"ping"}"#;
55
/// WebSocket client for the Kraken Spot v2 streaming API.
///
/// Most state is held behind `Arc`s so that clones of the client share the
/// same command channel, connection mode, request-id counter and auth token.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct KrakenSpotWebSocketClient {
    /// Endpoint to connect to; chosen in `new()` (private URL when explicitly
    /// configured, public URL otherwise).
    url: String,
    /// Client configuration (URLs, credentials, heartbeat interval, ...).
    config: KrakenDataClientConfig,
    /// Stop flag: set to `true` by `disconnect()` and reset to `false` by
    /// `connect()`. Also handed to the feed handler.
    signal: Arc<AtomicBool>,
    /// Connection mode of the underlying WebSocket client, swapped in on
    /// `connect()` so that clones observe the same state.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the feed handler task; replaced on every
    /// `connect()`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<SpotHandlerCommand>>>,
    /// Receiver for messages emitted by the handler task; `None` until
    /// `connect()` has been called.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Handle of the spawned handler task; `None` until `connect()` has been
    /// called and after `disconnect()` has taken it.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Subscription bookkeeping keyed by topic string.
    subscriptions: SubscriptionState,
    /// Authentication tracker; marked as failed on `disconnect()`.
    auth_tracker: AuthTracker,
    /// Cancellation token supplied by the caller at construction time.
    cancellation_token: CancellationToken,
    /// Counter used to generate `req_id` values for outgoing requests.
    req_id_counter: Arc<tokio::sync::RwLock<u64>>,
    /// WebSocket auth token obtained via `authenticate()`; required for
    /// private channels.
    auth_token: Arc<tokio::sync::RwLock<Option<String>>>,
}
76
77impl Clone for KrakenSpotWebSocketClient {
78    fn clone(&self) -> Self {
79        Self {
80            url: self.url.clone(),
81            config: self.config.clone(),
82            signal: Arc::clone(&self.signal),
83            connection_mode: Arc::clone(&self.connection_mode),
84            cmd_tx: Arc::clone(&self.cmd_tx),
85            out_rx: self.out_rx.clone(),
86            task_handle: self.task_handle.clone(),
87            subscriptions: self.subscriptions.clone(),
88            auth_tracker: self.auth_tracker.clone(),
89            cancellation_token: self.cancellation_token.clone(),
90            req_id_counter: self.req_id_counter.clone(),
91            auth_token: self.auth_token.clone(),
92        }
93    }
94}
95
96impl KrakenSpotWebSocketClient {
97    /// Creates a new client with the given configuration.
98    pub fn new(config: KrakenDataClientConfig, cancellation_token: CancellationToken) -> Self {
99        // Prefer private URL if explicitly set (for authenticated endpoints)
100        let url = if config.ws_private_url.is_some() {
101            config.ws_private_url()
102        } else {
103            config.ws_public_url()
104        };
105        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
106        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
107        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
108
109        Self {
110            url,
111            config,
112            signal: Arc::new(AtomicBool::new(false)),
113            connection_mode,
114            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
115            out_rx: None,
116            task_handle: None,
117            subscriptions: SubscriptionState::new(KRAKEN_SPOT_WS_TOPIC_DELIMITER),
118            auth_tracker: AuthTracker::new(),
119            cancellation_token,
120            req_id_counter: Arc::new(tokio::sync::RwLock::new(0)),
121            auth_token: Arc::new(tokio::sync::RwLock::new(None)),
122        }
123    }
124
125    async fn get_next_req_id(&self) -> u64 {
126        let mut counter = self.req_id_counter.write().await;
127        *counter += 1;
128        *counter
129    }
130
131    /// Connects to the WebSocket server.
132    pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
133        tracing::debug!("Connecting to {}", self.url);
134
135        self.signal.store(false, Ordering::Relaxed);
136
137        let (raw_handler, raw_rx) = channel_message_handler();
138
139        let ws_config = WebSocketConfig {
140            url: self.url.clone(),
141            headers: vec![],
142            message_handler: Some(raw_handler),
143            ping_handler: None,
144            heartbeat: self.config.heartbeat_interval_secs,
145            heartbeat_msg: Some(WS_PING_MSG.to_string()),
146            reconnect_timeout_ms: Some(5_000),
147            reconnect_delay_initial_ms: Some(500),
148            reconnect_delay_max_ms: Some(5_000),
149            reconnect_backoff_factor: Some(1.5),
150            reconnect_jitter_ms: Some(250),
151            reconnect_max_attempts: None,
152        };
153
154        let ws_client = WebSocketClient::connect(
155            ws_config,
156            None,   // post_reconnection
157            vec![], // keyed_quotas
158            None,   // default_quota
159        )
160        .await
161        .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
162
163        // Share connection state across clones via ArcSwap
164        self.connection_mode
165            .store(ws_client.connection_mode_atomic());
166
167        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
168        self.out_rx = Some(Arc::new(out_rx));
169
170        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<SpotHandlerCommand>();
171        *self.cmd_tx.write().await = cmd_tx.clone();
172
173        if let Err(e) = cmd_tx.send(SpotHandlerCommand::SetClient(ws_client)) {
174            return Err(KrakenWsError::ConnectionError(format!(
175                "Failed to send WebSocketClient to handler: {e}"
176            )));
177        }
178
179        let signal = self.signal.clone();
180        let subscriptions = self.subscriptions.clone();
181        let config_for_reconnect = self.config.clone();
182        let auth_token_for_reconnect = self.auth_token.clone();
183        let req_id_counter_for_reconnect = self.req_id_counter.clone();
184        let cmd_tx_for_reconnect = cmd_tx.clone();
185
186        let stream_handle = get_runtime().spawn(async move {
187            let mut handler =
188                SpotFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
189
190            loop {
191                match handler.next().await {
192                    Some(NautilusWsMessage::Reconnected) => {
193                        if signal.load(Ordering::Relaxed) {
194                            continue;
195                        }
196                        tracing::info!("WebSocket reconnected, resubscribing");
197
198                        // Mark all confirmed subscriptions as failed to transition to pending
199                        let confirmed_topics = subscriptions.all_topics();
200                        for topic in &confirmed_topics {
201                            subscriptions.mark_failure(topic);
202                        }
203
204                        let topics = subscriptions.all_topics();
205                        if topics.is_empty() {
206                            tracing::debug!("No subscriptions to restore after reconnection");
207                        } else {
208                            // Check if we need to re-authenticate (had a token before)
209                            let had_auth = auth_token_for_reconnect.read().await.is_some();
210
211                            if had_auth && config_for_reconnect.has_api_credentials() {
212                                tracing::debug!("Re-authenticating after reconnect");
213
214                                match refresh_auth_token(&config_for_reconnect).await {
215                                    Ok(new_token) => {
216                                        *auth_token_for_reconnect.write().await = Some(new_token);
217                                        tracing::debug!("Re-authentication successful");
218                                    }
219                                    Err(e) => {
220                                        tracing::error!(
221                                            error = %e,
222                                            "Failed to re-authenticate after reconnect"
223                                        );
224                                        // Clear auth token since it's invalid
225                                        *auth_token_for_reconnect.write().await = None;
226                                    }
227                                }
228                            }
229
230                            tracing::info!(
231                                count = topics.len(),
232                                "Resubscribing after reconnection"
233                            );
234
235                            // Replay subscriptions
236                            for topic in &topics {
237                                let auth_token = auth_token_for_reconnect.read().await.clone();
238
239                                // Handle special "executions" topic first
240                                if topic == "executions" {
241                                    if let Some(ref token) = auth_token {
242                                        let mut counter =
243                                            req_id_counter_for_reconnect.write().await;
244                                        *counter += 1;
245                                        let req_id = *counter;
246
247                                        let request = KrakenWsRequest {
248                                            method: KrakenWsMethod::Subscribe,
249                                            params: Some(KrakenWsParams {
250                                                channel: KrakenWsChannel::Executions,
251                                                symbol: None,
252                                                snapshot: None,
253                                                depth: None,
254                                                interval: None,
255                                                token: Some(token.clone()),
256                                                snap_orders: Some(true),
257                                                snap_trades: Some(true),
258                                            }),
259                                            req_id: Some(req_id),
260                                        };
261
262                                        if let Ok(payload) = serde_json::to_string(&request)
263                                            && let Err(e) = cmd_tx_for_reconnect
264                                                .send(SpotHandlerCommand::SendText { payload })
265                                        {
266                                            tracing::error!(
267                                                error = %e,
268                                                "Failed to send executions resubscribe"
269                                            );
270                                        }
271
272                                        subscriptions.mark_subscribe(topic);
273                                    } else {
274                                        tracing::warn!(
275                                            "Cannot resubscribe to executions: no auth token"
276                                        );
277                                    }
278                                    continue;
279                                }
280
281                                // Parse topic format: "Channel:symbol" or "Channel:symbol:interval"
282                                let parts: Vec<&str> = topic.splitn(3, ':').collect();
283                                if parts.len() < 2 {
284                                    tracing::warn!(topic, "Invalid topic format for resubscribe");
285                                    continue;
286                                }
287
288                                let channel_str = parts[0];
289                                let channel = match channel_str {
290                                    "Book" => Some(KrakenWsChannel::Book),
291                                    "Trade" => Some(KrakenWsChannel::Trade),
292                                    "Ticker" => Some(KrakenWsChannel::Ticker),
293                                    "Ohlc" => Some(KrakenWsChannel::Ohlc),
294                                    "book" => Some(KrakenWsChannel::Book),
295                                    "quotes" => Some(KrakenWsChannel::Book),
296                                    _ => None,
297                                };
298
299                                let Some(channel) = channel else {
300                                    tracing::warn!(topic, "Unknown channel for resubscribe");
301                                    continue;
302                                };
303
304                                let mut counter = req_id_counter_for_reconnect.write().await;
305                                *counter += 1;
306                                let req_id = *counter;
307
308                                let depth = if channel_str == "quotes" {
309                                    Some(10)
310                                } else {
311                                    None
312                                };
313
314                                // Extract symbol and optional interval
315                                let (symbol_str, interval) = if parts.len() == 3 {
316                                    // Format: "Ohlc:BTC/USD:1" -> symbol="BTC/USD", interval=1
317                                    (parts[1], parts[2].parse::<u32>().ok())
318                                } else {
319                                    // Format: "Book:BTC/USD" -> symbol="BTC/USD", interval=None
320                                    (parts[1], None)
321                                };
322
323                                let request = KrakenWsRequest {
324                                    method: KrakenWsMethod::Subscribe,
325                                    params: Some(KrakenWsParams {
326                                        channel,
327                                        symbol: Some(vec![Ustr::from(symbol_str)]),
328                                        snapshot: None,
329                                        depth,
330                                        interval,
331                                        token: None,
332                                        snap_orders: None,
333                                        snap_trades: None,
334                                    }),
335                                    req_id: Some(req_id),
336                                };
337
338                                if let Ok(payload) = serde_json::to_string(&request)
339                                    && let Err(e) = cmd_tx_for_reconnect
340                                        .send(SpotHandlerCommand::SendText { payload })
341                                {
342                                    tracing::error!(
343                                        error = %e,
344                                        topic,
345                                        "Failed to send resubscribe command"
346                                    );
347                                }
348
349                                subscriptions.mark_subscribe(topic);
350                            }
351                        }
352
353                        if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
354                            tracing::error!("Failed to send message (receiver dropped)");
355                            break;
356                        }
357                        continue;
358                    }
359                    Some(msg) => {
360                        if out_tx.send(msg).is_err() {
361                            tracing::error!("Failed to send message (receiver dropped)");
362                            break;
363                        }
364                    }
365                    None => {
366                        if handler.is_stopped() {
367                            tracing::debug!("Stop signal received, ending message processing");
368                            break;
369                        }
370                        tracing::warn!("WebSocket stream ended unexpectedly");
371                        break;
372                    }
373                }
374            }
375
376            tracing::debug!("Handler task exiting");
377        });
378
379        self.task_handle = Some(Arc::new(stream_handle));
380
381        tracing::debug!("WebSocket connected successfully");
382        Ok(())
383    }
384
385    /// Disconnects from the WebSocket server.
386    pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
387        tracing::debug!("Disconnecting WebSocket");
388
389        self.signal.store(true, Ordering::Relaxed);
390
391        if let Err(e) = self
392            .cmd_tx
393            .read()
394            .await
395            .send(SpotHandlerCommand::Disconnect)
396        {
397            tracing::debug!(
398                "Failed to send disconnect command (handler may already be shut down): {e}"
399            );
400        }
401
402        if let Some(task_handle) = self.task_handle.take() {
403            match Arc::try_unwrap(task_handle) {
404                Ok(handle) => {
405                    tracing::debug!("Waiting for task handle to complete");
406                    match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
407                        Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
408                        Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
409                        Err(_) => {
410                            tracing::warn!(
411                                "Timeout waiting for task handle, task may still be running"
412                            );
413                        }
414                    }
415                }
416                Err(arc_handle) => {
417                    tracing::debug!(
418                        "Cannot take ownership of task handle - other references exist, aborting task"
419                    );
420                    arc_handle.abort();
421                }
422            }
423        } else {
424            tracing::debug!("No task handle to await");
425        }
426
427        self.subscriptions.clear();
428        self.auth_tracker.fail("Disconnected");
429
430        Ok(())
431    }
432
    /// Closes the WebSocket connection.
    ///
    /// This simply delegates to [`Self::disconnect`] and returns its result.
    pub async fn close(&mut self) -> Result<(), KrakenWsError> {
        self.disconnect().await
    }
437
438    /// Waits until the connection is active or timeout.
439    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
440        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
441
442        tokio::time::timeout(timeout, async {
443            while !self.is_active() {
444                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
445            }
446        })
447        .await
448        .map_err(|_| {
449            KrakenWsError::ConnectionError(format!(
450                "WebSocket connection timeout after {timeout_secs} seconds"
451            ))
452        })?;
453
454        Ok(())
455    }
456
457    /// Authenticates with the Kraken API to enable private subscriptions.
458    pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
459        if !self.config.has_api_credentials() {
460            return Err(KrakenWsError::AuthenticationError(
461                "API credentials required for authentication".to_string(),
462            ));
463        }
464
465        let api_key = self
466            .config
467            .api_key
468            .clone()
469            .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
470        let api_secret =
471            self.config.api_secret.clone().ok_or_else(|| {
472                KrakenWsError::AuthenticationError("Missing API secret".to_string())
473            })?;
474
475        let http_client = KrakenSpotHttpClient::with_credentials(
476            api_key,
477            api_secret,
478            self.config.environment,
479            Some(self.config.http_base_url()),
480            self.config.timeout_secs,
481            None,
482            None,
483            None,
484            self.config.http_proxy.clone(),
485            self.config.max_requests_per_second,
486        )
487        .map_err(|e| {
488            KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
489        })?;
490
491        let ws_token = http_client.get_websockets_token().await.map_err(|e| {
492            KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
493        })?;
494
495        tracing::debug!(
496            token_length = ws_token.token.len(),
497            expires = ws_token.expires,
498            "WebSocket authentication token received"
499        );
500
501        let mut auth_token = self.auth_token.write().await;
502        *auth_token = Some(ws_token.token);
503
504        Ok(())
505    }
506
507    /// Caches multiple instruments for symbol lookup.
508    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
509        // Before connect() the handler isn't running; this send will fail and that's expected
510        if let Ok(cmd_tx) = self.cmd_tx.try_read()
511            && let Err(e) = cmd_tx.send(SpotHandlerCommand::InitializeInstruments(instruments))
512        {
513            tracing::debug!("Failed to send instruments to handler: {e}");
514        }
515    }
516
517    /// Caches a single instrument for symbol lookup.
518    pub fn cache_instrument(&self, instrument: InstrumentAny) {
519        // Before connect() the handler isn't running; this send will fail and that's expected
520        if let Ok(cmd_tx) = self.cmd_tx.try_read()
521            && let Err(e) = cmd_tx.send(SpotHandlerCommand::UpdateInstrument(instrument))
522        {
523            tracing::debug!("Failed to send instrument update to handler: {e}");
524        }
525    }
526
527    /// Sets the account ID for execution reports.
528    ///
529    /// Must be called before subscribing to executions to properly generate
530    /// OrderStatusReport and FillReport objects.
531    pub fn set_account_id(&self, account_id: AccountId) {
532        if let Ok(cmd_tx) = self.cmd_tx.try_read()
533            && let Err(e) = cmd_tx.send(SpotHandlerCommand::SetAccountId(account_id))
534        {
535            tracing::debug!("Failed to send account ID to handler: {e}");
536        }
537    }
538
539    /// Caches order info for order tracking.
540    ///
541    /// This should be called BEFORE submitting an order via HTTP to handle the
542    /// race condition where WebSocket execution messages arrive before the
543    /// HTTP response (which contains the venue_order_id).
544    pub fn cache_client_order(
545        &self,
546        client_order_id: ClientOrderId,
547        instrument_id: InstrumentId,
548        trader_id: TraderId,
549        strategy_id: StrategyId,
550    ) {
551        if let Ok(cmd_tx) = self.cmd_tx.try_read()
552            && let Err(e) = cmd_tx.send(SpotHandlerCommand::CacheClientOrder {
553                client_order_id,
554                instrument_id,
555                trader_id,
556                strategy_id,
557            })
558        {
559            tracing::debug!("Failed to send cache client order command to handler: {e}");
560        }
561    }
562
    /// Cancels all pending requests.
    ///
    /// Implemented by cancelling this client's [`CancellationToken`].
    // NOTE(review): which in-flight operations observe this token is not
    // visible in this part of the file - confirm against the token's users.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
567
    /// Returns a reference to the cancellation token held by this client.
    ///
    /// This is the token supplied to [`Self::new`] and the one cancelled by
    /// [`Self::cancel_all_requests`].
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
572
573    /// Subscribes to a channel for the given symbols.
574    pub async fn subscribe(
575        &self,
576        channel: KrakenWsChannel,
577        symbols: Vec<Ustr>,
578        depth: Option<u32>,
579    ) -> Result<(), KrakenWsError> {
580        let mut symbols_to_subscribe = Vec::new();
581        for symbol in &symbols {
582            let key = format!("{:?}:{}", channel, symbol);
583            if self.subscriptions.add_reference(&key) {
584                self.subscriptions.mark_subscribe(&key);
585                symbols_to_subscribe.push(*symbol);
586            }
587        }
588
589        if symbols_to_subscribe.is_empty() {
590            return Ok(());
591        }
592
593        let is_private = matches!(
594            channel,
595            KrakenWsChannel::Executions | KrakenWsChannel::Balances
596        );
597        let token = if is_private {
598            Some(self.auth_token.read().await.clone().ok_or_else(|| {
599                KrakenWsError::AuthenticationError(
600                    "Authentication token required for private channels. Call authenticate() first"
601                        .to_string(),
602                )
603            })?)
604        } else {
605            None
606        };
607
608        let req_id = self.get_next_req_id().await;
609        let request = KrakenWsRequest {
610            method: KrakenWsMethod::Subscribe,
611            params: Some(KrakenWsParams {
612                channel,
613                symbol: Some(symbols_to_subscribe.clone()),
614                snapshot: None,
615                depth,
616                interval: None,
617                token,
618                snap_orders: None,
619                snap_trades: None,
620            }),
621            req_id: Some(req_id),
622        };
623
624        self.send_request(&request).await?;
625
626        for symbol in &symbols_to_subscribe {
627            let key = format!("{:?}:{}", channel, symbol);
628            self.subscriptions.confirm_subscribe(&key);
629        }
630
631        Ok(())
632    }
633
634    /// Subscribes to a channel with a specific interval (for OHLC).
635    async fn subscribe_with_interval(
636        &self,
637        channel: KrakenWsChannel,
638        symbols: Vec<Ustr>,
639        interval: u32,
640    ) -> Result<(), KrakenWsError> {
641        let mut symbols_to_subscribe = Vec::new();
642        for symbol in &symbols {
643            let key = format!("{channel:?}:{symbol}:{interval}");
644            if self.subscriptions.add_reference(&key) {
645                self.subscriptions.mark_subscribe(&key);
646                symbols_to_subscribe.push(*symbol);
647            }
648        }
649
650        if symbols_to_subscribe.is_empty() {
651            return Ok(());
652        }
653
654        let req_id = self.get_next_req_id().await;
655        let request = KrakenWsRequest {
656            method: KrakenWsMethod::Subscribe,
657            params: Some(KrakenWsParams {
658                channel,
659                symbol: Some(symbols_to_subscribe.clone()),
660                snapshot: None,
661                depth: None,
662                interval: Some(interval),
663                token: None,
664                snap_orders: None,
665                snap_trades: None,
666            }),
667            req_id: Some(req_id),
668        };
669
670        self.send_request(&request).await?;
671
672        for symbol in &symbols_to_subscribe {
673            let key = format!("{channel:?}:{symbol}:{interval}");
674            self.subscriptions.confirm_subscribe(&key);
675        }
676
677        Ok(())
678    }
679
680    /// Unsubscribes from a channel with a specific interval (for OHLC).
681    async fn unsubscribe_with_interval(
682        &self,
683        channel: KrakenWsChannel,
684        symbols: Vec<Ustr>,
685        interval: u32,
686    ) -> Result<(), KrakenWsError> {
687        let mut symbols_to_unsubscribe = Vec::new();
688        for symbol in &symbols {
689            let key = format!("{channel:?}:{symbol}:{interval}");
690            if self.subscriptions.remove_reference(&key) {
691                self.subscriptions.mark_unsubscribe(&key);
692                symbols_to_unsubscribe.push(*symbol);
693            }
694        }
695
696        if symbols_to_unsubscribe.is_empty() {
697            return Ok(());
698        }
699
700        let req_id = self.get_next_req_id().await;
701        let request = KrakenWsRequest {
702            method: KrakenWsMethod::Unsubscribe,
703            params: Some(KrakenWsParams {
704                channel,
705                symbol: Some(symbols_to_unsubscribe.clone()),
706                snapshot: None,
707                depth: None,
708                interval: Some(interval),
709                token: None,
710                snap_orders: None,
711                snap_trades: None,
712            }),
713            req_id: Some(req_id),
714        };
715
716        self.send_request(&request).await?;
717
718        for symbol in &symbols_to_unsubscribe {
719            let key = format!("{channel:?}:{symbol}:{interval}");
720            self.subscriptions.confirm_unsubscribe(&key);
721        }
722
723        Ok(())
724    }
725
726    /// Unsubscribes from a channel for the given symbols.
727    pub async fn unsubscribe(
728        &self,
729        channel: KrakenWsChannel,
730        symbols: Vec<Ustr>,
731    ) -> Result<(), KrakenWsError> {
732        let mut symbols_to_unsubscribe = Vec::new();
733        for symbol in &symbols {
734            let key = format!("{:?}:{}", channel, symbol);
735            if self.subscriptions.remove_reference(&key) {
736                self.subscriptions.mark_unsubscribe(&key);
737                symbols_to_unsubscribe.push(*symbol);
738            } else {
739                tracing::debug!(
740                    "Channel {:?} symbol {} still has active subscriptions, not unsubscribing",
741                    channel,
742                    symbol
743                );
744            }
745        }
746
747        if symbols_to_unsubscribe.is_empty() {
748            return Ok(());
749        }
750
751        let is_private = matches!(
752            channel,
753            KrakenWsChannel::Executions | KrakenWsChannel::Balances
754        );
755        let token = if is_private {
756            Some(self.auth_token.read().await.clone().ok_or_else(|| {
757                KrakenWsError::AuthenticationError(
758                    "Authentication token required for private channels. Call authenticate() first"
759                        .to_string(),
760                )
761            })?)
762        } else {
763            None
764        };
765
766        let req_id = self.get_next_req_id().await;
767        let request = KrakenWsRequest {
768            method: KrakenWsMethod::Unsubscribe,
769            params: Some(KrakenWsParams {
770                channel,
771                symbol: Some(symbols_to_unsubscribe.clone()),
772                snapshot: None,
773                depth: None,
774                interval: None,
775                token,
776                snap_orders: None,
777                snap_trades: None,
778            }),
779            req_id: Some(req_id),
780        };
781
782        self.send_request(&request).await?;
783
784        for symbol in &symbols_to_unsubscribe {
785            let key = format!("{:?}:{}", channel, symbol);
786            self.subscriptions.confirm_unsubscribe(&key);
787        }
788
789        Ok(())
790    }
791
792    /// Sends a ping message to keep the connection alive.
793    pub async fn send_ping(&self) -> Result<(), KrakenWsError> {
794        let req_id = self.get_next_req_id().await;
795
796        let request = KrakenWsRequest {
797            method: KrakenWsMethod::Ping,
798            params: None,
799            req_id: Some(req_id),
800        };
801
802        self.send_request(&request).await
803    }
804
805    async fn send_request(&self, request: &KrakenWsRequest) -> Result<(), KrakenWsError> {
806        let payload =
807            serde_json::to_string(request).map_err(|e| KrakenWsError::JsonError(e.to_string()))?;
808
809        tracing::trace!("Sending message: {payload}");
810
811        self.cmd_tx
812            .read()
813            .await
814            .send(SpotHandlerCommand::SendText { payload })
815            .map_err(|e| KrakenWsError::ConnectionError(format!("Failed to send request: {e}")))?;
816
817        Ok(())
818    }
819
820    /// Returns true if connected (not closed).
821    pub fn is_connected(&self) -> bool {
822        let connection_mode_arc = self.connection_mode.load();
823        !ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
824    }
825
826    /// Returns true if the connection is active.
827    pub fn is_active(&self) -> bool {
828        let connection_mode_arc = self.connection_mode.load();
829        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
830            && !self.signal.load(Ordering::Relaxed)
831    }
832
833    /// Returns true if the connection is closed.
834    pub fn is_closed(&self) -> bool {
835        let connection_mode_arc = self.connection_mode.load();
836        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
837            || self.signal.load(Ordering::Relaxed)
838    }
839
840    /// Returns the WebSocket URL.
841    pub fn url(&self) -> &str {
842        &self.url
843    }
844
845    /// Returns all active subscriptions.
846    pub fn get_subscriptions(&self) -> Vec<String> {
847        self.subscriptions.all_topics()
848    }
849
850    /// Returns a stream of WebSocket messages.
851    pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
852        let rx = self
853            .out_rx
854            .take()
855            .expect("Stream receiver already taken or client not connected");
856        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
857        async_stream::stream! {
858            while let Some(msg) = rx.recv().await {
859                yield msg;
860            }
861        }
862    }
863
864    /// Subscribes to order book updates for the given instrument.
865    pub async fn subscribe_book(
866        &self,
867        instrument_id: InstrumentId,
868        depth: Option<u32>,
869    ) -> Result<(), KrakenWsError> {
870        // Kraken v2 WebSocket expects ISO 4217-A3 format (e.g., "ETH/USD")
871        let symbol = instrument_id.symbol.inner();
872        let book_key = format!("book:{symbol}");
873
874        if !self.subscriptions.add_reference(&book_key) {
875            return Ok(());
876        }
877
878        self.subscriptions.mark_subscribe(&book_key);
879        self.subscriptions.confirm_subscribe(&book_key);
880
881        self.subscribe(KrakenWsChannel::Book, vec![symbol], depth)
882            .await
883    }
884
885    /// Subscribes to quote updates for the given instrument.
886    ///
887    /// Uses the order book channel with depth 10 for low-latency top-of-book quotes
888    /// instead of the throttled ticker feed.
889    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
890        let symbol = instrument_id.symbol.inner();
891        let quotes_key = format!("quotes:{symbol}");
892
893        if !self.subscriptions.add_reference(&quotes_key) {
894            return Ok(());
895        }
896
897        self.subscriptions.mark_subscribe(&quotes_key);
898        self.subscriptions.confirm_subscribe(&quotes_key);
899        self.ensure_book_subscribed(symbol).await
900    }
901
902    /// Subscribes to trade updates for the given instrument.
903    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
904        let symbol = instrument_id.symbol.inner();
905        self.subscribe(KrakenWsChannel::Trade, vec![symbol], None)
906            .await
907    }
908
909    /// Subscribes to bar/OHLC updates for the given bar type.
910    ///
911    /// # Errors
912    ///
913    /// Returns an error if the bar aggregation is not supported by Kraken.
914    pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
915        let symbol = bar_type.instrument_id().symbol.inner();
916        let interval = bar_type_to_ws_interval(bar_type)?;
917        self.subscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
918            .await
919    }
920
921    /// Subscribes to execution updates (order and fill events).
922    ///
923    /// Requires authentication - call `authenticate()` first.
924    pub async fn subscribe_executions(
925        &self,
926        snap_orders: bool,
927        snap_trades: bool,
928    ) -> Result<(), KrakenWsError> {
929        let req_id = self.get_next_req_id().await;
930
931        let token = self.auth_token.read().await.clone().ok_or_else(|| {
932            KrakenWsError::AuthenticationError(
933                "Authentication token required for executions channel. Call authenticate() first"
934                    .to_string(),
935            )
936        })?;
937
938        let request = KrakenWsRequest {
939            method: KrakenWsMethod::Subscribe,
940            params: Some(KrakenWsParams {
941                channel: KrakenWsChannel::Executions,
942                symbol: None,
943                snapshot: None,
944                depth: None,
945                interval: None,
946                token: Some(token),
947                snap_orders: Some(snap_orders),
948                snap_trades: Some(snap_trades),
949            }),
950            req_id: Some(req_id),
951        };
952
953        self.send_request(&request).await?;
954
955        let key = "executions";
956        if self.subscriptions.add_reference(key) {
957            self.subscriptions.mark_subscribe(key);
958            self.subscriptions.confirm_subscribe(key);
959        }
960
961        Ok(())
962    }
963
964    /// Unsubscribes from order book updates for the given instrument.
965    ///
966    /// Note: Will only actually unsubscribe if quotes are not also subscribed.
967    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
968        let symbol = instrument_id.symbol.inner();
969        let book_key = format!("book:{symbol}");
970
971        if !self.subscriptions.remove_reference(&book_key) {
972            return Ok(());
973        }
974
975        self.subscriptions.mark_unsubscribe(&book_key);
976        self.subscriptions.confirm_unsubscribe(&book_key);
977        self.maybe_unsubscribe_book(symbol).await
978    }
979
980    /// Unsubscribes from quote updates for the given instrument.
981    pub async fn unsubscribe_quotes(
982        &self,
983        instrument_id: InstrumentId,
984    ) -> Result<(), KrakenWsError> {
985        let symbol = instrument_id.symbol.inner();
986        let quotes_key = format!("quotes:{symbol}");
987
988        if !self.subscriptions.remove_reference(&quotes_key) {
989            return Ok(());
990        }
991
992        self.subscriptions.mark_unsubscribe(&quotes_key);
993        self.subscriptions.confirm_unsubscribe(&quotes_key);
994        self.maybe_unsubscribe_book(symbol).await
995    }
996
997    /// Unsubscribes from trade updates for the given instrument.
998    pub async fn unsubscribe_trades(
999        &self,
1000        instrument_id: InstrumentId,
1001    ) -> Result<(), KrakenWsError> {
1002        let symbol = instrument_id.symbol.inner();
1003        self.unsubscribe(KrakenWsChannel::Trade, vec![symbol]).await
1004    }
1005
1006    /// Unsubscribes from bar/OHLC updates for the given bar type.
1007    ///
1008    /// # Errors
1009    ///
1010    /// Returns an error if the bar aggregation is not supported by Kraken.
1011    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), KrakenWsError> {
1012        let symbol = bar_type.instrument_id().symbol.inner();
1013        let interval = bar_type_to_ws_interval(bar_type)?;
1014        self.unsubscribe_with_interval(KrakenWsChannel::Ohlc, vec![symbol], interval)
1015            .await
1016    }
1017
1018    /// Ensures book channel is subscribed for the given symbol (used internally by quotes).
1019    ///
1020    /// Reference counting is handled by `subscribe` method.
1021    async fn ensure_book_subscribed(&self, symbol: Ustr) -> Result<(), KrakenWsError> {
1022        self.subscribe(KrakenWsChannel::Book, vec![symbol], Some(10))
1023            .await
1024    }
1025
1026    /// Unsubscribes from book channel if no more dependent subscriptions.
1027    ///
1028    /// Reference counting is handled by `unsubscribe` method.
1029    async fn maybe_unsubscribe_book(&self, symbol: Ustr) -> Result<(), KrakenWsError> {
1030        self.unsubscribe(KrakenWsChannel::Book, vec![symbol]).await
1031    }
1032}
1033
1034/// Helper function to refresh authentication token via HTTP API.
1035async fn refresh_auth_token(config: &KrakenDataClientConfig) -> Result<String, KrakenWsError> {
1036    let api_key = config
1037        .api_key
1038        .clone()
1039        .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API key".to_string()))?;
1040    let api_secret = config
1041        .api_secret
1042        .clone()
1043        .ok_or_else(|| KrakenWsError::AuthenticationError("Missing API secret".to_string()))?;
1044
1045    let http_client = KrakenSpotHttpClient::with_credentials(
1046        api_key,
1047        api_secret,
1048        config.environment,
1049        Some(config.http_base_url()),
1050        config.timeout_secs,
1051        None,
1052        None,
1053        None,
1054        config.http_proxy.clone(),
1055        config.max_requests_per_second,
1056    )
1057    .map_err(|e| {
1058        KrakenWsError::AuthenticationError(format!("Failed to create HTTP client: {e}"))
1059    })?;
1060
1061    let ws_token = http_client.get_websockets_token().await.map_err(|e| {
1062        KrakenWsError::AuthenticationError(format!("Failed to get WebSocket token: {e}"))
1063    })?;
1064
1065    tracing::debug!(
1066        token_length = ws_token.token.len(),
1067        expires = ws_token.expires,
1068        "WebSocket authentication token refreshed"
1069    );
1070
1071    Ok(ws_token.token)
1072}
1073
1074/// Converts a Nautilus BarType to Kraken WebSocket OHLC interval (in minutes).
1075///
1076/// Supported intervals: 1, 5, 15, 30, 60, 240, 1440, 10080, 21600
1077/// (1m, 5m, 15m, 30m, 1h, 4h, 1d, 1w, 2w).
1078fn bar_type_to_ws_interval(bar_type: BarType) -> Result<u32, KrakenWsError> {
1079    let spec = bar_type.spec();
1080    let step = spec.step.get() as u32;
1081
1082    let base_minutes = match spec.aggregation {
1083        BarAggregation::Minute => 1,
1084        BarAggregation::Hour => 60,
1085        BarAggregation::Day => 1440,
1086        BarAggregation::Week => 10080,
1087        other => {
1088            return Err(KrakenWsError::SubscriptionError(format!(
1089                "Unsupported bar aggregation for Kraken OHLC streaming: {other:?}"
1090            )));
1091        }
1092    };
1093
1094    let interval = base_minutes * step;
1095
1096    const VALID_INTERVALS: [u32; 9] = [1, 5, 15, 30, 60, 240, 1440, 10080, 21600];
1097    if !VALID_INTERVALS.contains(&interval) {
1098        return Err(KrakenWsError::SubscriptionError(format!(
1099            "Invalid bar interval {interval} minutes for Kraken OHLC streaming. \
1100             Supported intervals: 1, 5, 15, 30, 60, 240, 1440, 10080, 21600"
1101        )));
1102    }
1103
1104    Ok(interval)
1105}