1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::get_runtime;
25use nautilus_model::{
26 identifiers::{
27 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
28 },
29 instruments::InstrumentAny,
30};
31use nautilus_network::{
32 mode::ConnectionMode,
33 websocket::{
34 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35 },
36};
37use tokio_util::sync::CancellationToken;
38
39use super::{
40 handler::{FuturesFeedHandler, HandlerCommand},
41 messages::{KrakenFuturesFeed, KrakenFuturesWsMessage},
42};
43use crate::{common::credential::KrakenCredential, websocket::error::KrakenWsError};
44
/// Delimiter separating the feed name from the symbol in subscription topic keys
/// (for example `ticker:<symbol>`); it is handed to `SubscriptionState::new` when a
/// client is constructed.
pub const KRAKEN_FUTURES_WS_TOPIC_DELIMITER: char = ':';
49
/// WebSocket client for the Kraken Futures feed.
///
/// The client owns the command channel to a background feed-handler task (spawned by
/// `connect`), tracks reference-counted subscriptions, and optionally holds API
/// credentials used for the challenge-based authentication of private feeds.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
)]
pub struct KrakenFuturesWebSocketClient {
    /// WebSocket endpoint URL used by `connect`.
    url: String,
    /// Optional heartbeat value forwarded to the underlying `WebSocketConfig`.
    heartbeat_secs: Option<u64>,
    /// Stop flag: set to `true` by `disconnect`, reset to `false` by `connect`, and
    /// checked by the handler task before resubscribing after a reconnect.
    signal: Arc<AtomicBool>,
    /// Connection-mode atomic; starts as `Closed` and is swapped in `connect` for the
    /// atomic exposed by the underlying `WebSocketClient`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the feed handler; replaced with a fresh channel on every
    /// `connect` (the initial sender has no live receiver).
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for messages emitted by the handler task; populated by `connect` and
    /// handed out via `take_output_rx`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>>>,
    /// Join handle of the handler task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Reference-counted subscription bookkeeping keyed by topic.
    subscriptions: SubscriptionState,
    /// Authentication tracker; marked as failed with "Disconnected" on `disconnect`.
    auth_tracker: AuthTracker,
    /// Cancellation token created at construction.
    // NOTE(review): not cancelled or observed anywhere in this section — confirm usage.
    cancellation_token: CancellationToken,
    /// API credentials; required by the authentication and private-feed methods.
    credential: Option<KrakenCredential>,
    /// Most recent challenge received from the venue, if authenticated.
    original_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
    /// Signature of `original_challenge` produced with the credential.
    signed_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
}
71
72impl Clone for KrakenFuturesWebSocketClient {
73 fn clone(&self) -> Self {
74 Self {
75 url: self.url.clone(),
76 heartbeat_secs: self.heartbeat_secs,
77 signal: Arc::clone(&self.signal),
78 connection_mode: Arc::clone(&self.connection_mode),
79 cmd_tx: Arc::clone(&self.cmd_tx),
80 out_rx: self.out_rx.clone(),
81 task_handle: self.task_handle.clone(),
82 subscriptions: self.subscriptions.clone(),
83 auth_tracker: self.auth_tracker.clone(),
84 cancellation_token: self.cancellation_token.clone(),
85 credential: self.credential.clone(),
86 original_challenge: Arc::clone(&self.original_challenge),
87 signed_challenge: Arc::clone(&self.signed_challenge),
88 }
89 }
90}
91
92impl KrakenFuturesWebSocketClient {
93 #[must_use]
95 pub fn new(url: String, heartbeat_secs: Option<u64>) -> Self {
96 Self::with_credentials(url, heartbeat_secs, None)
97 }
98
99 #[must_use]
101 pub fn with_credentials(
102 url: String,
103 heartbeat_secs: Option<u64>,
104 credential: Option<KrakenCredential>,
105 ) -> Self {
106 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
107 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
108 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
109
110 Self {
111 url,
112 heartbeat_secs,
113 signal: Arc::new(AtomicBool::new(false)),
114 connection_mode,
115 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
116 out_rx: None,
117 task_handle: None,
118 subscriptions: SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER),
119 auth_tracker: AuthTracker::new(),
120 cancellation_token: CancellationToken::new(),
121 credential,
122 original_challenge: Arc::new(tokio::sync::RwLock::new(None)),
123 signed_challenge: Arc::new(tokio::sync::RwLock::new(None)),
124 }
125 }
126
127 #[must_use]
129 pub fn has_credentials(&self) -> bool {
130 self.credential.is_some()
131 }
132
133 #[must_use]
135 pub fn url(&self) -> &str {
136 &self.url
137 }
138
139 #[must_use]
141 pub fn is_closed(&self) -> bool {
142 ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
143 == ConnectionMode::Closed
144 }
145
146 #[must_use]
148 pub fn is_active(&self) -> bool {
149 ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
150 == ConnectionMode::Active
151 }
152
153 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
155 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
156
157 tokio::time::timeout(timeout, async {
158 while !self.is_active() {
159 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
160 }
161 })
162 .await
163 .map_err(|_| {
164 KrakenWsError::ConnectionError(format!(
165 "WebSocket connection timeout after {timeout_secs} seconds"
166 ))
167 })?;
168
169 Ok(())
170 }
171
172 pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
177 let credential = self.credential.as_ref().ok_or_else(|| {
178 KrakenWsError::AuthenticationError("API credentials required".to_string())
179 })?;
180
181 let api_key = credential.api_key().to_string();
182 let (tx, rx) = tokio::sync::oneshot::channel();
183
184 self.cmd_tx
185 .read()
186 .await
187 .send(HandlerCommand::RequestChallenge {
188 api_key: api_key.clone(),
189 response_tx: tx,
190 })
191 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
192
193 let challenge = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx)
194 .await
195 .map_err(|_| {
196 KrakenWsError::AuthenticationError("Timeout waiting for challenge".to_string())
197 })?
198 .map_err(|_| {
199 KrakenWsError::AuthenticationError("Challenge channel closed".to_string())
200 })?;
201
202 let signed_challenge = credential.sign_ws_challenge(&challenge).map_err(|e| {
203 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
204 })?;
205
206 *self.original_challenge.write().await = Some(challenge.clone());
207 *self.signed_challenge.write().await = Some(signed_challenge.clone());
208
209 self.cmd_tx
210 .read()
211 .await
212 .send(HandlerCommand::SetAuthCredentials {
213 api_key,
214 original_challenge: challenge,
215 signed_challenge,
216 })
217 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
218
219 log::debug!("Futures WebSocket authentication successful");
220 Ok(())
221 }
222
223 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
227 if let Ok(tx) = self.cmd_tx.try_read()
228 && let Err(e) = tx.send(HandlerCommand::InitializeInstruments(instruments))
229 {
230 log::debug!("Failed to send instruments to handler: {e}");
231 }
232 }
233
234 pub fn cache_instrument(&self, instrument: InstrumentAny) {
238 if let Ok(tx) = self.cmd_tx.try_read()
239 && let Err(e) = tx.send(HandlerCommand::UpdateInstrument(instrument))
240 {
241 log::debug!("Failed to send instrument update to handler: {e}");
242 }
243 }
244
245 pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
247 log::debug!("Connecting to Futures WebSocket: {}", self.url);
248
249 self.signal.store(false, Ordering::Relaxed);
250
251 let (raw_handler, raw_rx) = channel_message_handler();
252
253 let ws_config = WebSocketConfig {
254 url: self.url.clone(),
255 headers: vec![],
256 heartbeat: self.heartbeat_secs,
257 heartbeat_msg: None, reconnect_timeout_ms: Some(5_000),
259 reconnect_delay_initial_ms: Some(500),
260 reconnect_delay_max_ms: Some(5_000),
261 reconnect_backoff_factor: Some(1.5),
262 reconnect_jitter_ms: Some(250),
263 reconnect_max_attempts: None,
264 };
265
266 let ws_client =
267 WebSocketClient::connect(ws_config, Some(raw_handler), None, None, vec![], None)
268 .await
269 .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
270
271 self.connection_mode
272 .store(ws_client.connection_mode_atomic());
273
274 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<KrakenFuturesWsMessage>();
275 self.out_rx = Some(Arc::new(out_rx));
276
277 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
278 *self.cmd_tx.write().await = cmd_tx.clone();
279
280 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(ws_client)) {
281 return Err(KrakenWsError::ConnectionError(format!(
282 "Failed to send WebSocketClient to handler: {e}"
283 )));
284 }
285
286 let signal = self.signal.clone();
287 let subscriptions = self.subscriptions.clone();
288 let cmd_tx_for_reconnect = cmd_tx.clone();
289 let credential_for_reconnect = self.credential.clone();
290
291 let stream_handle = get_runtime().spawn(async move {
292 let mut handler =
293 FuturesFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
294
295 loop {
296 match handler.next().await {
297 Some(KrakenFuturesWsMessage::Reconnected) => {
298 if signal.load(Ordering::Relaxed) {
299 continue;
300 }
301 log::info!("WebSocket reconnected, resubscribing");
302
303 let confirmed_topics = subscriptions.all_topics();
305 for topic in &confirmed_topics {
306 subscriptions.mark_failure(topic);
307 }
308
309 let topics = subscriptions.all_topics();
310 if topics.is_empty() {
311 log::debug!("No subscriptions to restore after reconnection");
312 } else {
313 let has_private_subs = topics.iter().any(|t| {
315 t == "open_orders"
316 || t == "fills"
317 || t.starts_with("open_orders:")
318 || t.starts_with("fills:")
319 });
320
321 if has_private_subs {
322 if let Some(ref cred) = credential_for_reconnect {
323 let (tx, rx) = tokio::sync::oneshot::channel();
325 if let Err(e) = cmd_tx_for_reconnect.send(
326 HandlerCommand::RequestChallenge {
327 api_key: cred.api_key().to_string(),
328 response_tx: tx,
329 },
330 ) {
331 log::error!(
332 "Failed to request challenge for reconnect: {e}"
333 );
334 } else {
335 match tokio::time::timeout(
336 tokio::time::Duration::from_secs(10),
337 rx,
338 )
339 .await
340 {
341 Ok(Ok(challenge)) => {
342 match cred.sign_ws_challenge(&challenge) {
343 Ok(signed) => {
344 if let Err(e) = cmd_tx_for_reconnect.send(
345 HandlerCommand::SetAuthCredentials {
346 api_key: cred.api_key().to_string(),
347 original_challenge: challenge,
348 signed_challenge: signed,
349 },
350 ) {
351 log::error!(
352 "Failed to set auth credentials: {e}"
353 );
354 } else {
355 log::debug!(
356 "Re-authenticated after reconnect"
357 );
358 }
359 }
360 Err(e) => {
361 log::error!(
362 "Failed to sign challenge for reconnect: {e}"
363 );
364 }
365 }
366 }
367 Ok(Err(_)) => {
368 log::error!(
369 "Challenge channel closed during reconnect"
370 );
371 }
372 Err(_) => {
373 log::error!(
374 "Timeout waiting for challenge during reconnect"
375 );
376 }
377 }
378 }
379 } else {
380 log::warn!(
381 "Private subscriptions exist but no credentials available"
382 );
383 }
384 }
385
386 log::info!(
387 "Resubscribing after reconnection: count={}",
388 topics.len()
389 );
390
391 for topic in &topics {
392 let cmd =
393 if let Some((feed_str, symbol_str)) = topic.split_once(':') {
394 let symbol = Symbol::from(symbol_str);
395 match feed_str.parse::<KrakenFuturesFeed>() {
396 Ok(KrakenFuturesFeed::Trade) => {
397 Some(HandlerCommand::SubscribeTrade(symbol))
398 }
399 Ok(KrakenFuturesFeed::Book) => {
400 Some(HandlerCommand::SubscribeBook(symbol))
401 }
402 Ok(KrakenFuturesFeed::Ticker) => {
403 Some(HandlerCommand::SubscribeTicker(symbol))
404 }
405 Ok(KrakenFuturesFeed::OpenOrders) => {
406 Some(HandlerCommand::SubscribeOpenOrders)
407 }
408 Ok(KrakenFuturesFeed::Fills) => {
409 Some(HandlerCommand::SubscribeFills)
410 }
411 Ok(_) | Err(_) => None,
412 }
413 } else {
414 match topic.parse::<KrakenFuturesFeed>() {
415 Ok(KrakenFuturesFeed::OpenOrders) => {
416 Some(HandlerCommand::SubscribeOpenOrders)
417 }
418 Ok(KrakenFuturesFeed::Fills) => {
419 Some(HandlerCommand::SubscribeFills)
420 }
421 Ok(_) | Err(_) => None,
422 }
423 };
424
425 if let Some(cmd) = cmd
426 && let Err(e) = cmd_tx_for_reconnect.send(cmd)
427 {
428 log::error!(
429 "Failed to send resubscribe command: error={e}, \
430 topic={topic}"
431 );
432 }
433
434 subscriptions.mark_subscribe(topic);
435 }
436 }
437
438 if let Err(e) = out_tx.send(KrakenFuturesWsMessage::Reconnected) {
439 log::debug!("Output channel closed: {e}");
440 break;
441 }
442 continue;
443 }
444 Some(msg) => {
445 if let Err(e) = out_tx.send(msg) {
446 log::debug!("Output channel closed: {e}");
447 break;
448 }
449 }
450 None => {
451 log::debug!("Handler stream ended");
452 break;
453 }
454 }
455 }
456
457 log::debug!("Futures handler task exiting");
458 });
459
460 self.task_handle = Some(Arc::new(stream_handle));
461
462 log::debug!("Futures WebSocket connected successfully");
463 Ok(())
464 }
465
466 pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
468 log::debug!("Disconnecting Futures WebSocket");
469
470 self.signal.store(true, Ordering::Relaxed);
471
472 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
473 log::debug!(
474 "Failed to send disconnect command (handler may already be shut down): {e}"
475 );
476 }
477
478 if let Some(task_handle) = self.task_handle.take() {
479 match Arc::try_unwrap(task_handle) {
480 Ok(handle) => {
481 match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
482 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
483 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
484 Err(_) => {
485 log::warn!("Timeout waiting for task handle");
486 }
487 }
488 }
489 Err(arc_handle) => {
490 log::debug!("Cannot take ownership of task handle, aborting");
491 arc_handle.abort();
492 }
493 }
494 }
495
496 self.subscriptions.clear();
497 self.auth_tracker.fail("Disconnected");
498 Ok(())
499 }
500
501 pub async fn close(&mut self) -> Result<(), KrakenWsError> {
503 self.disconnect().await
504 }
505
506 pub async fn subscribe_mark_price(
508 &self,
509 instrument_id: InstrumentId,
510 ) -> Result<(), KrakenWsError> {
511 let symbol = instrument_id.symbol;
512 let key = format!("mark:{symbol}");
513
514 if !self.subscriptions.add_reference(&key) {
515 return Ok(());
516 }
517
518 self.subscriptions.mark_subscribe(&key);
519 self.subscriptions.confirm_subscribe(&key);
520 self.ensure_ticker_subscribed(symbol).await
521 }
522
523 pub async fn unsubscribe_mark_price(
525 &self,
526 instrument_id: InstrumentId,
527 ) -> Result<(), KrakenWsError> {
528 let symbol = instrument_id.symbol;
529 let key = format!("mark:{symbol}");
530
531 if !self.subscriptions.remove_reference(&key) {
532 return Ok(());
533 }
534
535 self.subscriptions.mark_unsubscribe(&key);
536 self.subscriptions.confirm_unsubscribe(&key);
537 self.maybe_unsubscribe_ticker(symbol).await
538 }
539
540 pub async fn subscribe_index_price(
542 &self,
543 instrument_id: InstrumentId,
544 ) -> Result<(), KrakenWsError> {
545 let symbol = instrument_id.symbol;
546 let key = format!("index:{symbol}");
547
548 if !self.subscriptions.add_reference(&key) {
549 return Ok(());
550 }
551
552 self.subscriptions.mark_subscribe(&key);
553 self.subscriptions.confirm_subscribe(&key);
554 self.ensure_ticker_subscribed(symbol).await
555 }
556
557 pub async fn unsubscribe_index_price(
559 &self,
560 instrument_id: InstrumentId,
561 ) -> Result<(), KrakenWsError> {
562 let symbol = instrument_id.symbol;
563 let key = format!("index:{symbol}");
564
565 if !self.subscriptions.remove_reference(&key) {
566 return Ok(());
567 }
568
569 self.subscriptions.mark_unsubscribe(&key);
570 self.subscriptions.confirm_unsubscribe(&key);
571 self.maybe_unsubscribe_ticker(symbol).await
572 }
573
574 pub async fn subscribe_funding_rate(
576 &self,
577 instrument_id: InstrumentId,
578 ) -> Result<(), KrakenWsError> {
579 let symbol = instrument_id.symbol;
580 let key = format!("funding:{symbol}");
581
582 if !self.subscriptions.add_reference(&key) {
583 return Ok(());
584 }
585
586 self.subscriptions.mark_subscribe(&key);
587 self.subscriptions.confirm_subscribe(&key);
588 self.ensure_ticker_subscribed(symbol).await
589 }
590
591 pub async fn unsubscribe_funding_rate(
593 &self,
594 instrument_id: InstrumentId,
595 ) -> Result<(), KrakenWsError> {
596 let symbol = instrument_id.symbol;
597 let key = format!("funding:{symbol}");
598
599 if !self.subscriptions.remove_reference(&key) {
600 return Ok(());
601 }
602
603 self.subscriptions.mark_unsubscribe(&key);
604 self.subscriptions.confirm_unsubscribe(&key);
605 self.maybe_unsubscribe_ticker(symbol).await
606 }
607
608 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
612 let symbol = instrument_id.symbol;
613 let key = format!("quotes:{symbol}");
614
615 if !self.subscriptions.add_reference(&key) {
616 return Ok(());
617 }
618
619 self.subscriptions.mark_subscribe(&key);
620 self.subscriptions.confirm_subscribe(&key);
621
622 self.ensure_book_subscribed(symbol).await
624 }
625
626 pub async fn unsubscribe_quotes(
628 &self,
629 instrument_id: InstrumentId,
630 ) -> Result<(), KrakenWsError> {
631 let symbol = instrument_id.symbol;
632 let key = format!("quotes:{symbol}");
633
634 if !self.subscriptions.remove_reference(&key) {
635 return Ok(());
636 }
637
638 self.subscriptions.mark_unsubscribe(&key);
639 self.subscriptions.confirm_unsubscribe(&key);
640 self.maybe_unsubscribe_book(symbol).await
641 }
642
643 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
645 let symbol = instrument_id.symbol;
646 let key = format!("trades:{symbol}");
647
648 if !self.subscriptions.add_reference(&key) {
649 return Ok(());
650 }
651
652 self.subscriptions.mark_subscribe(&key);
653
654 self.cmd_tx
655 .read()
656 .await
657 .send(HandlerCommand::SubscribeTrade(symbol))
658 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
659
660 self.subscriptions.confirm_subscribe(&key);
661 Ok(())
662 }
663
664 pub async fn unsubscribe_trades(
666 &self,
667 instrument_id: InstrumentId,
668 ) -> Result<(), KrakenWsError> {
669 let symbol = instrument_id.symbol;
670 let key = format!("trades:{symbol}");
671
672 if !self.subscriptions.remove_reference(&key) {
673 return Ok(());
674 }
675
676 self.subscriptions.mark_unsubscribe(&key);
677
678 self.cmd_tx
679 .read()
680 .await
681 .send(HandlerCommand::UnsubscribeTrade(symbol))
682 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
683
684 self.subscriptions.confirm_unsubscribe(&key);
685 Ok(())
686 }
687
688 pub async fn subscribe_book(
693 &self,
694 instrument_id: InstrumentId,
695 _depth: Option<u32>,
696 ) -> Result<(), KrakenWsError> {
697 let symbol = instrument_id.symbol;
698 let key = format!("book:{symbol}");
699
700 if !self.subscriptions.add_reference(&key) {
701 return Ok(());
702 }
703
704 self.subscriptions.mark_subscribe(&key);
705
706 self.cmd_tx
707 .read()
708 .await
709 .send(HandlerCommand::SubscribeBook(symbol))
710 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
711
712 self.subscriptions.confirm_subscribe(&key);
713 Ok(())
714 }
715
716 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
718 let symbol = instrument_id.symbol;
719 let key = format!("book:{symbol}");
720
721 if !self.subscriptions.remove_reference(&key) {
722 return Ok(());
723 }
724
725 self.subscriptions.mark_unsubscribe(&key);
726
727 self.cmd_tx
728 .read()
729 .await
730 .send(HandlerCommand::UnsubscribeBook(symbol))
731 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
732
733 self.subscriptions.confirm_unsubscribe(&key);
734 Ok(())
735 }
736
737 async fn ensure_ticker_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
739 let ticker_key = format!("ticker:{symbol}");
740
741 if !self.subscriptions.add_reference(&ticker_key) {
742 return Ok(());
743 }
744
745 self.subscriptions.mark_subscribe(&ticker_key);
746 self.cmd_tx
747 .read()
748 .await
749 .send(HandlerCommand::SubscribeTicker(symbol))
750 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
751 self.subscriptions.confirm_subscribe(&ticker_key);
752 Ok(())
753 }
754
755 async fn maybe_unsubscribe_ticker(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
757 let ticker_key = format!("ticker:{symbol}");
758
759 if !self.subscriptions.remove_reference(&ticker_key) {
760 return Ok(());
761 }
762
763 self.subscriptions.mark_unsubscribe(&ticker_key);
764 self.cmd_tx
765 .read()
766 .await
767 .send(HandlerCommand::UnsubscribeTicker(symbol))
768 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
769 self.subscriptions.confirm_unsubscribe(&ticker_key);
770 Ok(())
771 }
772
773 async fn ensure_book_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
775 let book_key = format!("book:{symbol}");
776
777 if !self.subscriptions.add_reference(&book_key) {
778 return Ok(());
779 }
780
781 self.subscriptions.mark_subscribe(&book_key);
782 self.cmd_tx
783 .read()
784 .await
785 .send(HandlerCommand::SubscribeBook(symbol))
786 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
787 self.subscriptions.confirm_subscribe(&book_key);
788 Ok(())
789 }
790
791 async fn maybe_unsubscribe_book(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
793 let book_key = format!("book:{symbol}");
794
795 if !self.subscriptions.remove_reference(&book_key) {
796 return Ok(());
797 }
798
799 self.subscriptions.mark_unsubscribe(&book_key);
800 self.cmd_tx
801 .read()
802 .await
803 .send(HandlerCommand::UnsubscribeBook(symbol))
804 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
805 self.subscriptions.confirm_unsubscribe(&book_key);
806 Ok(())
807 }
808
809 pub fn take_output_rx(
811 &mut self,
812 ) -> Option<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>> {
813 self.out_rx.take().and_then(|arc| Arc::try_unwrap(arc).ok())
814 }
815
816 pub fn set_account_id(&self, account_id: AccountId) {
821 if let Ok(tx) = self.cmd_tx.try_read()
822 && let Err(e) = tx.send(HandlerCommand::SetAccountId(account_id))
823 {
824 log::debug!("Failed to send account_id to handler: {e}");
825 }
826 }
827
828 pub fn cache_client_order(
834 &self,
835 client_order_id: ClientOrderId,
836 venue_order_id: Option<VenueOrderId>,
837 instrument_id: InstrumentId,
838 trader_id: TraderId,
839 strategy_id: StrategyId,
840 ) {
841 if let Ok(tx) = self.cmd_tx.try_read()
842 && let Err(e) = tx.send(HandlerCommand::CacheClientOrder {
843 client_order_id,
844 venue_order_id,
845 instrument_id,
846 trader_id,
847 strategy_id,
848 })
849 {
850 log::debug!("Failed to cache client order: {e}");
851 }
852 }
853
854 pub async fn request_challenge(&self) -> Result<(), KrakenWsError> {
859 let credential = self.credential.as_ref().ok_or_else(|| {
860 KrakenWsError::AuthenticationError(
861 "API credentials required for authentication".to_string(),
862 )
863 })?;
864
865 log::debug!(
868 "Challenge request prepared for API key: {}",
869 credential.api_key_masked()
870 );
871
872 Ok(())
873 }
874
875 pub async fn set_auth_credentials(
877 &self,
878 original_challenge: String,
879 signed_challenge: String,
880 ) -> Result<(), KrakenWsError> {
881 let credential = self.credential.as_ref().ok_or_else(|| {
882 KrakenWsError::AuthenticationError("API credentials required".to_string())
883 })?;
884
885 *self.original_challenge.write().await = Some(original_challenge.clone());
886 *self.signed_challenge.write().await = Some(signed_challenge.clone());
887
888 self.cmd_tx
889 .read()
890 .await
891 .send(HandlerCommand::SetAuthCredentials {
892 api_key: credential.api_key().to_string(),
893 original_challenge,
894 signed_challenge,
895 })
896 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
897
898 Ok(())
899 }
900
901 pub fn sign_challenge(&self, challenge: &str) -> Result<String, KrakenWsError> {
905 let credential = self.credential.as_ref().ok_or_else(|| {
906 KrakenWsError::AuthenticationError("API credentials required".to_string())
907 })?;
908
909 credential.sign_ws_challenge(challenge).map_err(|e| {
910 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
911 })
912 }
913
914 pub async fn authenticate_with_challenge(&self, challenge: &str) -> Result<(), KrakenWsError> {
916 let credential = self.credential.as_ref().ok_or_else(|| {
917 KrakenWsError::AuthenticationError("API credentials required".to_string())
918 })?;
919
920 let signed_challenge = credential.sign_ws_challenge(challenge).map_err(|e| {
921 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
922 })?;
923
924 self.set_auth_credentials(challenge.to_string(), signed_challenge)
925 .await
926 }
927
928 pub async fn subscribe_open_orders(&self) -> Result<(), KrakenWsError> {
930 if self.original_challenge.read().await.is_none() {
931 return Err(KrakenWsError::AuthenticationError(
932 "Must authenticate before subscribing to private feeds".to_string(),
933 ));
934 }
935
936 let key = "open_orders";
937 if !self.subscriptions.add_reference(key) {
938 return Ok(());
939 }
940
941 self.subscriptions.mark_subscribe(key);
942
943 self.cmd_tx
944 .read()
945 .await
946 .send(HandlerCommand::SubscribeOpenOrders)
947 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
948
949 self.subscriptions.confirm_subscribe(key);
950 Ok(())
951 }
952
953 pub async fn subscribe_fills(&self) -> Result<(), KrakenWsError> {
955 if self.original_challenge.read().await.is_none() {
956 return Err(KrakenWsError::AuthenticationError(
957 "Must authenticate before subscribing to private feeds".to_string(),
958 ));
959 }
960
961 let key = "fills";
962 if !self.subscriptions.add_reference(key) {
963 return Ok(());
964 }
965
966 self.subscriptions.mark_subscribe(key);
967
968 self.cmd_tx
969 .read()
970 .await
971 .send(HandlerCommand::SubscribeFills)
972 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
973
974 self.subscriptions.confirm_subscribe(key);
975 Ok(())
976 }
977
978 pub async fn subscribe_executions(&self) -> Result<(), KrakenWsError> {
980 self.subscribe_open_orders().await?;
981 self.subscribe_fills().await?;
982 Ok(())
983 }
984}