nautilus_kraken/websocket/futures/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for the Kraken Futures v1 streaming API.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::get_runtime;
25use nautilus_model::{
26    identifiers::{
27        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
28    },
29    instruments::InstrumentAny,
30};
31use nautilus_network::{
32    mode::ConnectionMode,
33    websocket::{
34        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35    },
36};
37use tokio_util::sync::CancellationToken;
38
39use super::{
40    handler::{FuturesFeedHandler, HandlerCommand},
41    messages::{KrakenFuturesFeed, KrakenFuturesWsMessage},
42};
43use crate::{common::credential::KrakenCredential, websocket::error::KrakenWsError};
44
45/// Topic delimiter for Kraken Futures WebSocket subscriptions.
46///
47/// Topics use colon format: `feed:symbol` (e.g., `trades:PF_ETHUSD`).
48pub const KRAKEN_FUTURES_WS_TOPIC_DELIMITER: char = ':';
49
50const WS_PING_MSG: &str = r#"{"event":"ping"}"#;
51
52/// WebSocket client for the Kraken Futures v1 streaming API.
53#[derive(Debug)]
54#[cfg_attr(
55    feature = "python",
56    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
57)]
58pub struct KrakenFuturesWebSocketClient {
59    url: String,
60    heartbeat_secs: Option<u64>,
61    signal: Arc<AtomicBool>,
62    connection_mode: Arc<ArcSwap<AtomicU8>>,
63    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
64    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>>>,
65    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
66    subscriptions: SubscriptionState,
67    auth_tracker: AuthTracker,
68    cancellation_token: CancellationToken,
69    credential: Option<KrakenCredential>,
70    original_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
71    signed_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
72}
73
74impl Clone for KrakenFuturesWebSocketClient {
75    fn clone(&self) -> Self {
76        Self {
77            url: self.url.clone(),
78            heartbeat_secs: self.heartbeat_secs,
79            signal: Arc::clone(&self.signal),
80            connection_mode: Arc::clone(&self.connection_mode),
81            cmd_tx: Arc::clone(&self.cmd_tx),
82            out_rx: self.out_rx.clone(),
83            task_handle: self.task_handle.clone(),
84            subscriptions: self.subscriptions.clone(),
85            auth_tracker: self.auth_tracker.clone(),
86            cancellation_token: self.cancellation_token.clone(),
87            credential: self.credential.clone(),
88            original_challenge: Arc::clone(&self.original_challenge),
89            signed_challenge: Arc::clone(&self.signed_challenge),
90        }
91    }
92}
93
94impl KrakenFuturesWebSocketClient {
95    /// Creates a new client with the given URL.
96    #[must_use]
97    pub fn new(url: String, heartbeat_secs: Option<u64>) -> Self {
98        Self::with_credentials(url, heartbeat_secs, None)
99    }
100
101    /// Creates a new client with API credentials for authenticated feeds.
102    #[must_use]
103    pub fn with_credentials(
104        url: String,
105        heartbeat_secs: Option<u64>,
106        credential: Option<KrakenCredential>,
107    ) -> Self {
108        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
109        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
110        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
111
112        Self {
113            url,
114            heartbeat_secs,
115            signal: Arc::new(AtomicBool::new(false)),
116            connection_mode,
117            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
118            out_rx: None,
119            task_handle: None,
120            subscriptions: SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER),
121            auth_tracker: AuthTracker::new(),
122            cancellation_token: CancellationToken::new(),
123            credential,
124            original_challenge: Arc::new(tokio::sync::RwLock::new(None)),
125            signed_challenge: Arc::new(tokio::sync::RwLock::new(None)),
126        }
127    }
128
129    /// Returns true if the client has API credentials set.
130    #[must_use]
131    pub fn has_credentials(&self) -> bool {
132        self.credential.is_some()
133    }
134
135    /// Returns the WebSocket URL.
136    #[must_use]
137    pub fn url(&self) -> &str {
138        &self.url
139    }
140
141    /// Returns true if the connection is closed.
142    #[must_use]
143    pub fn is_closed(&self) -> bool {
144        ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
145            == ConnectionMode::Closed
146    }
147
148    /// Returns true if the connection is active.
149    #[must_use]
150    pub fn is_active(&self) -> bool {
151        ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
152            == ConnectionMode::Active
153    }
154
155    /// Waits until the WebSocket connection is active or timeout.
156    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
157        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
158
159        tokio::time::timeout(timeout, async {
160            while !self.is_active() {
161                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
162            }
163        })
164        .await
165        .map_err(|_| {
166            KrakenWsError::ConnectionError(format!(
167                "WebSocket connection timeout after {timeout_secs} seconds"
168            ))
169        })?;
170
171        Ok(())
172    }
173
174    /// Authenticates the WebSocket connection for private feeds.
175    ///
176    /// This sends a challenge request, waits for the response, signs it,
177    /// and stores the credentials for use in private subscriptions.
178    pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
179        let credential = self.credential.as_ref().ok_or_else(|| {
180            KrakenWsError::AuthenticationError("API credentials required".to_string())
181        })?;
182
183        let api_key = credential.api_key().to_string();
184        let (tx, rx) = tokio::sync::oneshot::channel();
185
186        self.cmd_tx
187            .read()
188            .await
189            .send(HandlerCommand::RequestChallenge {
190                api_key: api_key.clone(),
191                response_tx: tx,
192            })
193            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
194
195        let challenge = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx)
196            .await
197            .map_err(|_| {
198                KrakenWsError::AuthenticationError("Timeout waiting for challenge".to_string())
199            })?
200            .map_err(|_| {
201                KrakenWsError::AuthenticationError("Challenge channel closed".to_string())
202            })?;
203
204        let signed_challenge = credential.sign_ws_challenge(&challenge).map_err(|e| {
205            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
206        })?;
207
208        *self.original_challenge.write().await = Some(challenge.clone());
209        *self.signed_challenge.write().await = Some(signed_challenge.clone());
210
211        self.cmd_tx
212            .read()
213            .await
214            .send(HandlerCommand::SetAuthCredentials {
215                api_key,
216                original_challenge: challenge,
217                signed_challenge,
218            })
219            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
220
221        log::debug!("Futures WebSocket authentication successful");
222        Ok(())
223    }
224
225    /// Caches instruments for price precision lookup (bulk replace).
226    ///
227    /// Must be called after `connect()` when the handler is ready to receive commands.
228    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
229        if let Ok(tx) = self.cmd_tx.try_read()
230            && let Err(e) = tx.send(HandlerCommand::InitializeInstruments(instruments))
231        {
232            log::debug!("Failed to send instruments to handler: {e}");
233        }
234    }
235
236    /// Caches a single instrument for price precision lookup (upsert).
237    ///
238    /// Must be called after `connect()` when the handler is ready to receive commands.
239    pub fn cache_instrument(&self, instrument: InstrumentAny) {
240        if let Ok(tx) = self.cmd_tx.try_read()
241            && let Err(e) = tx.send(HandlerCommand::UpdateInstrument(instrument))
242        {
243            log::debug!("Failed to send instrument update to handler: {e}");
244        }
245    }
246
247    /// Connects to the WebSocket server.
248    pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
249        log::debug!("Connecting to Futures WebSocket: {}", self.url);
250
251        self.signal.store(false, Ordering::Relaxed);
252
253        let (raw_handler, raw_rx) = channel_message_handler();
254
255        let ws_config = WebSocketConfig {
256            url: self.url.clone(),
257            headers: vec![],
258            heartbeat: self.heartbeat_secs,
259            heartbeat_msg: Some(WS_PING_MSG.to_string()),
260            reconnect_timeout_ms: Some(5_000),
261            reconnect_delay_initial_ms: Some(500),
262            reconnect_delay_max_ms: Some(5_000),
263            reconnect_backoff_factor: Some(1.5),
264            reconnect_jitter_ms: Some(250),
265            reconnect_max_attempts: None,
266        };
267
268        let ws_client =
269            WebSocketClient::connect(ws_config, Some(raw_handler), None, None, vec![], None)
270                .await
271                .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
272
273        self.connection_mode
274            .store(ws_client.connection_mode_atomic());
275
276        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<KrakenFuturesWsMessage>();
277        self.out_rx = Some(Arc::new(out_rx));
278
279        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
280        *self.cmd_tx.write().await = cmd_tx.clone();
281
282        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(ws_client)) {
283            return Err(KrakenWsError::ConnectionError(format!(
284                "Failed to send WebSocketClient to handler: {e}"
285            )));
286        }
287
288        let signal = self.signal.clone();
289        let subscriptions = self.subscriptions.clone();
290        let cmd_tx_for_reconnect = cmd_tx.clone();
291        let credential_for_reconnect = self.credential.clone();
292
293        let stream_handle = get_runtime().spawn(async move {
294            let mut handler =
295                FuturesFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
296
297            loop {
298                match handler.next().await {
299                    Some(KrakenFuturesWsMessage::Reconnected) => {
300                        if signal.load(Ordering::Relaxed) {
301                            continue;
302                        }
303                        log::info!("WebSocket reconnected, resubscribing");
304
305                        // Mark all confirmed as failed to transition to pending for replay
306                        let confirmed_topics = subscriptions.all_topics();
307                        for topic in &confirmed_topics {
308                            subscriptions.mark_failure(topic);
309                        }
310
311                        let topics = subscriptions.all_topics();
312                        if topics.is_empty() {
313                            log::debug!("No subscriptions to restore after reconnection");
314                        } else {
315                            // Check if we have private subscriptions that need re-authentication
316                            let has_private_subs = topics.iter().any(|t| {
317                                t == "open_orders"
318                                    || t == "fills"
319                                    || t.starts_with("open_orders:")
320                                    || t.starts_with("fills:")
321                            });
322
323                            if has_private_subs {
324                                if let Some(ref cred) = credential_for_reconnect {
325                                    // Request fresh challenge for the new connection
326                                    let (tx, rx) = tokio::sync::oneshot::channel();
327                                    if let Err(e) = cmd_tx_for_reconnect.send(
328                                        HandlerCommand::RequestChallenge {
329                                            api_key: cred.api_key().to_string(),
330                                            response_tx: tx,
331                                        },
332                                    ) {
333                                        log::error!(
334                                            "Failed to request challenge for reconnect: {e}"
335                                        );
336                                    } else {
337                                        match tokio::time::timeout(
338                                            tokio::time::Duration::from_secs(10),
339                                            rx,
340                                        )
341                                        .await
342                                        {
343                                            Ok(Ok(challenge)) => {
344                                                match cred.sign_ws_challenge(&challenge) {
345                                                    Ok(signed) => {
346                                                        if let Err(e) = cmd_tx_for_reconnect.send(
347                                                            HandlerCommand::SetAuthCredentials {
348                                                                api_key: cred.api_key().to_string(),
349                                                                original_challenge: challenge,
350                                                                signed_challenge: signed,
351                                                            },
352                                                        ) {
353                                                            log::error!(
354                                                                "Failed to set auth credentials: {e}"
355                                                            );
356                                                        } else {
357                                                            log::debug!(
358                                                                "Re-authenticated after reconnect"
359                                                            );
360                                                        }
361                                                    }
362                                                    Err(e) => {
363                                                        log::error!(
364                                                            "Failed to sign challenge for reconnect: {e}"
365                                                        );
366                                                    }
367                                                }
368                                            }
369                                            Ok(Err(_)) => {
370                                                log::error!(
371                                                    "Challenge channel closed during reconnect"
372                                                );
373                                            }
374                                            Err(_) => {
375                                                log::error!(
376                                                    "Timeout waiting for challenge during reconnect"
377                                                );
378                                            }
379                                        }
380                                    }
381                                } else {
382                                    log::warn!(
383                                        "Private subscriptions exist but no credentials available"
384                                    );
385                                }
386                            }
387
388                            log::info!(
389                                "Resubscribing after reconnection: count={}",
390                                topics.len()
391                            );
392
393                            for topic in &topics {
394                                let cmd =
395                                    if let Some((feed_str, symbol_str)) = topic.split_once(':') {
396                                        let symbol = Symbol::from(symbol_str);
397                                        match feed_str.parse::<KrakenFuturesFeed>() {
398                                            Ok(KrakenFuturesFeed::Trade) => {
399                                                Some(HandlerCommand::SubscribeTrade(symbol))
400                                            }
401                                            Ok(KrakenFuturesFeed::Book) => {
402                                                Some(HandlerCommand::SubscribeBook(symbol))
403                                            }
404                                            Ok(KrakenFuturesFeed::Ticker) => {
405                                                Some(HandlerCommand::SubscribeTicker(symbol))
406                                            }
407                                            Ok(KrakenFuturesFeed::OpenOrders) => {
408                                                Some(HandlerCommand::SubscribeOpenOrders)
409                                            }
410                                            Ok(KrakenFuturesFeed::Fills) => {
411                                                Some(HandlerCommand::SubscribeFills)
412                                            }
413                                            Ok(_) | Err(_) => None,
414                                        }
415                                    } else {
416                                        match topic.parse::<KrakenFuturesFeed>() {
417                                            Ok(KrakenFuturesFeed::OpenOrders) => {
418                                                Some(HandlerCommand::SubscribeOpenOrders)
419                                            }
420                                            Ok(KrakenFuturesFeed::Fills) => {
421                                                Some(HandlerCommand::SubscribeFills)
422                                            }
423                                            Ok(_) | Err(_) => None,
424                                        }
425                                    };
426
427                                if let Some(cmd) = cmd
428                                    && let Err(e) = cmd_tx_for_reconnect.send(cmd)
429                                {
430                                    log::error!(
431                                        "Failed to send resubscribe command: error={e}, \
432                                        topic={topic}"
433                                    );
434                                }
435
436                                subscriptions.mark_subscribe(topic);
437                            }
438                        }
439
440                        if let Err(e) = out_tx.send(KrakenFuturesWsMessage::Reconnected) {
441                            log::debug!("Output channel closed: {e}");
442                            break;
443                        }
444                        continue;
445                    }
446                    Some(msg) => {
447                        if let Err(e) = out_tx.send(msg) {
448                            log::debug!("Output channel closed: {e}");
449                            break;
450                        }
451                    }
452                    None => {
453                        log::debug!("Handler stream ended");
454                        break;
455                    }
456                }
457            }
458
459            log::debug!("Futures handler task exiting");
460        });
461
462        self.task_handle = Some(Arc::new(stream_handle));
463
464        log::debug!("Futures WebSocket connected successfully");
465        Ok(())
466    }
467
468    /// Disconnects from the WebSocket server.
469    pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
470        log::debug!("Disconnecting Futures WebSocket");
471
472        self.signal.store(true, Ordering::Relaxed);
473
474        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
475            log::debug!(
476                "Failed to send disconnect command (handler may already be shut down): {e}"
477            );
478        }
479
480        if let Some(task_handle) = self.task_handle.take() {
481            match Arc::try_unwrap(task_handle) {
482                Ok(handle) => {
483                    match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
484                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
485                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
486                        Err(_) => {
487                            log::warn!("Timeout waiting for task handle");
488                        }
489                    }
490                }
491                Err(arc_handle) => {
492                    log::debug!("Cannot take ownership of task handle, aborting");
493                    arc_handle.abort();
494                }
495            }
496        }
497
498        self.subscriptions.clear();
499        self.auth_tracker.fail("Disconnected");
500        Ok(())
501    }
502
503    /// Closes the WebSocket connection.
504    pub async fn close(&mut self) -> Result<(), KrakenWsError> {
505        self.disconnect().await
506    }
507
508    /// Subscribes to mark price updates for the given instrument.
509    pub async fn subscribe_mark_price(
510        &self,
511        instrument_id: InstrumentId,
512    ) -> Result<(), KrakenWsError> {
513        let symbol = instrument_id.symbol;
514        let key = format!("mark:{symbol}");
515
516        if !self.subscriptions.add_reference(&key) {
517            return Ok(());
518        }
519
520        self.subscriptions.mark_subscribe(&key);
521        self.subscriptions.confirm_subscribe(&key);
522        self.ensure_ticker_subscribed(symbol).await
523    }
524
525    /// Unsubscribes from mark price updates for the given instrument.
526    pub async fn unsubscribe_mark_price(
527        &self,
528        instrument_id: InstrumentId,
529    ) -> Result<(), KrakenWsError> {
530        let symbol = instrument_id.symbol;
531        let key = format!("mark:{symbol}");
532
533        if !self.subscriptions.remove_reference(&key) {
534            return Ok(());
535        }
536
537        self.subscriptions.mark_unsubscribe(&key);
538        self.subscriptions.confirm_unsubscribe(&key);
539        self.maybe_unsubscribe_ticker(symbol).await
540    }
541
542    /// Subscribes to index price updates for the given instrument.
543    pub async fn subscribe_index_price(
544        &self,
545        instrument_id: InstrumentId,
546    ) -> Result<(), KrakenWsError> {
547        let symbol = instrument_id.symbol;
548        let key = format!("index:{symbol}");
549
550        if !self.subscriptions.add_reference(&key) {
551            return Ok(());
552        }
553
554        self.subscriptions.mark_subscribe(&key);
555        self.subscriptions.confirm_subscribe(&key);
556        self.ensure_ticker_subscribed(symbol).await
557    }
558
559    /// Unsubscribes from index price updates for the given instrument.
560    pub async fn unsubscribe_index_price(
561        &self,
562        instrument_id: InstrumentId,
563    ) -> Result<(), KrakenWsError> {
564        let symbol = instrument_id.symbol;
565        let key = format!("index:{symbol}");
566
567        if !self.subscriptions.remove_reference(&key) {
568            return Ok(());
569        }
570
571        self.subscriptions.mark_unsubscribe(&key);
572        self.subscriptions.confirm_unsubscribe(&key);
573        self.maybe_unsubscribe_ticker(symbol).await
574    }
575
576    /// Subscribes to quote updates for the given instrument.
577    ///
578    /// Uses the order book channel for low-latency top-of-book quotes.
579    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
580        let symbol = instrument_id.symbol;
581        let key = format!("quotes:{symbol}");
582
583        if !self.subscriptions.add_reference(&key) {
584            return Ok(());
585        }
586
587        self.subscriptions.mark_subscribe(&key);
588        self.subscriptions.confirm_subscribe(&key);
589
590        // Use book feed for low-latency quotes (not throttled ticker)
591        self.ensure_book_subscribed(symbol).await
592    }
593
594    /// Unsubscribes from quote updates for the given instrument.
595    pub async fn unsubscribe_quotes(
596        &self,
597        instrument_id: InstrumentId,
598    ) -> Result<(), KrakenWsError> {
599        let symbol = instrument_id.symbol;
600        let key = format!("quotes:{symbol}");
601
602        if !self.subscriptions.remove_reference(&key) {
603            return Ok(());
604        }
605
606        self.subscriptions.mark_unsubscribe(&key);
607        self.subscriptions.confirm_unsubscribe(&key);
608        self.maybe_unsubscribe_book(symbol).await
609    }
610
611    /// Subscribes to trade updates for the given instrument.
612    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
613        let symbol = instrument_id.symbol;
614        let key = format!("trades:{symbol}");
615
616        if !self.subscriptions.add_reference(&key) {
617            return Ok(());
618        }
619
620        self.subscriptions.mark_subscribe(&key);
621
622        self.cmd_tx
623            .read()
624            .await
625            .send(HandlerCommand::SubscribeTrade(symbol))
626            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
627
628        self.subscriptions.confirm_subscribe(&key);
629        Ok(())
630    }
631
632    /// Unsubscribes from trade updates for the given instrument.
633    pub async fn unsubscribe_trades(
634        &self,
635        instrument_id: InstrumentId,
636    ) -> Result<(), KrakenWsError> {
637        let symbol = instrument_id.symbol;
638        let key = format!("trades:{symbol}");
639
640        if !self.subscriptions.remove_reference(&key) {
641            return Ok(());
642        }
643
644        self.subscriptions.mark_unsubscribe(&key);
645
646        self.cmd_tx
647            .read()
648            .await
649            .send(HandlerCommand::UnsubscribeTrade(symbol))
650            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
651
652        self.subscriptions.confirm_unsubscribe(&key);
653        Ok(())
654    }
655
656    /// Subscribes to order book updates for the given instrument.
657    ///
658    /// Note: The `depth` parameter is accepted for API compatibility with spot client but is
659    /// not used by Kraken Futures (full book is always returned).
660    pub async fn subscribe_book(
661        &self,
662        instrument_id: InstrumentId,
663        _depth: Option<u32>,
664    ) -> Result<(), KrakenWsError> {
665        let symbol = instrument_id.symbol;
666        let key = format!("book:{symbol}");
667
668        if !self.subscriptions.add_reference(&key) {
669            return Ok(());
670        }
671
672        self.subscriptions.mark_subscribe(&key);
673
674        self.cmd_tx
675            .read()
676            .await
677            .send(HandlerCommand::SubscribeBook(symbol))
678            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
679
680        self.subscriptions.confirm_subscribe(&key);
681        Ok(())
682    }
683
684    /// Unsubscribes from order book updates for the given instrument.
685    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
686        let symbol = instrument_id.symbol;
687        let key = format!("book:{symbol}");
688
689        if !self.subscriptions.remove_reference(&key) {
690            return Ok(());
691        }
692
693        self.subscriptions.mark_unsubscribe(&key);
694
695        self.cmd_tx
696            .read()
697            .await
698            .send(HandlerCommand::UnsubscribeBook(symbol))
699            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
700
701        self.subscriptions.confirm_unsubscribe(&key);
702        Ok(())
703    }
704
705    /// Ensures ticker feed is subscribed for the given symbol.
706    async fn ensure_ticker_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
707        let ticker_key = format!("ticker:{symbol}");
708
709        if !self.subscriptions.add_reference(&ticker_key) {
710            return Ok(());
711        }
712
713        self.subscriptions.mark_subscribe(&ticker_key);
714        self.cmd_tx
715            .read()
716            .await
717            .send(HandlerCommand::SubscribeTicker(symbol))
718            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
719        self.subscriptions.confirm_subscribe(&ticker_key);
720        Ok(())
721    }
722
723    /// Unsubscribes from ticker if no more dependent subscriptions.
724    async fn maybe_unsubscribe_ticker(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
725        let ticker_key = format!("ticker:{symbol}");
726
727        if !self.subscriptions.remove_reference(&ticker_key) {
728            return Ok(());
729        }
730
731        self.subscriptions.mark_unsubscribe(&ticker_key);
732        self.cmd_tx
733            .read()
734            .await
735            .send(HandlerCommand::UnsubscribeTicker(symbol))
736            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
737        self.subscriptions.confirm_unsubscribe(&ticker_key);
738        Ok(())
739    }
740
741    /// Ensures book feed is subscribed for the given symbol (for quotes).
742    async fn ensure_book_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
743        let book_key = format!("book:{symbol}");
744
745        if !self.subscriptions.add_reference(&book_key) {
746            return Ok(());
747        }
748
749        self.subscriptions.mark_subscribe(&book_key);
750        self.cmd_tx
751            .read()
752            .await
753            .send(HandlerCommand::SubscribeBook(symbol))
754            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
755        self.subscriptions.confirm_subscribe(&book_key);
756        Ok(())
757    }
758
759    /// Unsubscribes from book if no more dependent subscriptions.
760    async fn maybe_unsubscribe_book(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
761        let book_key = format!("book:{symbol}");
762
763        if !self.subscriptions.remove_reference(&book_key) {
764            return Ok(());
765        }
766
767        self.subscriptions.mark_unsubscribe(&book_key);
768        self.cmd_tx
769            .read()
770            .await
771            .send(HandlerCommand::UnsubscribeBook(symbol))
772            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
773        self.subscriptions.confirm_unsubscribe(&book_key);
774        Ok(())
775    }
776
777    /// Gets the output receiver for processed messages.
778    pub fn take_output_rx(
779        &mut self,
780    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>> {
781        self.out_rx.take().and_then(|arc| Arc::try_unwrap(arc).ok())
782    }
783
784    /// Sets the account ID for execution reports.
785    ///
786    /// Must be called before subscribing to execution feeds to properly generate
787    /// OrderStatusReport and FillReport objects.
788    pub fn set_account_id(&self, account_id: AccountId) {
789        if let Ok(tx) = self.cmd_tx.try_read()
790            && let Err(e) = tx.send(HandlerCommand::SetAccountId(account_id))
791        {
792            log::debug!("Failed to send account_id to handler: {e}");
793        }
794    }
795
796    /// Caches a client order ID mapping for order tracking.
797    ///
798    /// This caches the trader_id, strategy_id, and instrument_id for an order,
799    /// allowing the handler to emit proper order events with correct identifiers
800    /// when WebSocket messages arrive.
801    pub fn cache_client_order(
802        &self,
803        client_order_id: ClientOrderId,
804        venue_order_id: Option<VenueOrderId>,
805        instrument_id: InstrumentId,
806        trader_id: TraderId,
807        strategy_id: StrategyId,
808    ) {
809        if let Ok(tx) = self.cmd_tx.try_read()
810            && let Err(e) = tx.send(HandlerCommand::CacheClientOrder {
811                client_order_id,
812                venue_order_id,
813                instrument_id,
814                trader_id,
815                strategy_id,
816            })
817        {
818            log::debug!("Failed to cache client order: {e}");
819        }
820    }
821
822    /// Requests a challenge from the WebSocket for authentication.
823    ///
824    /// After calling this, listen for the challenge response message and then
825    /// call `authenticate_with_challenge()` to complete authentication.
826    pub async fn request_challenge(&self) -> Result<(), KrakenWsError> {
827        let credential = self.credential.as_ref().ok_or_else(|| {
828            KrakenWsError::AuthenticationError(
829                "API credentials required for authentication".to_string(),
830            )
831        })?;
832
833        // TODO: Send via WebSocket client when we have direct access
834        // For now, the Python layer will handle the challenge request/response flow
835        log::debug!(
836            "Challenge request prepared for API key: {}",
837            credential.api_key_masked()
838        );
839
840        Ok(())
841    }
842
843    /// Set authentication credentials directly (for when challenge is obtained externally).
844    pub async fn set_auth_credentials(
845        &self,
846        original_challenge: String,
847        signed_challenge: String,
848    ) -> Result<(), KrakenWsError> {
849        let credential = self.credential.as_ref().ok_or_else(|| {
850            KrakenWsError::AuthenticationError("API credentials required".to_string())
851        })?;
852
853        *self.original_challenge.write().await = Some(original_challenge.clone());
854        *self.signed_challenge.write().await = Some(signed_challenge.clone());
855
856        self.cmd_tx
857            .read()
858            .await
859            .send(HandlerCommand::SetAuthCredentials {
860                api_key: credential.api_key().to_string(),
861                original_challenge,
862                signed_challenge,
863            })
864            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
865
866        Ok(())
867    }
868
869    /// Sign a challenge with the API credentials.
870    ///
871    /// Returns the signed challenge on success.
872    pub fn sign_challenge(&self, challenge: &str) -> Result<String, KrakenWsError> {
873        let credential = self.credential.as_ref().ok_or_else(|| {
874            KrakenWsError::AuthenticationError("API credentials required".to_string())
875        })?;
876
877        credential.sign_ws_challenge(challenge).map_err(|e| {
878            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
879        })
880    }
881
882    /// Complete authentication with a received challenge.
883    pub async fn authenticate_with_challenge(&self, challenge: &str) -> Result<(), KrakenWsError> {
884        let credential = self.credential.as_ref().ok_or_else(|| {
885            KrakenWsError::AuthenticationError("API credentials required".to_string())
886        })?;
887
888        let signed_challenge = credential.sign_ws_challenge(challenge).map_err(|e| {
889            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
890        })?;
891
892        self.set_auth_credentials(challenge.to_string(), signed_challenge)
893            .await
894    }
895
896    /// Subscribes to open orders feed (private, requires authentication).
897    pub async fn subscribe_open_orders(&self) -> Result<(), KrakenWsError> {
898        if self.original_challenge.read().await.is_none() {
899            return Err(KrakenWsError::AuthenticationError(
900                "Must authenticate before subscribing to private feeds".to_string(),
901            ));
902        }
903
904        let key = "open_orders";
905        if !self.subscriptions.add_reference(key) {
906            return Ok(());
907        }
908
909        self.subscriptions.mark_subscribe(key);
910
911        self.cmd_tx
912            .read()
913            .await
914            .send(HandlerCommand::SubscribeOpenOrders)
915            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
916
917        self.subscriptions.confirm_subscribe(key);
918        Ok(())
919    }
920
921    /// Subscribes to fills feed (private, requires authentication).
922    pub async fn subscribe_fills(&self) -> Result<(), KrakenWsError> {
923        if self.original_challenge.read().await.is_none() {
924            return Err(KrakenWsError::AuthenticationError(
925                "Must authenticate before subscribing to private feeds".to_string(),
926            ));
927        }
928
929        let key = "fills";
930        if !self.subscriptions.add_reference(key) {
931            return Ok(());
932        }
933
934        self.subscriptions.mark_subscribe(key);
935
936        self.cmd_tx
937            .read()
938            .await
939            .send(HandlerCommand::SubscribeFills)
940            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
941
942        self.subscriptions.confirm_subscribe(key);
943        Ok(())
944    }
945
946    /// Subscribes to both open orders and fills (convenience method).
947    pub async fn subscribe_executions(&self) -> Result<(), KrakenWsError> {
948        self.subscribe_open_orders().await?;
949        self.subscribe_fills().await?;
950        Ok(())
951    }
952}