Skip to main content

nautilus_kraken/websocket/futures/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for the Kraken Futures v1 streaming API.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::get_runtime;
25use nautilus_model::{
26    identifiers::{
27        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
28    },
29    instruments::InstrumentAny,
30};
31use nautilus_network::{
32    mode::ConnectionMode,
33    websocket::{
34        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35    },
36};
37use tokio_util::sync::CancellationToken;
38
39use super::{
40    handler::{FuturesFeedHandler, HandlerCommand},
41    messages::{KrakenFuturesFeed, KrakenFuturesWsMessage},
42};
43use crate::{common::credential::KrakenCredential, websocket::error::KrakenWsError};
44
45/// Topic delimiter for Kraken Futures WebSocket subscriptions.
46///
47/// Topics use colon format: `feed:symbol` (e.g., `trades:PF_ETHUSD`).
48pub const KRAKEN_FUTURES_WS_TOPIC_DELIMITER: char = ':';
49
50/// WebSocket client for the Kraken Futures v1 streaming API.
51#[derive(Debug)]
52#[cfg_attr(
53    feature = "python",
54    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
55)]
56pub struct KrakenFuturesWebSocketClient {
57    url: String,
58    heartbeat_secs: Option<u64>,
59    signal: Arc<AtomicBool>,
60    connection_mode: Arc<ArcSwap<AtomicU8>>,
61    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
62    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>>>,
63    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
64    subscriptions: SubscriptionState,
65    auth_tracker: AuthTracker,
66    cancellation_token: CancellationToken,
67    credential: Option<KrakenCredential>,
68    original_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
69    signed_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
70}
71
72impl Clone for KrakenFuturesWebSocketClient {
73    fn clone(&self) -> Self {
74        Self {
75            url: self.url.clone(),
76            heartbeat_secs: self.heartbeat_secs,
77            signal: Arc::clone(&self.signal),
78            connection_mode: Arc::clone(&self.connection_mode),
79            cmd_tx: Arc::clone(&self.cmd_tx),
80            out_rx: self.out_rx.clone(),
81            task_handle: self.task_handle.clone(),
82            subscriptions: self.subscriptions.clone(),
83            auth_tracker: self.auth_tracker.clone(),
84            cancellation_token: self.cancellation_token.clone(),
85            credential: self.credential.clone(),
86            original_challenge: Arc::clone(&self.original_challenge),
87            signed_challenge: Arc::clone(&self.signed_challenge),
88        }
89    }
90}
91
92impl KrakenFuturesWebSocketClient {
93    /// Creates a new client with the given URL.
94    #[must_use]
95    pub fn new(url: String, heartbeat_secs: Option<u64>) -> Self {
96        Self::with_credentials(url, heartbeat_secs, None)
97    }
98
99    /// Creates a new client with API credentials for authenticated feeds.
100    #[must_use]
101    pub fn with_credentials(
102        url: String,
103        heartbeat_secs: Option<u64>,
104        credential: Option<KrakenCredential>,
105    ) -> Self {
106        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
107        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
108        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
109
110        Self {
111            url,
112            heartbeat_secs,
113            signal: Arc::new(AtomicBool::new(false)),
114            connection_mode,
115            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
116            out_rx: None,
117            task_handle: None,
118            subscriptions: SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER),
119            auth_tracker: AuthTracker::new(),
120            cancellation_token: CancellationToken::new(),
121            credential,
122            original_challenge: Arc::new(tokio::sync::RwLock::new(None)),
123            signed_challenge: Arc::new(tokio::sync::RwLock::new(None)),
124        }
125    }
126
127    /// Returns true if the client has API credentials set.
128    #[must_use]
129    pub fn has_credentials(&self) -> bool {
130        self.credential.is_some()
131    }
132
133    /// Returns the WebSocket URL.
134    #[must_use]
135    pub fn url(&self) -> &str {
136        &self.url
137    }
138
139    /// Returns true if the connection is closed.
140    #[must_use]
141    pub fn is_closed(&self) -> bool {
142        ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
143            == ConnectionMode::Closed
144    }
145
146    /// Returns true if the connection is active.
147    #[must_use]
148    pub fn is_active(&self) -> bool {
149        ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
150            == ConnectionMode::Active
151    }
152
153    /// Waits until the WebSocket connection is active or timeout.
154    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
155        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
156
157        tokio::time::timeout(timeout, async {
158            while !self.is_active() {
159                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
160            }
161        })
162        .await
163        .map_err(|_| {
164            KrakenWsError::ConnectionError(format!(
165                "WebSocket connection timeout after {timeout_secs} seconds"
166            ))
167        })?;
168
169        Ok(())
170    }
171
172    /// Authenticates the WebSocket connection for private feeds.
173    ///
174    /// This sends a challenge request, waits for the response, signs it,
175    /// and stores the credentials for use in private subscriptions.
176    pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
177        let credential = self.credential.as_ref().ok_or_else(|| {
178            KrakenWsError::AuthenticationError("API credentials required".to_string())
179        })?;
180
181        let api_key = credential.api_key().to_string();
182        let (tx, rx) = tokio::sync::oneshot::channel();
183
184        self.cmd_tx
185            .read()
186            .await
187            .send(HandlerCommand::RequestChallenge {
188                api_key: api_key.clone(),
189                response_tx: tx,
190            })
191            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
192
193        let challenge = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx)
194            .await
195            .map_err(|_| {
196                KrakenWsError::AuthenticationError("Timeout waiting for challenge".to_string())
197            })?
198            .map_err(|_| {
199                KrakenWsError::AuthenticationError("Challenge channel closed".to_string())
200            })?;
201
202        let signed_challenge = credential.sign_ws_challenge(&challenge).map_err(|e| {
203            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
204        })?;
205
206        *self.original_challenge.write().await = Some(challenge.clone());
207        *self.signed_challenge.write().await = Some(signed_challenge.clone());
208
209        self.cmd_tx
210            .read()
211            .await
212            .send(HandlerCommand::SetAuthCredentials {
213                api_key,
214                original_challenge: challenge,
215                signed_challenge,
216            })
217            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
218
219        log::debug!("Futures WebSocket authentication successful");
220        Ok(())
221    }
222
223    /// Caches instruments for price precision lookup (bulk replace).
224    ///
225    /// Must be called after `connect()` when the handler is ready to receive commands.
226    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
227        if let Ok(tx) = self.cmd_tx.try_read()
228            && let Err(e) = tx.send(HandlerCommand::InitializeInstruments(instruments))
229        {
230            log::debug!("Failed to send instruments to handler: {e}");
231        }
232    }
233
234    /// Caches a single instrument for price precision lookup (upsert).
235    ///
236    /// Must be called after `connect()` when the handler is ready to receive commands.
237    pub fn cache_instrument(&self, instrument: InstrumentAny) {
238        if let Ok(tx) = self.cmd_tx.try_read()
239            && let Err(e) = tx.send(HandlerCommand::UpdateInstrument(instrument))
240        {
241            log::debug!("Failed to send instrument update to handler: {e}");
242        }
243    }
244
245    /// Connects to the WebSocket server.
246    pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
247        log::debug!("Connecting to Futures WebSocket: {}", self.url);
248
249        self.signal.store(false, Ordering::Relaxed);
250
251        let (raw_handler, raw_rx) = channel_message_handler();
252
253        let ws_config = WebSocketConfig {
254            url: self.url.clone(),
255            headers: vec![],
256            heartbeat: self.heartbeat_secs,
257            heartbeat_msg: None, // Use WebSocket ping frames, not text messages
258            reconnect_timeout_ms: Some(5_000),
259            reconnect_delay_initial_ms: Some(500),
260            reconnect_delay_max_ms: Some(5_000),
261            reconnect_backoff_factor: Some(1.5),
262            reconnect_jitter_ms: Some(250),
263            reconnect_max_attempts: None,
264        };
265
266        let ws_client =
267            WebSocketClient::connect(ws_config, Some(raw_handler), None, None, vec![], None)
268                .await
269                .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
270
271        self.connection_mode
272            .store(ws_client.connection_mode_atomic());
273
274        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<KrakenFuturesWsMessage>();
275        self.out_rx = Some(Arc::new(out_rx));
276
277        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
278        *self.cmd_tx.write().await = cmd_tx.clone();
279
280        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(ws_client)) {
281            return Err(KrakenWsError::ConnectionError(format!(
282                "Failed to send WebSocketClient to handler: {e}"
283            )));
284        }
285
286        let signal = self.signal.clone();
287        let subscriptions = self.subscriptions.clone();
288        let cmd_tx_for_reconnect = cmd_tx.clone();
289        let credential_for_reconnect = self.credential.clone();
290
291        let stream_handle = get_runtime().spawn(async move {
292            let mut handler =
293                FuturesFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
294
295            loop {
296                match handler.next().await {
297                    Some(KrakenFuturesWsMessage::Reconnected) => {
298                        if signal.load(Ordering::Relaxed) {
299                            continue;
300                        }
301                        log::info!("WebSocket reconnected, resubscribing");
302
303                        // Mark all confirmed as failed to transition to pending for replay
304                        let confirmed_topics = subscriptions.all_topics();
305                        for topic in &confirmed_topics {
306                            subscriptions.mark_failure(topic);
307                        }
308
309                        let topics = subscriptions.all_topics();
310                        if topics.is_empty() {
311                            log::debug!("No subscriptions to restore after reconnection");
312                        } else {
313                            // Check if we have private subscriptions that need re-authentication
314                            let has_private_subs = topics.iter().any(|t| {
315                                t == "open_orders"
316                                    || t == "fills"
317                                    || t.starts_with("open_orders:")
318                                    || t.starts_with("fills:")
319                            });
320
321                            if has_private_subs {
322                                if let Some(ref cred) = credential_for_reconnect {
323                                    // Request fresh challenge for the new connection
324                                    let (tx, rx) = tokio::sync::oneshot::channel();
325                                    if let Err(e) = cmd_tx_for_reconnect.send(
326                                        HandlerCommand::RequestChallenge {
327                                            api_key: cred.api_key().to_string(),
328                                            response_tx: tx,
329                                        },
330                                    ) {
331                                        log::error!(
332                                            "Failed to request challenge for reconnect: {e}"
333                                        );
334                                    } else {
335                                        match tokio::time::timeout(
336                                            tokio::time::Duration::from_secs(10),
337                                            rx,
338                                        )
339                                        .await
340                                        {
341                                            Ok(Ok(challenge)) => {
342                                                match cred.sign_ws_challenge(&challenge) {
343                                                    Ok(signed) => {
344                                                        if let Err(e) = cmd_tx_for_reconnect.send(
345                                                            HandlerCommand::SetAuthCredentials {
346                                                                api_key: cred.api_key().to_string(),
347                                                                original_challenge: challenge,
348                                                                signed_challenge: signed,
349                                                            },
350                                                        ) {
351                                                            log::error!(
352                                                                "Failed to set auth credentials: {e}"
353                                                            );
354                                                        } else {
355                                                            log::debug!(
356                                                                "Re-authenticated after reconnect"
357                                                            );
358                                                        }
359                                                    }
360                                                    Err(e) => {
361                                                        log::error!(
362                                                            "Failed to sign challenge for reconnect: {e}"
363                                                        );
364                                                    }
365                                                }
366                                            }
367                                            Ok(Err(_)) => {
368                                                log::error!(
369                                                    "Challenge channel closed during reconnect"
370                                                );
371                                            }
372                                            Err(_) => {
373                                                log::error!(
374                                                    "Timeout waiting for challenge during reconnect"
375                                                );
376                                            }
377                                        }
378                                    }
379                                } else {
380                                    log::warn!(
381                                        "Private subscriptions exist but no credentials available"
382                                    );
383                                }
384                            }
385
386                            log::info!(
387                                "Resubscribing after reconnection: count={}",
388                                topics.len()
389                            );
390
391                            for topic in &topics {
392                                let cmd =
393                                    if let Some((feed_str, symbol_str)) = topic.split_once(':') {
394                                        let symbol = Symbol::from(symbol_str);
395                                        match feed_str.parse::<KrakenFuturesFeed>() {
396                                            Ok(KrakenFuturesFeed::Trade) => {
397                                                Some(HandlerCommand::SubscribeTrade(symbol))
398                                            }
399                                            Ok(KrakenFuturesFeed::Book) => {
400                                                Some(HandlerCommand::SubscribeBook(symbol))
401                                            }
402                                            Ok(KrakenFuturesFeed::Ticker) => {
403                                                Some(HandlerCommand::SubscribeTicker(symbol))
404                                            }
405                                            Ok(KrakenFuturesFeed::OpenOrders) => {
406                                                Some(HandlerCommand::SubscribeOpenOrders)
407                                            }
408                                            Ok(KrakenFuturesFeed::Fills) => {
409                                                Some(HandlerCommand::SubscribeFills)
410                                            }
411                                            Ok(_) | Err(_) => None,
412                                        }
413                                    } else {
414                                        match topic.parse::<KrakenFuturesFeed>() {
415                                            Ok(KrakenFuturesFeed::OpenOrders) => {
416                                                Some(HandlerCommand::SubscribeOpenOrders)
417                                            }
418                                            Ok(KrakenFuturesFeed::Fills) => {
419                                                Some(HandlerCommand::SubscribeFills)
420                                            }
421                                            Ok(_) | Err(_) => None,
422                                        }
423                                    };
424
425                                if let Some(cmd) = cmd
426                                    && let Err(e) = cmd_tx_for_reconnect.send(cmd)
427                                {
428                                    log::error!(
429                                        "Failed to send resubscribe command: error={e}, \
430                                        topic={topic}"
431                                    );
432                                }
433
434                                subscriptions.mark_subscribe(topic);
435                            }
436                        }
437
438                        if let Err(e) = out_tx.send(KrakenFuturesWsMessage::Reconnected) {
439                            log::debug!("Output channel closed: {e}");
440                            break;
441                        }
442                        continue;
443                    }
444                    Some(msg) => {
445                        if let Err(e) = out_tx.send(msg) {
446                            log::debug!("Output channel closed: {e}");
447                            break;
448                        }
449                    }
450                    None => {
451                        log::debug!("Handler stream ended");
452                        break;
453                    }
454                }
455            }
456
457            log::debug!("Futures handler task exiting");
458        });
459
460        self.task_handle = Some(Arc::new(stream_handle));
461
462        log::debug!("Futures WebSocket connected successfully");
463        Ok(())
464    }
465
466    /// Disconnects from the WebSocket server.
467    pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
468        log::debug!("Disconnecting Futures WebSocket");
469
470        self.signal.store(true, Ordering::Relaxed);
471
472        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
473            log::debug!(
474                "Failed to send disconnect command (handler may already be shut down): {e}"
475            );
476        }
477
478        if let Some(task_handle) = self.task_handle.take() {
479            match Arc::try_unwrap(task_handle) {
480                Ok(handle) => {
481                    match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
482                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
483                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
484                        Err(_) => {
485                            log::warn!("Timeout waiting for task handle");
486                        }
487                    }
488                }
489                Err(arc_handle) => {
490                    log::debug!("Cannot take ownership of task handle, aborting");
491                    arc_handle.abort();
492                }
493            }
494        }
495
496        self.subscriptions.clear();
497        self.auth_tracker.fail("Disconnected");
498        Ok(())
499    }
500
501    /// Closes the WebSocket connection.
502    pub async fn close(&mut self) -> Result<(), KrakenWsError> {
503        self.disconnect().await
504    }
505
506    /// Subscribes to mark price updates for the given instrument.
507    pub async fn subscribe_mark_price(
508        &self,
509        instrument_id: InstrumentId,
510    ) -> Result<(), KrakenWsError> {
511        let symbol = instrument_id.symbol;
512        let key = format!("mark:{symbol}");
513
514        if !self.subscriptions.add_reference(&key) {
515            return Ok(());
516        }
517
518        self.subscriptions.mark_subscribe(&key);
519        self.subscriptions.confirm_subscribe(&key);
520        self.ensure_ticker_subscribed(symbol).await
521    }
522
523    /// Unsubscribes from mark price updates for the given instrument.
524    pub async fn unsubscribe_mark_price(
525        &self,
526        instrument_id: InstrumentId,
527    ) -> Result<(), KrakenWsError> {
528        let symbol = instrument_id.symbol;
529        let key = format!("mark:{symbol}");
530
531        if !self.subscriptions.remove_reference(&key) {
532            return Ok(());
533        }
534
535        self.subscriptions.mark_unsubscribe(&key);
536        self.subscriptions.confirm_unsubscribe(&key);
537        self.maybe_unsubscribe_ticker(symbol).await
538    }
539
540    /// Subscribes to index price updates for the given instrument.
541    pub async fn subscribe_index_price(
542        &self,
543        instrument_id: InstrumentId,
544    ) -> Result<(), KrakenWsError> {
545        let symbol = instrument_id.symbol;
546        let key = format!("index:{symbol}");
547
548        if !self.subscriptions.add_reference(&key) {
549            return Ok(());
550        }
551
552        self.subscriptions.mark_subscribe(&key);
553        self.subscriptions.confirm_subscribe(&key);
554        self.ensure_ticker_subscribed(symbol).await
555    }
556
557    /// Unsubscribes from index price updates for the given instrument.
558    pub async fn unsubscribe_index_price(
559        &self,
560        instrument_id: InstrumentId,
561    ) -> Result<(), KrakenWsError> {
562        let symbol = instrument_id.symbol;
563        let key = format!("index:{symbol}");
564
565        if !self.subscriptions.remove_reference(&key) {
566            return Ok(());
567        }
568
569        self.subscriptions.mark_unsubscribe(&key);
570        self.subscriptions.confirm_unsubscribe(&key);
571        self.maybe_unsubscribe_ticker(symbol).await
572    }
573
574    /// Subscribes to funding rate updates for the given instrument.
575    pub async fn subscribe_funding_rate(
576        &self,
577        instrument_id: InstrumentId,
578    ) -> Result<(), KrakenWsError> {
579        let symbol = instrument_id.symbol;
580        let key = format!("funding:{symbol}");
581
582        if !self.subscriptions.add_reference(&key) {
583            return Ok(());
584        }
585
586        self.subscriptions.mark_subscribe(&key);
587        self.subscriptions.confirm_subscribe(&key);
588        self.ensure_ticker_subscribed(symbol).await
589    }
590
591    /// Unsubscribes from funding rate updates for the given instrument.
592    pub async fn unsubscribe_funding_rate(
593        &self,
594        instrument_id: InstrumentId,
595    ) -> Result<(), KrakenWsError> {
596        let symbol = instrument_id.symbol;
597        let key = format!("funding:{symbol}");
598
599        if !self.subscriptions.remove_reference(&key) {
600            return Ok(());
601        }
602
603        self.subscriptions.mark_unsubscribe(&key);
604        self.subscriptions.confirm_unsubscribe(&key);
605        self.maybe_unsubscribe_ticker(symbol).await
606    }
607
608    /// Subscribes to quote updates for the given instrument.
609    ///
610    /// Uses the order book channel for low-latency top-of-book quotes.
611    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
612        let symbol = instrument_id.symbol;
613        let key = format!("quotes:{symbol}");
614
615        if !self.subscriptions.add_reference(&key) {
616            return Ok(());
617        }
618
619        self.subscriptions.mark_subscribe(&key);
620        self.subscriptions.confirm_subscribe(&key);
621
622        // Use book feed for low-latency quotes (not throttled ticker)
623        self.ensure_book_subscribed(symbol).await
624    }
625
626    /// Unsubscribes from quote updates for the given instrument.
627    pub async fn unsubscribe_quotes(
628        &self,
629        instrument_id: InstrumentId,
630    ) -> Result<(), KrakenWsError> {
631        let symbol = instrument_id.symbol;
632        let key = format!("quotes:{symbol}");
633
634        if !self.subscriptions.remove_reference(&key) {
635            return Ok(());
636        }
637
638        self.subscriptions.mark_unsubscribe(&key);
639        self.subscriptions.confirm_unsubscribe(&key);
640        self.maybe_unsubscribe_book(symbol).await
641    }
642
643    /// Subscribes to trade updates for the given instrument.
644    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
645        let symbol = instrument_id.symbol;
646        let key = format!("trades:{symbol}");
647
648        if !self.subscriptions.add_reference(&key) {
649            return Ok(());
650        }
651
652        self.subscriptions.mark_subscribe(&key);
653
654        self.cmd_tx
655            .read()
656            .await
657            .send(HandlerCommand::SubscribeTrade(symbol))
658            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
659
660        self.subscriptions.confirm_subscribe(&key);
661        Ok(())
662    }
663
664    /// Unsubscribes from trade updates for the given instrument.
665    pub async fn unsubscribe_trades(
666        &self,
667        instrument_id: InstrumentId,
668    ) -> Result<(), KrakenWsError> {
669        let symbol = instrument_id.symbol;
670        let key = format!("trades:{symbol}");
671
672        if !self.subscriptions.remove_reference(&key) {
673            return Ok(());
674        }
675
676        self.subscriptions.mark_unsubscribe(&key);
677
678        self.cmd_tx
679            .read()
680            .await
681            .send(HandlerCommand::UnsubscribeTrade(symbol))
682            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
683
684        self.subscriptions.confirm_unsubscribe(&key);
685        Ok(())
686    }
687
688    /// Subscribes to order book updates for the given instrument.
689    ///
690    /// Note: The `depth` parameter is accepted for API compatibility with spot client but is
691    /// not used by Kraken Futures (full book is always returned).
692    pub async fn subscribe_book(
693        &self,
694        instrument_id: InstrumentId,
695        _depth: Option<u32>,
696    ) -> Result<(), KrakenWsError> {
697        let symbol = instrument_id.symbol;
698        let key = format!("book:{symbol}");
699
700        if !self.subscriptions.add_reference(&key) {
701            return Ok(());
702        }
703
704        self.subscriptions.mark_subscribe(&key);
705
706        self.cmd_tx
707            .read()
708            .await
709            .send(HandlerCommand::SubscribeBook(symbol))
710            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
711
712        self.subscriptions.confirm_subscribe(&key);
713        Ok(())
714    }
715
716    /// Unsubscribes from order book updates for the given instrument.
717    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
718        let symbol = instrument_id.symbol;
719        let key = format!("book:{symbol}");
720
721        if !self.subscriptions.remove_reference(&key) {
722            return Ok(());
723        }
724
725        self.subscriptions.mark_unsubscribe(&key);
726
727        self.cmd_tx
728            .read()
729            .await
730            .send(HandlerCommand::UnsubscribeBook(symbol))
731            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
732
733        self.subscriptions.confirm_unsubscribe(&key);
734        Ok(())
735    }
736
737    /// Ensures ticker feed is subscribed for the given symbol.
738    async fn ensure_ticker_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
739        let ticker_key = format!("ticker:{symbol}");
740
741        if !self.subscriptions.add_reference(&ticker_key) {
742            return Ok(());
743        }
744
745        self.subscriptions.mark_subscribe(&ticker_key);
746        self.cmd_tx
747            .read()
748            .await
749            .send(HandlerCommand::SubscribeTicker(symbol))
750            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
751        self.subscriptions.confirm_subscribe(&ticker_key);
752        Ok(())
753    }
754
755    /// Unsubscribes from ticker if no more dependent subscriptions.
756    async fn maybe_unsubscribe_ticker(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
757        let ticker_key = format!("ticker:{symbol}");
758
759        if !self.subscriptions.remove_reference(&ticker_key) {
760            return Ok(());
761        }
762
763        self.subscriptions.mark_unsubscribe(&ticker_key);
764        self.cmd_tx
765            .read()
766            .await
767            .send(HandlerCommand::UnsubscribeTicker(symbol))
768            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
769        self.subscriptions.confirm_unsubscribe(&ticker_key);
770        Ok(())
771    }
772
773    /// Ensures book feed is subscribed for the given symbol (for quotes).
774    async fn ensure_book_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
775        let book_key = format!("book:{symbol}");
776
777        if !self.subscriptions.add_reference(&book_key) {
778            return Ok(());
779        }
780
781        self.subscriptions.mark_subscribe(&book_key);
782        self.cmd_tx
783            .read()
784            .await
785            .send(HandlerCommand::SubscribeBook(symbol))
786            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
787        self.subscriptions.confirm_subscribe(&book_key);
788        Ok(())
789    }
790
791    /// Unsubscribes from book if no more dependent subscriptions.
792    async fn maybe_unsubscribe_book(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
793        let book_key = format!("book:{symbol}");
794
795        if !self.subscriptions.remove_reference(&book_key) {
796            return Ok(());
797        }
798
799        self.subscriptions.mark_unsubscribe(&book_key);
800        self.cmd_tx
801            .read()
802            .await
803            .send(HandlerCommand::UnsubscribeBook(symbol))
804            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
805        self.subscriptions.confirm_unsubscribe(&book_key);
806        Ok(())
807    }
808
809    /// Gets the output receiver for processed messages.
810    pub fn take_output_rx(
811        &mut self,
812    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>> {
813        self.out_rx.take().and_then(|arc| Arc::try_unwrap(arc).ok())
814    }
815
816    /// Sets the account ID for execution reports.
817    ///
818    /// Must be called before subscribing to execution feeds to properly generate
819    /// OrderStatusReport and FillReport objects.
820    pub fn set_account_id(&self, account_id: AccountId) {
821        if let Ok(tx) = self.cmd_tx.try_read()
822            && let Err(e) = tx.send(HandlerCommand::SetAccountId(account_id))
823        {
824            log::debug!("Failed to send account_id to handler: {e}");
825        }
826    }
827
828    /// Caches a client order ID mapping for order tracking.
829    ///
830    /// This caches the trader_id, strategy_id, and instrument_id for an order,
831    /// allowing the handler to emit proper order events with correct identifiers
832    /// when WebSocket messages arrive.
833    pub fn cache_client_order(
834        &self,
835        client_order_id: ClientOrderId,
836        venue_order_id: Option<VenueOrderId>,
837        instrument_id: InstrumentId,
838        trader_id: TraderId,
839        strategy_id: StrategyId,
840    ) {
841        if let Ok(tx) = self.cmd_tx.try_read()
842            && let Err(e) = tx.send(HandlerCommand::CacheClientOrder {
843                client_order_id,
844                venue_order_id,
845                instrument_id,
846                trader_id,
847                strategy_id,
848            })
849        {
850            log::debug!("Failed to cache client order: {e}");
851        }
852    }
853
854    /// Requests a challenge from the WebSocket for authentication.
855    ///
856    /// After calling this, listen for the challenge response message and then
857    /// call `authenticate_with_challenge()` to complete authentication.
858    pub async fn request_challenge(&self) -> Result<(), KrakenWsError> {
859        let credential = self.credential.as_ref().ok_or_else(|| {
860            KrakenWsError::AuthenticationError(
861                "API credentials required for authentication".to_string(),
862            )
863        })?;
864
865        // TODO: Send via WebSocket client when we have direct access
866        // For now, the Python layer will handle the challenge request/response flow
867        log::debug!(
868            "Challenge request prepared for API key: {}",
869            credential.api_key_masked()
870        );
871
872        Ok(())
873    }
874
875    /// Set authentication credentials directly (for when challenge is obtained externally).
876    pub async fn set_auth_credentials(
877        &self,
878        original_challenge: String,
879        signed_challenge: String,
880    ) -> Result<(), KrakenWsError> {
881        let credential = self.credential.as_ref().ok_or_else(|| {
882            KrakenWsError::AuthenticationError("API credentials required".to_string())
883        })?;
884
885        *self.original_challenge.write().await = Some(original_challenge.clone());
886        *self.signed_challenge.write().await = Some(signed_challenge.clone());
887
888        self.cmd_tx
889            .read()
890            .await
891            .send(HandlerCommand::SetAuthCredentials {
892                api_key: credential.api_key().to_string(),
893                original_challenge,
894                signed_challenge,
895            })
896            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
897
898        Ok(())
899    }
900
901    /// Sign a challenge with the API credentials.
902    ///
903    /// Returns the signed challenge on success.
904    pub fn sign_challenge(&self, challenge: &str) -> Result<String, KrakenWsError> {
905        let credential = self.credential.as_ref().ok_or_else(|| {
906            KrakenWsError::AuthenticationError("API credentials required".to_string())
907        })?;
908
909        credential.sign_ws_challenge(challenge).map_err(|e| {
910            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
911        })
912    }
913
914    /// Complete authentication with a received challenge.
915    pub async fn authenticate_with_challenge(&self, challenge: &str) -> Result<(), KrakenWsError> {
916        let credential = self.credential.as_ref().ok_or_else(|| {
917            KrakenWsError::AuthenticationError("API credentials required".to_string())
918        })?;
919
920        let signed_challenge = credential.sign_ws_challenge(challenge).map_err(|e| {
921            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
922        })?;
923
924        self.set_auth_credentials(challenge.to_string(), signed_challenge)
925            .await
926    }
927
928    /// Subscribes to open orders feed (private, requires authentication).
929    pub async fn subscribe_open_orders(&self) -> Result<(), KrakenWsError> {
930        if self.original_challenge.read().await.is_none() {
931            return Err(KrakenWsError::AuthenticationError(
932                "Must authenticate before subscribing to private feeds".to_string(),
933            ));
934        }
935
936        let key = "open_orders";
937        if !self.subscriptions.add_reference(key) {
938            return Ok(());
939        }
940
941        self.subscriptions.mark_subscribe(key);
942
943        self.cmd_tx
944            .read()
945            .await
946            .send(HandlerCommand::SubscribeOpenOrders)
947            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
948
949        self.subscriptions.confirm_subscribe(key);
950        Ok(())
951    }
952
953    /// Subscribes to fills feed (private, requires authentication).
954    pub async fn subscribe_fills(&self) -> Result<(), KrakenWsError> {
955        if self.original_challenge.read().await.is_none() {
956            return Err(KrakenWsError::AuthenticationError(
957                "Must authenticate before subscribing to private feeds".to_string(),
958            ));
959        }
960
961        let key = "fills";
962        if !self.subscriptions.add_reference(key) {
963            return Ok(());
964        }
965
966        self.subscriptions.mark_subscribe(key);
967
968        self.cmd_tx
969            .read()
970            .await
971            .send(HandlerCommand::SubscribeFills)
972            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
973
974        self.subscriptions.confirm_subscribe(key);
975        Ok(())
976    }
977
978    /// Subscribes to both open orders and fills (convenience method).
979    pub async fn subscribe_executions(&self) -> Result<(), KrakenWsError> {
980        self.subscribe_open_orders().await?;
981        self.subscribe_fills().await?;
982        Ok(())
983    }
984}