nautilus_kraken/websocket/futures/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket client for the Kraken Futures v1 streaming API.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::runtime::get_runtime;
25use nautilus_model::{
26    identifiers::{
27        AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
28    },
29    instruments::InstrumentAny,
30};
31use nautilus_network::{
32    mode::ConnectionMode,
33    websocket::{
34        AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35    },
36};
37use tokio_util::sync::CancellationToken;
38
39use super::{
40    handler::{FuturesFeedHandler, HandlerCommand},
41    messages::{KrakenFuturesFeed, KrakenFuturesWsMessage},
42};
43use crate::{common::credential::KrakenCredential, websocket::error::KrakenWsError};
44
/// Topic delimiter for Kraken Futures WebSocket subscriptions.
///
/// Topics use colon format: `feed:symbol` (e.g., `trades:PF_ETHUSD`).
pub const KRAKEN_FUTURES_WS_TOPIC_DELIMITER: char = ':';

/// JSON payload handed to the underlying WebSocket client as its heartbeat message.
const WS_PING_MSG: &str = r#"{"event":"ping"}"#;
51
/// WebSocket client for the Kraken Futures v1 streaming API.
///
/// Shared state is held behind `Arc`s, so clones of a client observe the same
/// connection mode, command channel and challenge values.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct KrakenFuturesWebSocketClient {
    /// WebSocket endpoint URL used by `connect`.
    url: String,
    /// Heartbeat interval in seconds forwarded to the WebSocket configuration, if any.
    heartbeat_secs: Option<u64>,
    /// Flag set to `true` by `disconnect` and reset to `false` by `connect`.
    signal: Arc<AtomicBool>,
    /// Connection mode of the underlying socket; swapped in on `connect`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the feed handler; replaced on every `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver of processed messages; populated by `connect`, removed by `take_output_rx`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>>>,
    /// Handle of the task spawned by `connect` that drives the feed handler.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Subscription bookkeeping keyed by `feed:symbol` topics.
    subscriptions: SubscriptionState,
    /// Authentication tracker; marked as failed on `disconnect`.
    auth_tracker: AuthTracker,
    // NOTE(review): not referenced by the code visible in this section - confirm usage elsewhere.
    cancellation_token: CancellationToken,
    /// API credential for private feeds; `None` for public-only clients.
    credential: Option<KrakenCredential>,
    /// Challenge string received during the last successful `authenticate` call.
    original_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
    /// Signed form of `original_challenge` from the last successful `authenticate` call.
    signed_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
}
73
74impl Clone for KrakenFuturesWebSocketClient {
75    fn clone(&self) -> Self {
76        Self {
77            url: self.url.clone(),
78            heartbeat_secs: self.heartbeat_secs,
79            signal: Arc::clone(&self.signal),
80            connection_mode: Arc::clone(&self.connection_mode),
81            cmd_tx: Arc::clone(&self.cmd_tx),
82            out_rx: self.out_rx.clone(),
83            task_handle: self.task_handle.clone(),
84            subscriptions: self.subscriptions.clone(),
85            auth_tracker: self.auth_tracker.clone(),
86            cancellation_token: self.cancellation_token.clone(),
87            credential: self.credential.clone(),
88            original_challenge: Arc::clone(&self.original_challenge),
89            signed_challenge: Arc::clone(&self.signed_challenge),
90        }
91    }
92}
93
94impl KrakenFuturesWebSocketClient {
95    /// Creates a new client with the given URL.
96    #[must_use]
97    pub fn new(url: String, heartbeat_secs: Option<u64>) -> Self {
98        Self::with_credentials(url, heartbeat_secs, None)
99    }
100
101    /// Creates a new client with API credentials for authenticated feeds.
102    #[must_use]
103    pub fn with_credentials(
104        url: String,
105        heartbeat_secs: Option<u64>,
106        credential: Option<KrakenCredential>,
107    ) -> Self {
108        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
109        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
110        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
111
112        Self {
113            url,
114            heartbeat_secs,
115            signal: Arc::new(AtomicBool::new(false)),
116            connection_mode,
117            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
118            out_rx: None,
119            task_handle: None,
120            subscriptions: SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER),
121            auth_tracker: AuthTracker::new(),
122            cancellation_token: CancellationToken::new(),
123            credential,
124            original_challenge: Arc::new(tokio::sync::RwLock::new(None)),
125            signed_challenge: Arc::new(tokio::sync::RwLock::new(None)),
126        }
127    }
128
129    /// Returns true if the client has API credentials set.
130    #[must_use]
131    pub fn has_credentials(&self) -> bool {
132        self.credential.is_some()
133    }
134
135    /// Returns the WebSocket URL.
136    #[must_use]
137    pub fn url(&self) -> &str {
138        &self.url
139    }
140
141    /// Returns true if the connection is closed.
142    #[must_use]
143    pub fn is_closed(&self) -> bool {
144        ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
145            == ConnectionMode::Closed
146    }
147
148    /// Returns true if the connection is active.
149    #[must_use]
150    pub fn is_active(&self) -> bool {
151        ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
152            == ConnectionMode::Active
153    }
154
155    /// Waits until the WebSocket connection is active or timeout.
156    pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
157        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
158
159        tokio::time::timeout(timeout, async {
160            while !self.is_active() {
161                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
162            }
163        })
164        .await
165        .map_err(|_| {
166            KrakenWsError::ConnectionError(format!(
167                "WebSocket connection timeout after {timeout_secs} seconds"
168            ))
169        })?;
170
171        Ok(())
172    }
173
174    /// Authenticates the WebSocket connection for private feeds.
175    ///
176    /// This sends a challenge request, waits for the response, signs it,
177    /// and stores the credentials for use in private subscriptions.
178    pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
179        let credential = self.credential.as_ref().ok_or_else(|| {
180            KrakenWsError::AuthenticationError("API credentials required".to_string())
181        })?;
182
183        let api_key = credential.api_key().to_string();
184        let (tx, rx) = tokio::sync::oneshot::channel();
185
186        self.cmd_tx
187            .read()
188            .await
189            .send(HandlerCommand::RequestChallenge {
190                api_key: api_key.clone(),
191                response_tx: tx,
192            })
193            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
194
195        let challenge = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx)
196            .await
197            .map_err(|_| {
198                KrakenWsError::AuthenticationError("Timeout waiting for challenge".to_string())
199            })?
200            .map_err(|_| {
201                KrakenWsError::AuthenticationError("Challenge channel closed".to_string())
202            })?;
203
204        let signed_challenge = credential.sign_ws_challenge(&challenge).map_err(|e| {
205            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
206        })?;
207
208        *self.original_challenge.write().await = Some(challenge.clone());
209        *self.signed_challenge.write().await = Some(signed_challenge.clone());
210
211        self.cmd_tx
212            .read()
213            .await
214            .send(HandlerCommand::SetAuthCredentials {
215                api_key,
216                original_challenge: challenge,
217                signed_challenge,
218            })
219            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
220
221        tracing::debug!("Futures WebSocket authentication successful");
222        Ok(())
223    }
224
225    /// Caches instruments for price precision lookup (bulk replace).
226    ///
227    /// Must be called after `connect()` when the handler is ready to receive commands.
228    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
229        if let Ok(tx) = self.cmd_tx.try_read()
230            && let Err(e) = tx.send(HandlerCommand::InitializeInstruments(instruments))
231        {
232            tracing::debug!("Failed to send instruments to handler: {e}");
233        }
234    }
235
236    /// Caches a single instrument for price precision lookup (upsert).
237    ///
238    /// Must be called after `connect()` when the handler is ready to receive commands.
239    pub fn cache_instrument(&self, instrument: InstrumentAny) {
240        if let Ok(tx) = self.cmd_tx.try_read()
241            && let Err(e) = tx.send(HandlerCommand::UpdateInstrument(instrument))
242        {
243            tracing::debug!("Failed to send instrument update to handler: {e}");
244        }
245    }
246
247    /// Connects to the WebSocket server.
248    pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
249        tracing::debug!("Connecting to Futures WebSocket: {}", self.url);
250
251        self.signal.store(false, Ordering::Relaxed);
252
253        let (raw_handler, raw_rx) = channel_message_handler();
254
255        let ws_config = WebSocketConfig {
256            url: self.url.clone(),
257            headers: vec![],
258            message_handler: Some(raw_handler),
259            ping_handler: None,
260            heartbeat: self.heartbeat_secs,
261            heartbeat_msg: Some(WS_PING_MSG.to_string()),
262            reconnect_timeout_ms: Some(5_000),
263            reconnect_delay_initial_ms: Some(500),
264            reconnect_delay_max_ms: Some(5_000),
265            reconnect_backoff_factor: Some(1.5),
266            reconnect_jitter_ms: Some(250),
267            reconnect_max_attempts: None,
268        };
269
270        let ws_client = WebSocketClient::connect(ws_config, None, vec![], None)
271            .await
272            .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
273
274        self.connection_mode
275            .store(ws_client.connection_mode_atomic());
276
277        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<KrakenFuturesWsMessage>();
278        self.out_rx = Some(Arc::new(out_rx));
279
280        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
281        *self.cmd_tx.write().await = cmd_tx.clone();
282
283        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(ws_client)) {
284            return Err(KrakenWsError::ConnectionError(format!(
285                "Failed to send WebSocketClient to handler: {e}"
286            )));
287        }
288
289        let signal = self.signal.clone();
290        let subscriptions = self.subscriptions.clone();
291        let cmd_tx_for_reconnect = cmd_tx.clone();
292        let credential_for_reconnect = self.credential.clone();
293
294        let stream_handle = get_runtime().spawn(async move {
295            let mut handler =
296                FuturesFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
297
298            loop {
299                match handler.next().await {
300                    Some(KrakenFuturesWsMessage::Reconnected) => {
301                        if signal.load(Ordering::Relaxed) {
302                            continue;
303                        }
304                        tracing::info!("WebSocket reconnected, resubscribing");
305
306                        // Mark all confirmed as failed to transition to pending for replay
307                        let confirmed_topics = subscriptions.all_topics();
308                        for topic in &confirmed_topics {
309                            subscriptions.mark_failure(topic);
310                        }
311
312                        let topics = subscriptions.all_topics();
313                        if topics.is_empty() {
314                            tracing::debug!("No subscriptions to restore after reconnection");
315                        } else {
316                            // Check if we have private subscriptions that need re-authentication
317                            let has_private_subs = topics.iter().any(|t| {
318                                t == "open_orders"
319                                    || t == "fills"
320                                    || t.starts_with("open_orders:")
321                                    || t.starts_with("fills:")
322                            });
323
324                            if has_private_subs {
325                                if let Some(ref cred) = credential_for_reconnect {
326                                    // Request fresh challenge for the new connection
327                                    let (tx, rx) = tokio::sync::oneshot::channel();
328                                    if let Err(e) = cmd_tx_for_reconnect.send(
329                                        HandlerCommand::RequestChallenge {
330                                            api_key: cred.api_key().to_string(),
331                                            response_tx: tx,
332                                        },
333                                    ) {
334                                        tracing::error!(
335                                            error = %e,
336                                            "Failed to request challenge for reconnect"
337                                        );
338                                    } else {
339                                        match tokio::time::timeout(
340                                            tokio::time::Duration::from_secs(10),
341                                            rx,
342                                        )
343                                        .await
344                                        {
345                                            Ok(Ok(challenge)) => {
346                                                match cred.sign_ws_challenge(&challenge) {
347                                                    Ok(signed) => {
348                                                        if let Err(e) = cmd_tx_for_reconnect.send(
349                                                            HandlerCommand::SetAuthCredentials {
350                                                                api_key: cred.api_key().to_string(),
351                                                                original_challenge: challenge,
352                                                                signed_challenge: signed,
353                                                            },
354                                                        ) {
355                                                            tracing::error!(
356                                                                error = %e,
357                                                                "Failed to set auth credentials"
358                                                            );
359                                                        } else {
360                                                            tracing::debug!(
361                                                                "Re-authenticated after reconnect"
362                                                            );
363                                                        }
364                                                    }
365                                                    Err(e) => {
366                                                        tracing::error!(
367                                                            error = %e,
368                                                            "Failed to sign challenge for reconnect"
369                                                        );
370                                                    }
371                                                }
372                                            }
373                                            Ok(Err(_)) => {
374                                                tracing::error!(
375                                                    "Challenge channel closed during reconnect"
376                                                );
377                                            }
378                                            Err(_) => {
379                                                tracing::error!(
380                                                    "Timeout waiting for challenge during reconnect"
381                                                );
382                                            }
383                                        }
384                                    }
385                                } else {
386                                    tracing::warn!(
387                                        "Private subscriptions exist but no credentials available"
388                                    );
389                                }
390                            }
391
392                            tracing::info!(
393                                count = topics.len(),
394                                "Resubscribing after reconnection"
395                            );
396
397                            for topic in &topics {
398                                let cmd =
399                                    if let Some((feed_str, symbol_str)) = topic.split_once(':') {
400                                        let symbol = Symbol::from(symbol_str);
401                                        match feed_str.parse::<KrakenFuturesFeed>() {
402                                            Ok(KrakenFuturesFeed::Trade) => {
403                                                Some(HandlerCommand::SubscribeTrade(symbol))
404                                            }
405                                            Ok(KrakenFuturesFeed::Book) => {
406                                                Some(HandlerCommand::SubscribeBook(symbol))
407                                            }
408                                            Ok(KrakenFuturesFeed::Ticker) => {
409                                                Some(HandlerCommand::SubscribeTicker(symbol))
410                                            }
411                                            Ok(KrakenFuturesFeed::OpenOrders) => {
412                                                Some(HandlerCommand::SubscribeOpenOrders)
413                                            }
414                                            Ok(KrakenFuturesFeed::Fills) => {
415                                                Some(HandlerCommand::SubscribeFills)
416                                            }
417                                            Ok(_) | Err(_) => None,
418                                        }
419                                    } else {
420                                        match topic.parse::<KrakenFuturesFeed>() {
421                                            Ok(KrakenFuturesFeed::OpenOrders) => {
422                                                Some(HandlerCommand::SubscribeOpenOrders)
423                                            }
424                                            Ok(KrakenFuturesFeed::Fills) => {
425                                                Some(HandlerCommand::SubscribeFills)
426                                            }
427                                            Ok(_) | Err(_) => None,
428                                        }
429                                    };
430
431                                if let Some(cmd) = cmd
432                                    && let Err(e) = cmd_tx_for_reconnect.send(cmd)
433                                {
434                                    tracing::error!(
435                                        error = %e, topic,
436                                        "Failed to send resubscribe command"
437                                    );
438                                }
439
440                                subscriptions.mark_subscribe(topic);
441                            }
442                        }
443
444                        if let Err(e) = out_tx.send(KrakenFuturesWsMessage::Reconnected) {
445                            tracing::debug!("Output channel closed: {e}");
446                            break;
447                        }
448                        continue;
449                    }
450                    Some(msg) => {
451                        if let Err(e) = out_tx.send(msg) {
452                            tracing::debug!("Output channel closed: {e}");
453                            break;
454                        }
455                    }
456                    None => {
457                        tracing::debug!("Handler stream ended");
458                        break;
459                    }
460                }
461            }
462
463            tracing::debug!("Futures handler task exiting");
464        });
465
466        self.task_handle = Some(Arc::new(stream_handle));
467
468        tracing::debug!("Futures WebSocket connected successfully");
469        Ok(())
470    }
471
472    /// Disconnects from the WebSocket server.
473    pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
474        tracing::debug!("Disconnecting Futures WebSocket");
475
476        self.signal.store(true, Ordering::Relaxed);
477
478        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
479            tracing::debug!(
480                "Failed to send disconnect command (handler may already be shut down): {e}"
481            );
482        }
483
484        if let Some(task_handle) = self.task_handle.take() {
485            match Arc::try_unwrap(task_handle) {
486                Ok(handle) => {
487                    match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
488                        Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
489                        Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
490                        Err(_) => {
491                            tracing::warn!("Timeout waiting for task handle");
492                        }
493                    }
494                }
495                Err(arc_handle) => {
496                    tracing::debug!("Cannot take ownership of task handle, aborting");
497                    arc_handle.abort();
498                }
499            }
500        }
501
502        self.subscriptions.clear();
503        self.auth_tracker.fail("Disconnected");
504        Ok(())
505    }
506
507    /// Closes the WebSocket connection.
508    pub async fn close(&mut self) -> Result<(), KrakenWsError> {
509        self.disconnect().await
510    }
511
512    /// Subscribes to mark price updates for the given instrument.
513    pub async fn subscribe_mark_price(
514        &self,
515        instrument_id: InstrumentId,
516    ) -> Result<(), KrakenWsError> {
517        let symbol = instrument_id.symbol;
518        let key = format!("mark:{symbol}");
519
520        if !self.subscriptions.add_reference(&key) {
521            return Ok(());
522        }
523
524        self.subscriptions.mark_subscribe(&key);
525        self.subscriptions.confirm_subscribe(&key);
526        self.ensure_ticker_subscribed(symbol).await
527    }
528
529    /// Unsubscribes from mark price updates for the given instrument.
530    pub async fn unsubscribe_mark_price(
531        &self,
532        instrument_id: InstrumentId,
533    ) -> Result<(), KrakenWsError> {
534        let symbol = instrument_id.symbol;
535        let key = format!("mark:{symbol}");
536
537        if !self.subscriptions.remove_reference(&key) {
538            return Ok(());
539        }
540
541        self.subscriptions.mark_unsubscribe(&key);
542        self.subscriptions.confirm_unsubscribe(&key);
543        self.maybe_unsubscribe_ticker(symbol).await
544    }
545
546    /// Subscribes to index price updates for the given instrument.
547    pub async fn subscribe_index_price(
548        &self,
549        instrument_id: InstrumentId,
550    ) -> Result<(), KrakenWsError> {
551        let symbol = instrument_id.symbol;
552        let key = format!("index:{symbol}");
553
554        if !self.subscriptions.add_reference(&key) {
555            return Ok(());
556        }
557
558        self.subscriptions.mark_subscribe(&key);
559        self.subscriptions.confirm_subscribe(&key);
560        self.ensure_ticker_subscribed(symbol).await
561    }
562
563    /// Unsubscribes from index price updates for the given instrument.
564    pub async fn unsubscribe_index_price(
565        &self,
566        instrument_id: InstrumentId,
567    ) -> Result<(), KrakenWsError> {
568        let symbol = instrument_id.symbol;
569        let key = format!("index:{symbol}");
570
571        if !self.subscriptions.remove_reference(&key) {
572            return Ok(());
573        }
574
575        self.subscriptions.mark_unsubscribe(&key);
576        self.subscriptions.confirm_unsubscribe(&key);
577        self.maybe_unsubscribe_ticker(symbol).await
578    }
579
580    /// Subscribes to quote updates for the given instrument.
581    ///
582    /// Uses the order book channel for low-latency top-of-book quotes.
583    pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
584        let symbol = instrument_id.symbol;
585        let key = format!("quotes:{symbol}");
586
587        if !self.subscriptions.add_reference(&key) {
588            return Ok(());
589        }
590
591        self.subscriptions.mark_subscribe(&key);
592        self.subscriptions.confirm_subscribe(&key);
593
594        // Use book feed for low-latency quotes (not throttled ticker)
595        self.ensure_book_subscribed(symbol).await
596    }
597
598    /// Unsubscribes from quote updates for the given instrument.
599    pub async fn unsubscribe_quotes(
600        &self,
601        instrument_id: InstrumentId,
602    ) -> Result<(), KrakenWsError> {
603        let symbol = instrument_id.symbol;
604        let key = format!("quotes:{symbol}");
605
606        if !self.subscriptions.remove_reference(&key) {
607            return Ok(());
608        }
609
610        self.subscriptions.mark_unsubscribe(&key);
611        self.subscriptions.confirm_unsubscribe(&key);
612        self.maybe_unsubscribe_book(symbol).await
613    }
614
615    /// Subscribes to trade updates for the given instrument.
616    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
617        let symbol = instrument_id.symbol;
618        let key = format!("trades:{symbol}");
619
620        if !self.subscriptions.add_reference(&key) {
621            return Ok(());
622        }
623
624        self.subscriptions.mark_subscribe(&key);
625
626        self.cmd_tx
627            .read()
628            .await
629            .send(HandlerCommand::SubscribeTrade(symbol))
630            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
631
632        self.subscriptions.confirm_subscribe(&key);
633        Ok(())
634    }
635
636    /// Unsubscribes from trade updates for the given instrument.
637    pub async fn unsubscribe_trades(
638        &self,
639        instrument_id: InstrumentId,
640    ) -> Result<(), KrakenWsError> {
641        let symbol = instrument_id.symbol;
642        let key = format!("trades:{symbol}");
643
644        if !self.subscriptions.remove_reference(&key) {
645            return Ok(());
646        }
647
648        self.subscriptions.mark_unsubscribe(&key);
649
650        self.cmd_tx
651            .read()
652            .await
653            .send(HandlerCommand::UnsubscribeTrade(symbol))
654            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
655
656        self.subscriptions.confirm_unsubscribe(&key);
657        Ok(())
658    }
659
660    /// Subscribes to order book updates for the given instrument.
661    ///
662    /// Note: The `depth` parameter is accepted for API compatibility with spot client but is
663    /// not used by Kraken Futures (full book is always returned).
664    pub async fn subscribe_book(
665        &self,
666        instrument_id: InstrumentId,
667        _depth: Option<u32>,
668    ) -> Result<(), KrakenWsError> {
669        let symbol = instrument_id.symbol;
670        let key = format!("book:{symbol}");
671
672        if !self.subscriptions.add_reference(&key) {
673            return Ok(());
674        }
675
676        self.subscriptions.mark_subscribe(&key);
677
678        self.cmd_tx
679            .read()
680            .await
681            .send(HandlerCommand::SubscribeBook(symbol))
682            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
683
684        self.subscriptions.confirm_subscribe(&key);
685        Ok(())
686    }
687
688    /// Unsubscribes from order book updates for the given instrument.
689    pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
690        let symbol = instrument_id.symbol;
691        let key = format!("book:{symbol}");
692
693        if !self.subscriptions.remove_reference(&key) {
694            return Ok(());
695        }
696
697        self.subscriptions.mark_unsubscribe(&key);
698
699        self.cmd_tx
700            .read()
701            .await
702            .send(HandlerCommand::UnsubscribeBook(symbol))
703            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
704
705        self.subscriptions.confirm_unsubscribe(&key);
706        Ok(())
707    }
708
709    /// Ensures ticker feed is subscribed for the given symbol.
710    async fn ensure_ticker_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
711        let ticker_key = format!("ticker:{symbol}");
712
713        if !self.subscriptions.add_reference(&ticker_key) {
714            return Ok(());
715        }
716
717        self.subscriptions.mark_subscribe(&ticker_key);
718        self.cmd_tx
719            .read()
720            .await
721            .send(HandlerCommand::SubscribeTicker(symbol))
722            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
723        self.subscriptions.confirm_subscribe(&ticker_key);
724        Ok(())
725    }
726
727    /// Unsubscribes from ticker if no more dependent subscriptions.
728    async fn maybe_unsubscribe_ticker(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
729        let ticker_key = format!("ticker:{symbol}");
730
731        if !self.subscriptions.remove_reference(&ticker_key) {
732            return Ok(());
733        }
734
735        self.subscriptions.mark_unsubscribe(&ticker_key);
736        self.cmd_tx
737            .read()
738            .await
739            .send(HandlerCommand::UnsubscribeTicker(symbol))
740            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
741        self.subscriptions.confirm_unsubscribe(&ticker_key);
742        Ok(())
743    }
744
745    /// Ensures book feed is subscribed for the given symbol (for quotes).
746    async fn ensure_book_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
747        let book_key = format!("book:{symbol}");
748
749        if !self.subscriptions.add_reference(&book_key) {
750            return Ok(());
751        }
752
753        self.subscriptions.mark_subscribe(&book_key);
754        self.cmd_tx
755            .read()
756            .await
757            .send(HandlerCommand::SubscribeBook(symbol))
758            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
759        self.subscriptions.confirm_subscribe(&book_key);
760        Ok(())
761    }
762
763    /// Unsubscribes from book if no more dependent subscriptions.
764    async fn maybe_unsubscribe_book(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
765        let book_key = format!("book:{symbol}");
766
767        if !self.subscriptions.remove_reference(&book_key) {
768            return Ok(());
769        }
770
771        self.subscriptions.mark_unsubscribe(&book_key);
772        self.cmd_tx
773            .read()
774            .await
775            .send(HandlerCommand::UnsubscribeBook(symbol))
776            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
777        self.subscriptions.confirm_unsubscribe(&book_key);
778        Ok(())
779    }
780
781    /// Gets the output receiver for processed messages.
782    pub fn take_output_rx(
783        &mut self,
784    ) -> Option<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>> {
785        self.out_rx.take().and_then(|arc| Arc::try_unwrap(arc).ok())
786    }
787
788    /// Sets the account ID for execution reports.
789    ///
790    /// Must be called before subscribing to execution feeds to properly generate
791    /// OrderStatusReport and FillReport objects.
792    pub fn set_account_id(&self, account_id: AccountId) {
793        if let Ok(tx) = self.cmd_tx.try_read()
794            && let Err(e) = tx.send(HandlerCommand::SetAccountId(account_id))
795        {
796            tracing::debug!("Failed to send account_id to handler: {e}");
797        }
798    }
799
800    /// Caches a client order ID mapping for order tracking.
801    ///
802    /// This caches the trader_id, strategy_id, and instrument_id for an order,
803    /// allowing the handler to emit proper order events with correct identifiers
804    /// when WebSocket messages arrive.
805    pub fn cache_client_order(
806        &self,
807        client_order_id: ClientOrderId,
808        venue_order_id: Option<VenueOrderId>,
809        instrument_id: InstrumentId,
810        trader_id: TraderId,
811        strategy_id: StrategyId,
812    ) {
813        if let Ok(tx) = self.cmd_tx.try_read()
814            && let Err(e) = tx.send(HandlerCommand::CacheClientOrder {
815                client_order_id,
816                venue_order_id,
817                instrument_id,
818                trader_id,
819                strategy_id,
820            })
821        {
822            tracing::debug!("Failed to cache client order: {e}");
823        }
824    }
825
826    /// Requests a challenge from the WebSocket for authentication.
827    ///
828    /// After calling this, listen for the challenge response message and then
829    /// call `authenticate_with_challenge()` to complete authentication.
830    pub async fn request_challenge(&self) -> Result<(), KrakenWsError> {
831        let credential = self.credential.as_ref().ok_or_else(|| {
832            KrakenWsError::AuthenticationError(
833                "API credentials required for authentication".to_string(),
834            )
835        })?;
836
837        // TODO: Send via WebSocket client when we have direct access
838        // For now, the Python layer will handle the challenge request/response flow
839        tracing::debug!(
840            "Challenge request prepared for API key: {}",
841            credential.api_key_masked()
842        );
843
844        Ok(())
845    }
846
847    /// Set authentication credentials directly (for when challenge is obtained externally).
848    pub async fn set_auth_credentials(
849        &self,
850        original_challenge: String,
851        signed_challenge: String,
852    ) -> Result<(), KrakenWsError> {
853        let credential = self.credential.as_ref().ok_or_else(|| {
854            KrakenWsError::AuthenticationError("API credentials required".to_string())
855        })?;
856
857        *self.original_challenge.write().await = Some(original_challenge.clone());
858        *self.signed_challenge.write().await = Some(signed_challenge.clone());
859
860        self.cmd_tx
861            .read()
862            .await
863            .send(HandlerCommand::SetAuthCredentials {
864                api_key: credential.api_key().to_string(),
865                original_challenge,
866                signed_challenge,
867            })
868            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
869
870        Ok(())
871    }
872
873    /// Sign a challenge with the API credentials.
874    ///
875    /// Returns the signed challenge on success.
876    pub fn sign_challenge(&self, challenge: &str) -> Result<String, KrakenWsError> {
877        let credential = self.credential.as_ref().ok_or_else(|| {
878            KrakenWsError::AuthenticationError("API credentials required".to_string())
879        })?;
880
881        credential.sign_ws_challenge(challenge).map_err(|e| {
882            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
883        })
884    }
885
886    /// Complete authentication with a received challenge.
887    pub async fn authenticate_with_challenge(&self, challenge: &str) -> Result<(), KrakenWsError> {
888        let credential = self.credential.as_ref().ok_or_else(|| {
889            KrakenWsError::AuthenticationError("API credentials required".to_string())
890        })?;
891
892        let signed_challenge = credential.sign_ws_challenge(challenge).map_err(|e| {
893            KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
894        })?;
895
896        self.set_auth_credentials(challenge.to_string(), signed_challenge)
897            .await
898    }
899
900    /// Subscribes to open orders feed (private, requires authentication).
901    pub async fn subscribe_open_orders(&self) -> Result<(), KrakenWsError> {
902        if self.original_challenge.read().await.is_none() {
903            return Err(KrakenWsError::AuthenticationError(
904                "Must authenticate before subscribing to private feeds".to_string(),
905            ));
906        }
907
908        let key = "open_orders";
909        if !self.subscriptions.add_reference(key) {
910            return Ok(());
911        }
912
913        self.subscriptions.mark_subscribe(key);
914
915        self.cmd_tx
916            .read()
917            .await
918            .send(HandlerCommand::SubscribeOpenOrders)
919            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
920
921        self.subscriptions.confirm_subscribe(key);
922        Ok(())
923    }
924
925    /// Subscribes to fills feed (private, requires authentication).
926    pub async fn subscribe_fills(&self) -> Result<(), KrakenWsError> {
927        if self.original_challenge.read().await.is_none() {
928            return Err(KrakenWsError::AuthenticationError(
929                "Must authenticate before subscribing to private feeds".to_string(),
930            ));
931        }
932
933        let key = "fills";
934        if !self.subscriptions.add_reference(key) {
935            return Ok(());
936        }
937
938        self.subscriptions.mark_subscribe(key);
939
940        self.cmd_tx
941            .read()
942            .await
943            .send(HandlerCommand::SubscribeFills)
944            .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
945
946        self.subscriptions.confirm_subscribe(key);
947        Ok(())
948    }
949
950    /// Subscribes to both open orders and fills (convenience method).
951    pub async fn subscribe_executions(&self) -> Result<(), KrakenWsError> {
952        self.subscribe_open_orders().await?;
953        self.subscribe_fills().await?;
954        Ok(())
955    }
956}