nautilus_kraken/websocket/futures/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for the Kraken Futures v1 streaming API.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::get_runtime;
25use nautilus_model::{
26    identifiers::{
27        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
28    },
29    instruments::InstrumentAny,
30};
31use nautilus_network::{
32    mode::ConnectionMode,
33    websocket::{
34        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35    },
36};
37use tokio_util::sync::CancellationToken;
38
39use super::{
40    handler::{FuturesFeedHandler, HandlerCommand},
41    messages::{KrakenFuturesFeed, KrakenFuturesWsMessage},
42};
43use crate::{common::credential::KrakenCredential, websocket::error::KrakenWsError};
44
/// Topic delimiter for Kraken Futures WebSocket subscriptions.
///
/// Topics use colon format: `feed:symbol` (e.g., `trades:PF_ETHUSD`).
pub const KRAKEN_FUTURES_WS_TOPIC_DELIMITER: char = ':';

/// JSON ping payload supplied as the heartbeat message in the WebSocket configuration.
const WS_PING_MSG: &str = r#"{"event":"ping"}"#;
51
/// WebSocket client for the Kraken Futures v1 streaming API.
///
/// Commands are sent to a feed handler task over `cmd_tx`; processed messages come back
/// through the receiver held in `out_rx`.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
)]
pub struct KrakenFuturesWebSocketClient {
    /// WebSocket endpoint URL used by `connect()`.
    url: String,
    /// Heartbeat setting forwarded to the WebSocket configuration on connect.
    heartbeat_secs: Option<u64>,
    /// Flag cleared by `connect()` and set by `disconnect()`; while set, the handler task
    /// skips resubscription when a reconnect is reported.
    signal: Arc<AtomicBool>,
    /// Connection mode stored as a `ConnectionMode` discriminant; starts as `Closed` and is
    /// swapped for the underlying client's atomic on connect.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the feed handler; replaced with a live channel on connect.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for processed messages, created on connect and handed out by
    /// `take_output_rx()`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>>>,
    /// Handle of the handler task spawned by `connect()`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Tracks subscription topics (`feed:symbol` keys) across subscribe, unsubscribe and
    /// reconnect.
    subscriptions: SubscriptionState,
    /// Authentication tracker; marked as failed on disconnect.
    auth_tracker: AuthTracker,
    /// Cancellation token.
    // NOTE(review): not referenced by the code visible in this section - confirm usage.
    cancellation_token: CancellationToken,
    /// API credential for private feeds, if any.
    credential: Option<KrakenCredential>,
    /// Most recent challenge stored by `authenticate()`.
    original_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
    /// Signed form of the challenge stored by `authenticate()`.
    signed_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
}
73
74impl Clone for KrakenFuturesWebSocketClient {
75    fn clone(&self) -> Self {
76        Self {
77            url: self.url.clone(),
78            heartbeat_secs: self.heartbeat_secs,
79            signal: Arc::clone(&self.signal),
80            connection_mode: Arc::clone(&self.connection_mode),
81            cmd_tx: Arc::clone(&self.cmd_tx),
82            out_rx: self.out_rx.clone(),
83            task_handle: self.task_handle.clone(),
84            subscriptions: self.subscriptions.clone(),
85            auth_tracker: self.auth_tracker.clone(),
86            cancellation_token: self.cancellation_token.clone(),
87            credential: self.credential.clone(),
88            original_challenge: Arc::clone(&self.original_challenge),
89            signed_challenge: Arc::clone(&self.signed_challenge),
90        }
91    }
92}
93
94impl KrakenFuturesWebSocketClient {
95    /// Creates a new client with the given URL.
96    #[must_use]
97    pub fn new(url: String, heartbeat_secs: Option<u64>) -> Self {
98        Self::with_credentials(url, heartbeat_secs, None)
99    }
100
101    /// Creates a new client with API credentials for authenticated feeds.
102    #[must_use]
103    pub fn with_credentials(
104        url: String,
105        heartbeat_secs: Option<u64>,
106        credential: Option<KrakenCredential>,
107    ) -> Self {
108        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
109        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
110        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
111
112        Self {
113            url,
114            heartbeat_secs,
115            signal: Arc::new(AtomicBool::new(false)),
116            connection_mode,
117            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
118            out_rx: None,
119            task_handle: None,
120            subscriptions: SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER),
121            auth_tracker: AuthTracker::new(),
122            cancellation_token: CancellationToken::new(),
123            credential,
124            original_challenge: Arc::new(tokio::sync::RwLock::new(None)),
125            signed_challenge: Arc::new(tokio::sync::RwLock::new(None)),
126        }
127    }
128
129    /// Returns true if the client has API credentials set.
130    #[must_use]
131    pub fn has_credentials(&self) -> bool {
132        self.credential.is_some()
133    }
134
135    /// Returns the WebSocket URL.
136    #[must_use]
137    pub fn url(&self) -> &str {
138        &self.url
139    }
140
141    /// Returns true if the connection is closed.
142    #[must_use]
143    pub fn is_closed(&self) -> bool {
144        ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
145            == ConnectionMode::Closed
146    }
147
148    /// Returns true if the connection is active.
149    #[must_use]
150    pub fn is_active(&self) -> bool {
151        ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
152            == ConnectionMode::Active
153    }
154
155    /// Waits until the WebSocket connection is active or timeout.
156    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
157        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
158
159        tokio::time::timeout(timeout, async {
160            while !self.is_active() {
161                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
162            }
163        })
164        .await
165        .map_err(|_| {
166            KrakenWsError::ConnectionError(format!(
167                "WebSocket connection timeout after {timeout_secs} seconds"
168            ))
169        })?;
170
171        Ok(())
172    }
173
174    /// Authenticates the WebSocket connection for private feeds.
175    ///
176    /// This sends a challenge request, waits for the response, signs it,
177    /// and stores the credentials for use in private subscriptions.
178    pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
179        let credential = self.credential.as_ref().ok_or_else(|| {
180            KrakenWsError::AuthenticationError("API credentials required".to_string())
181        })?;
182
183        let api_key = credential.api_key().to_string();
184        let (tx, rx) = tokio::sync::oneshot::channel();
185
186        self.cmd_tx
187            .read()
188            .await
189            .send(HandlerCommand::RequestChallenge {
190                api_key: api_key.clone(),
191                response_tx: tx,
192            })
193            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
194
195        let challenge = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx)
196            .await
197            .map_err(|_| {
198                KrakenWsError::AuthenticationError("Timeout waiting for challenge".to_string())
199            })?
200            .map_err(|_| {
201                KrakenWsError::AuthenticationError("Challenge channel closed".to_string())
202            })?;
203
204        let signed_challenge = credential.sign_ws_challenge(&challenge).map_err(|e| {
205            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
206        })?;
207
208        *self.original_challenge.write().await = Some(challenge.clone());
209        *self.signed_challenge.write().await = Some(signed_challenge.clone());
210
211        self.cmd_tx
212            .read()
213            .await
214            .send(HandlerCommand::SetAuthCredentials {
215                api_key,
216                original_challenge: challenge,
217                signed_challenge,
218            })
219            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
220
221        tracing::debug!("Futures WebSocket authentication successful");
222        Ok(())
223    }
224
225    /// Caches instruments for price precision lookup (bulk replace).
226    ///
227    /// Must be called after `connect()` when the handler is ready to receive commands.
228    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
229        if let Ok(tx) = self.cmd_tx.try_read()
230            && let Err(e) = tx.send(HandlerCommand::InitializeInstruments(instruments))
231        {
232            tracing::debug!("Failed to send instruments to handler: {e}");
233        }
234    }
235
236    /// Caches a single instrument for price precision lookup (upsert).
237    ///
238    /// Must be called after `connect()` when the handler is ready to receive commands.
239    pub fn cache_instrument(&self, instrument: InstrumentAny) {
240        if let Ok(tx) = self.cmd_tx.try_read()
241            && let Err(e) = tx.send(HandlerCommand::UpdateInstrument(instrument))
242        {
243            tracing::debug!("Failed to send instrument update to handler: {e}");
244        }
245    }
246
247    /// Connects to the WebSocket server.
248    pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
249        tracing::debug!("Connecting to Futures WebSocket: {}", self.url);
250
251        self.signal.store(false, Ordering::Relaxed);
252
253        let (raw_handler, raw_rx) = channel_message_handler();
254
255        let ws_config = WebSocketConfig {
256            url: self.url.clone(),
257            headers: vec![],
258            heartbeat: self.heartbeat_secs,
259            heartbeat_msg: Some(WS_PING_MSG.to_string()),
260            reconnect_timeout_ms: Some(5_000),
261            reconnect_delay_initial_ms: Some(500),
262            reconnect_delay_max_ms: Some(5_000),
263            reconnect_backoff_factor: Some(1.5),
264            reconnect_jitter_ms: Some(250),
265            reconnect_max_attempts: None,
266        };
267
268        let ws_client =
269            WebSocketClient::connect(ws_config, Some(raw_handler), None, None, vec![], None)
270                .await
271                .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
272
273        self.connection_mode
274            .store(ws_client.connection_mode_atomic());
275
276        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<KrakenFuturesWsMessage>();
277        self.out_rx = Some(Arc::new(out_rx));
278
279        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
280        *self.cmd_tx.write().await = cmd_tx.clone();
281
282        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(ws_client)) {
283            return Err(KrakenWsError::ConnectionError(format!(
284                "Failed to send WebSocketClient to handler: {e}"
285            )));
286        }
287
288        let signal = self.signal.clone();
289        let subscriptions = self.subscriptions.clone();
290        let cmd_tx_for_reconnect = cmd_tx.clone();
291        let credential_for_reconnect = self.credential.clone();
292
293        let stream_handle = get_runtime().spawn(async move {
294            let mut handler =
295                FuturesFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
296
297            loop {
298                match handler.next().await {
299                    Some(KrakenFuturesWsMessage::Reconnected) => {
300                        if signal.load(Ordering::Relaxed) {
301                            continue;
302                        }
303                        tracing::info!("WebSocket reconnected, resubscribing");
304
305                        // Mark all confirmed as failed to transition to pending for replay
306                        let confirmed_topics = subscriptions.all_topics();
307                        for topic in &confirmed_topics {
308                            subscriptions.mark_failure(topic);
309                        }
310
311                        let topics = subscriptions.all_topics();
312                        if topics.is_empty() {
313                            tracing::debug!("No subscriptions to restore after reconnection");
314                        } else {
315                            // Check if we have private subscriptions that need re-authentication
316                            let has_private_subs = topics.iter().any(|t| {
317                                t == "open_orders"
318                                    || t == "fills"
319                                    || t.starts_with("open_orders:")
320                                    || t.starts_with("fills:")
321                            });
322
323                            if has_private_subs {
324                                if let Some(ref cred) = credential_for_reconnect {
325                                    // Request fresh challenge for the new connection
326                                    let (tx, rx) = tokio::sync::oneshot::channel();
327                                    if let Err(e) = cmd_tx_for_reconnect.send(
328                                        HandlerCommand::RequestChallenge {
329                                            api_key: cred.api_key().to_string(),
330                                            response_tx: tx,
331                                        },
332                                    ) {
333                                        tracing::error!(
334                                            error = %e,
335                                            "Failed to request challenge for reconnect"
336                                        );
337                                    } else {
338                                        match tokio::time::timeout(
339                                            tokio::time::Duration::from_secs(10),
340                                            rx,
341                                        )
342                                        .await
343                                        {
344                                            Ok(Ok(challenge)) => {
345                                                match cred.sign_ws_challenge(&challenge) {
346                                                    Ok(signed) => {
347                                                        if let Err(e) = cmd_tx_for_reconnect.send(
348                                                            HandlerCommand::SetAuthCredentials {
349                                                                api_key: cred.api_key().to_string(),
350                                                                original_challenge: challenge,
351                                                                signed_challenge: signed,
352                                                            },
353                                                        ) {
354                                                            tracing::error!(
355                                                                error = %e,
356                                                                "Failed to set auth credentials"
357                                                            );
358                                                        } else {
359                                                            tracing::debug!(
360                                                                "Re-authenticated after reconnect"
361                                                            );
362                                                        }
363                                                    }
364                                                    Err(e) => {
365                                                        tracing::error!(
366                                                            error = %e,
367                                                            "Failed to sign challenge for reconnect"
368                                                        );
369                                                    }
370                                                }
371                                            }
372                                            Ok(Err(_)) => {
373                                                tracing::error!(
374                                                    "Challenge channel closed during reconnect"
375                                                );
376                                            }
377                                            Err(_) => {
378                                                tracing::error!(
379                                                    "Timeout waiting for challenge during reconnect"
380                                                );
381                                            }
382                                        }
383                                    }
384                                } else {
385                                    tracing::warn!(
386                                        "Private subscriptions exist but no credentials available"
387                                    );
388                                }
389                            }
390
391                            tracing::info!(
392                                count = topics.len(),
393                                "Resubscribing after reconnection"
394                            );
395
396                            for topic in &topics {
397                                let cmd =
398                                    if let Some((feed_str, symbol_str)) = topic.split_once(':') {
399                                        let symbol = Symbol::from(symbol_str);
400                                        match feed_str.parse::<KrakenFuturesFeed>() {
401                                            Ok(KrakenFuturesFeed::Trade) => {
402                                                Some(HandlerCommand::SubscribeTrade(symbol))
403                                            }
404                                            Ok(KrakenFuturesFeed::Book) => {
405                                                Some(HandlerCommand::SubscribeBook(symbol))
406                                            }
407                                            Ok(KrakenFuturesFeed::Ticker) => {
408                                                Some(HandlerCommand::SubscribeTicker(symbol))
409                                            }
410                                            Ok(KrakenFuturesFeed::OpenOrders) => {
411                                                Some(HandlerCommand::SubscribeOpenOrders)
412                                            }
413                                            Ok(KrakenFuturesFeed::Fills) => {
414                                                Some(HandlerCommand::SubscribeFills)
415                                            }
416                                            Ok(_) | Err(_) => None,
417                                        }
418                                    } else {
419                                        match topic.parse::<KrakenFuturesFeed>() {
420                                            Ok(KrakenFuturesFeed::OpenOrders) => {
421                                                Some(HandlerCommand::SubscribeOpenOrders)
422                                            }
423                                            Ok(KrakenFuturesFeed::Fills) => {
424                                                Some(HandlerCommand::SubscribeFills)
425                                            }
426                                            Ok(_) | Err(_) => None,
427                                        }
428                                    };
429
430                                if let Some(cmd) = cmd
431                                    && let Err(e) = cmd_tx_for_reconnect.send(cmd)
432                                {
433                                    tracing::error!(
434                                        error = %e, topic,
435                                        "Failed to send resubscribe command"
436                                    );
437                                }
438
439                                subscriptions.mark_subscribe(topic);
440                            }
441                        }
442
443                        if let Err(e) = out_tx.send(KrakenFuturesWsMessage::Reconnected) {
444                            tracing::debug!("Output channel closed: {e}");
445                            break;
446                        }
447                        continue;
448                    }
449                    Some(msg) => {
450                        if let Err(e) = out_tx.send(msg) {
451                            tracing::debug!("Output channel closed: {e}");
452                            break;
453                        }
454                    }
455                    None => {
456                        tracing::debug!("Handler stream ended");
457                        break;
458                    }
459                }
460            }
461
462            tracing::debug!("Futures handler task exiting");
463        });
464
465        self.task_handle = Some(Arc::new(stream_handle));
466
467        tracing::debug!("Futures WebSocket connected successfully");
468        Ok(())
469    }
470
471    /// Disconnects from the WebSocket server.
472    pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
473        tracing::debug!("Disconnecting Futures WebSocket");
474
475        self.signal.store(true, Ordering::Relaxed);
476
477        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
478            tracing::debug!(
479                "Failed to send disconnect command (handler may already be shut down): {e}"
480            );
481        }
482
483        if let Some(task_handle) = self.task_handle.take() {
484            match Arc::try_unwrap(task_handle) {
485                Ok(handle) => {
486                    match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
487                        Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
488                        Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
489                        Err(_) => {
490                            tracing::warn!("Timeout waiting for task handle");
491                        }
492                    }
493                }
494                Err(arc_handle) => {
495                    tracing::debug!("Cannot take ownership of task handle, aborting");
496                    arc_handle.abort();
497                }
498            }
499        }
500
501        self.subscriptions.clear();
502        self.auth_tracker.fail("Disconnected");
503        Ok(())
504    }
505
506    /// Closes the WebSocket connection.
507    pub async fn close(&mut self) -> Result<(), KrakenWsError> {
508        self.disconnect().await
509    }
510
511    /// Subscribes to mark price updates for the given instrument.
512    pub async fn subscribe_mark_price(
513        &self,
514        instrument_id: InstrumentId,
515    ) -> Result<(), KrakenWsError> {
516        let symbol = instrument_id.symbol;
517        let key = format!("mark:{symbol}");
518
519        if !self.subscriptions.add_reference(&key) {
520            return Ok(());
521        }
522
523        self.subscriptions.mark_subscribe(&key);
524        self.subscriptions.confirm_subscribe(&key);
525        self.ensure_ticker_subscribed(symbol).await
526    }
527
528    /// Unsubscribes from mark price updates for the given instrument.
529    pub async fn unsubscribe_mark_price(
530        &self,
531        instrument_id: InstrumentId,
532    ) -> Result<(), KrakenWsError> {
533        let symbol = instrument_id.symbol;
534        let key = format!("mark:{symbol}");
535
536        if !self.subscriptions.remove_reference(&key) {
537            return Ok(());
538        }
539
540        self.subscriptions.mark_unsubscribe(&key);
541        self.subscriptions.confirm_unsubscribe(&key);
542        self.maybe_unsubscribe_ticker(symbol).await
543    }
544
545    /// Subscribes to index price updates for the given instrument.
546    pub async fn subscribe_index_price(
547        &self,
548        instrument_id: InstrumentId,
549    ) -> Result<(), KrakenWsError> {
550        let symbol = instrument_id.symbol;
551        let key = format!("index:{symbol}");
552
553        if !self.subscriptions.add_reference(&key) {
554            return Ok(());
555        }
556
557        self.subscriptions.mark_subscribe(&key);
558        self.subscriptions.confirm_subscribe(&key);
559        self.ensure_ticker_subscribed(symbol).await
560    }
561
562    /// Unsubscribes from index price updates for the given instrument.
563    pub async fn unsubscribe_index_price(
564        &self,
565        instrument_id: InstrumentId,
566    ) -> Result<(), KrakenWsError> {
567        let symbol = instrument_id.symbol;
568        let key = format!("index:{symbol}");
569
570        if !self.subscriptions.remove_reference(&key) {
571            return Ok(());
572        }
573
574        self.subscriptions.mark_unsubscribe(&key);
575        self.subscriptions.confirm_unsubscribe(&key);
576        self.maybe_unsubscribe_ticker(symbol).await
577    }
578
579    /// Subscribes to quote updates for the given instrument.
580    ///
581    /// Uses the order book channel for low-latency top-of-book quotes.
582    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
583        let symbol = instrument_id.symbol;
584        let key = format!("quotes:{symbol}");
585
586        if !self.subscriptions.add_reference(&key) {
587            return Ok(());
588        }
589
590        self.subscriptions.mark_subscribe(&key);
591        self.subscriptions.confirm_subscribe(&key);
592
593        // Use book feed for low-latency quotes (not throttled ticker)
594        self.ensure_book_subscribed(symbol).await
595    }
596
597    /// Unsubscribes from quote updates for the given instrument.
598    pub async fn unsubscribe_quotes(
599        &self,
600        instrument_id: InstrumentId,
601    ) -> Result<(), KrakenWsError> {
602        let symbol = instrument_id.symbol;
603        let key = format!("quotes:{symbol}");
604
605        if !self.subscriptions.remove_reference(&key) {
606            return Ok(());
607        }
608
609        self.subscriptions.mark_unsubscribe(&key);
610        self.subscriptions.confirm_unsubscribe(&key);
611        self.maybe_unsubscribe_book(symbol).await
612    }
613
614    /// Subscribes to trade updates for the given instrument.
615    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
616        let symbol = instrument_id.symbol;
617        let key = format!("trades:{symbol}");
618
619        if !self.subscriptions.add_reference(&key) {
620            return Ok(());
621        }
622
623        self.subscriptions.mark_subscribe(&key);
624
625        self.cmd_tx
626            .read()
627            .await
628            .send(HandlerCommand::SubscribeTrade(symbol))
629            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
630
631        self.subscriptions.confirm_subscribe(&key);
632        Ok(())
633    }
634
635    /// Unsubscribes from trade updates for the given instrument.
636    pub async fn unsubscribe_trades(
637        &self,
638        instrument_id: InstrumentId,
639    ) -> Result<(), KrakenWsError> {
640        let symbol = instrument_id.symbol;
641        let key = format!("trades:{symbol}");
642
643        if !self.subscriptions.remove_reference(&key) {
644            return Ok(());
645        }
646
647        self.subscriptions.mark_unsubscribe(&key);
648
649        self.cmd_tx
650            .read()
651            .await
652            .send(HandlerCommand::UnsubscribeTrade(symbol))
653            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
654
655        self.subscriptions.confirm_unsubscribe(&key);
656        Ok(())
657    }
658
659    /// Subscribes to order book updates for the given instrument.
660    ///
661    /// Note: The `depth` parameter is accepted for API compatibility with spot client but is
662    /// not used by Kraken Futures (full book is always returned).
663    pub async fn subscribe_book(
664        &self,
665        instrument_id: InstrumentId,
666        _depth: Option<u32>,
667    ) -> Result<(), KrakenWsError> {
668        let symbol = instrument_id.symbol;
669        let key = format!("book:{symbol}");
670
671        if !self.subscriptions.add_reference(&key) {
672            return Ok(());
673        }
674
675        self.subscriptions.mark_subscribe(&key);
676
677        self.cmd_tx
678            .read()
679            .await
680            .send(HandlerCommand::SubscribeBook(symbol))
681            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
682
683        self.subscriptions.confirm_subscribe(&key);
684        Ok(())
685    }
686
687    /// Unsubscribes from order book updates for the given instrument.
688    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
689        let symbol = instrument_id.symbol;
690        let key = format!("book:{symbol}");
691
692        if !self.subscriptions.remove_reference(&key) {
693            return Ok(());
694        }
695
696        self.subscriptions.mark_unsubscribe(&key);
697
698        self.cmd_tx
699            .read()
700            .await
701            .send(HandlerCommand::UnsubscribeBook(symbol))
702            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
703
704        self.subscriptions.confirm_unsubscribe(&key);
705        Ok(())
706    }
707
708    /// Ensures ticker feed is subscribed for the given symbol.
709    async fn ensure_ticker_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
710        let ticker_key = format!("ticker:{symbol}");
711
712        if !self.subscriptions.add_reference(&ticker_key) {
713            return Ok(());
714        }
715
716        self.subscriptions.mark_subscribe(&ticker_key);
717        self.cmd_tx
718            .read()
719            .await
720            .send(HandlerCommand::SubscribeTicker(symbol))
721            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
722        self.subscriptions.confirm_subscribe(&ticker_key);
723        Ok(())
724    }
725
726    /// Unsubscribes from ticker if no more dependent subscriptions.
727    async fn maybe_unsubscribe_ticker(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
728        let ticker_key = format!("ticker:{symbol}");
729
730        if !self.subscriptions.remove_reference(&ticker_key) {
731            return Ok(());
732        }
733
734        self.subscriptions.mark_unsubscribe(&ticker_key);
735        self.cmd_tx
736            .read()
737            .await
738            .send(HandlerCommand::UnsubscribeTicker(symbol))
739            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
740        self.subscriptions.confirm_unsubscribe(&ticker_key);
741        Ok(())
742    }
743
744    /// Ensures book feed is subscribed for the given symbol (for quotes).
745    async fn ensure_book_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
746        let book_key = format!("book:{symbol}");
747
748        if !self.subscriptions.add_reference(&book_key) {
749            return Ok(());
750        }
751
752        self.subscriptions.mark_subscribe(&book_key);
753        self.cmd_tx
754            .read()
755            .await
756            .send(HandlerCommand::SubscribeBook(symbol))
757            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
758        self.subscriptions.confirm_subscribe(&book_key);
759        Ok(())
760    }
761
762    /// Unsubscribes from book if no more dependent subscriptions.
763    async fn maybe_unsubscribe_book(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
764        let book_key = format!("book:{symbol}");
765
766        if !self.subscriptions.remove_reference(&book_key) {
767            return Ok(());
768        }
769
770        self.subscriptions.mark_unsubscribe(&book_key);
771        self.cmd_tx
772            .read()
773            .await
774            .send(HandlerCommand::UnsubscribeBook(symbol))
775            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
776        self.subscriptions.confirm_unsubscribe(&book_key);
777        Ok(())
778    }
779
780    /// Gets the output receiver for processed messages.
781    pub fn take_output_rx(
782        &mut self,
783    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>> {
784        self.out_rx.take().and_then(|arc| Arc::try_unwrap(arc).ok())
785    }
786
787    /// Sets the account ID for execution reports.
788    ///
789    /// Must be called before subscribing to execution feeds to properly generate
790    /// OrderStatusReport and FillReport objects.
791    pub fn set_account_id(&self, account_id: AccountId) {
792        if let Ok(tx) = self.cmd_tx.try_read()
793            && let Err(e) = tx.send(HandlerCommand::SetAccountId(account_id))
794        {
795            tracing::debug!("Failed to send account_id to handler: {e}");
796        }
797    }
798
799    /// Caches a client order ID mapping for order tracking.
800    ///
801    /// This caches the trader_id, strategy_id, and instrument_id for an order,
802    /// allowing the handler to emit proper order events with correct identifiers
803    /// when WebSocket messages arrive.
804    pub fn cache_client_order(
805        &self,
806        client_order_id: ClientOrderId,
807        venue_order_id: Option<VenueOrderId>,
808        instrument_id: InstrumentId,
809        trader_id: TraderId,
810        strategy_id: StrategyId,
811    ) {
812        if let Ok(tx) = self.cmd_tx.try_read()
813            && let Err(e) = tx.send(HandlerCommand::CacheClientOrder {
814                client_order_id,
815                venue_order_id,
816                instrument_id,
817                trader_id,
818                strategy_id,
819            })
820        {
821            tracing::debug!("Failed to cache client order: {e}");
822        }
823    }
824
825    /// Requests a challenge from the WebSocket for authentication.
826    ///
827    /// After calling this, listen for the challenge response message and then
828    /// call `authenticate_with_challenge()` to complete authentication.
829    pub async fn request_challenge(&self) -> Result<(), KrakenWsError> {
830        let credential = self.credential.as_ref().ok_or_else(|| {
831            KrakenWsError::AuthenticationError(
832                "API credentials required for authentication".to_string(),
833            )
834        })?;
835
836        // TODO: Send via WebSocket client when we have direct access
837        // For now, the Python layer will handle the challenge request/response flow
838        tracing::debug!(
839            "Challenge request prepared for API key: {}",
840            credential.api_key_masked()
841        );
842
843        Ok(())
844    }
845
846    /// Set authentication credentials directly (for when challenge is obtained externally).
847    pub async fn set_auth_credentials(
848        &self,
849        original_challenge: String,
850        signed_challenge: String,
851    ) -> Result<(), KrakenWsError> {
852        let credential = self.credential.as_ref().ok_or_else(|| {
853            KrakenWsError::AuthenticationError("API credentials required".to_string())
854        })?;
855
856        *self.original_challenge.write().await = Some(original_challenge.clone());
857        *self.signed_challenge.write().await = Some(signed_challenge.clone());
858
859        self.cmd_tx
860            .read()
861            .await
862            .send(HandlerCommand::SetAuthCredentials {
863                api_key: credential.api_key().to_string(),
864                original_challenge,
865                signed_challenge,
866            })
867            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
868
869        Ok(())
870    }
871
872    /// Sign a challenge with the API credentials.
873    ///
874    /// Returns the signed challenge on success.
875    pub fn sign_challenge(&self, challenge: &str) -> Result<String, KrakenWsError> {
876        let credential = self.credential.as_ref().ok_or_else(|| {
877            KrakenWsError::AuthenticationError("API credentials required".to_string())
878        })?;
879
880        credential.sign_ws_challenge(challenge).map_err(|e| {
881            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
882        })
883    }
884
885    /// Complete authentication with a received challenge.
886    pub async fn authenticate_with_challenge(&self, challenge: &str) -> Result<(), KrakenWsError> {
887        let credential = self.credential.as_ref().ok_or_else(|| {
888            KrakenWsError::AuthenticationError("API credentials required".to_string())
889        })?;
890
891        let signed_challenge = credential.sign_ws_challenge(challenge).map_err(|e| {
892            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
893        })?;
894
895        self.set_auth_credentials(challenge.to_string(), signed_challenge)
896            .await
897    }
898
899    /// Subscribes to open orders feed (private, requires authentication).
900    pub async fn subscribe_open_orders(&self) -> Result<(), KrakenWsError> {
901        if self.original_challenge.read().await.is_none() {
902            return Err(KrakenWsError::AuthenticationError(
903                "Must authenticate before subscribing to private feeds".to_string(),
904            ));
905        }
906
907        let key = "open_orders";
908        if !self.subscriptions.add_reference(key) {
909            return Ok(());
910        }
911
912        self.subscriptions.mark_subscribe(key);
913
914        self.cmd_tx
915            .read()
916            .await
917            .send(HandlerCommand::SubscribeOpenOrders)
918            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
919
920        self.subscriptions.confirm_subscribe(key);
921        Ok(())
922    }
923
924    /// Subscribes to fills feed (private, requires authentication).
925    pub async fn subscribe_fills(&self) -> Result<(), KrakenWsError> {
926        if self.original_challenge.read().await.is_none() {
927            return Err(KrakenWsError::AuthenticationError(
928                "Must authenticate before subscribing to private feeds".to_string(),
929            ));
930        }
931
932        let key = "fills";
933        if !self.subscriptions.add_reference(key) {
934            return Ok(());
935        }
936
937        self.subscriptions.mark_subscribe(key);
938
939        self.cmd_tx
940            .read()
941            .await
942            .send(HandlerCommand::SubscribeFills)
943            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
944
945        self.subscriptions.confirm_subscribe(key);
946        Ok(())
947    }
948
949    /// Subscribes to both open orders and fills (convenience method).
950    pub async fn subscribe_executions(&self) -> Result<(), KrakenWsError> {
951        self.subscribe_open_orders().await?;
952        self.subscribe_fills().await?;
953        Ok(())
954    }
955}