1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::get_runtime;
25use nautilus_model::{
26 identifiers::{
27 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
28 },
29 instruments::InstrumentAny,
30};
31use nautilus_network::{
32 mode::ConnectionMode,
33 websocket::{
34 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35 },
36};
37use tokio_util::sync::CancellationToken;
38
39use super::{
40 handler::{FuturesFeedHandler, HandlerCommand},
41 messages::{KrakenFuturesFeed, KrakenFuturesWsMessage},
42};
43use crate::{common::credential::KrakenCredential, websocket::error::KrakenWsError};
44
45pub const KRAKEN_FUTURES_WS_TOPIC_DELIMITER: char = ':';
49
50const WS_PING_MSG: &str = r#"{"event":"ping"}"#;
51
52#[derive(Debug)]
54#[cfg_attr(
55 feature = "python",
56 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
57)]
58pub struct KrakenFuturesWebSocketClient {
59 url: String,
60 heartbeat_secs: Option<u64>,
61 signal: Arc<AtomicBool>,
62 connection_mode: Arc<ArcSwap<AtomicU8>>,
63 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
64 out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>>>,
65 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
66 subscriptions: SubscriptionState,
67 auth_tracker: AuthTracker,
68 cancellation_token: CancellationToken,
69 credential: Option<KrakenCredential>,
70 original_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
71 signed_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
72}
73
74impl Clone for KrakenFuturesWebSocketClient {
75 fn clone(&self) -> Self {
76 Self {
77 url: self.url.clone(),
78 heartbeat_secs: self.heartbeat_secs,
79 signal: Arc::clone(&self.signal),
80 connection_mode: Arc::clone(&self.connection_mode),
81 cmd_tx: Arc::clone(&self.cmd_tx),
82 out_rx: self.out_rx.clone(),
83 task_handle: self.task_handle.clone(),
84 subscriptions: self.subscriptions.clone(),
85 auth_tracker: self.auth_tracker.clone(),
86 cancellation_token: self.cancellation_token.clone(),
87 credential: self.credential.clone(),
88 original_challenge: Arc::clone(&self.original_challenge),
89 signed_challenge: Arc::clone(&self.signed_challenge),
90 }
91 }
92}
93
94impl KrakenFuturesWebSocketClient {
95 #[must_use]
97 pub fn new(url: String, heartbeat_secs: Option<u64>) -> Self {
98 Self::with_credentials(url, heartbeat_secs, None)
99 }
100
101 #[must_use]
103 pub fn with_credentials(
104 url: String,
105 heartbeat_secs: Option<u64>,
106 credential: Option<KrakenCredential>,
107 ) -> Self {
108 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
109 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
110 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
111
112 Self {
113 url,
114 heartbeat_secs,
115 signal: Arc::new(AtomicBool::new(false)),
116 connection_mode,
117 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
118 out_rx: None,
119 task_handle: None,
120 subscriptions: SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER),
121 auth_tracker: AuthTracker::new(),
122 cancellation_token: CancellationToken::new(),
123 credential,
124 original_challenge: Arc::new(tokio::sync::RwLock::new(None)),
125 signed_challenge: Arc::new(tokio::sync::RwLock::new(None)),
126 }
127 }
128
129 #[must_use]
131 pub fn has_credentials(&self) -> bool {
132 self.credential.is_some()
133 }
134
135 #[must_use]
137 pub fn url(&self) -> &str {
138 &self.url
139 }
140
141 #[must_use]
143 pub fn is_closed(&self) -> bool {
144 ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
145 == ConnectionMode::Closed
146 }
147
148 #[must_use]
150 pub fn is_active(&self) -> bool {
151 ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
152 == ConnectionMode::Active
153 }
154
155 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
157 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
158
159 tokio::time::timeout(timeout, async {
160 while !self.is_active() {
161 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
162 }
163 })
164 .await
165 .map_err(|_| {
166 KrakenWsError::ConnectionError(format!(
167 "WebSocket connection timeout after {timeout_secs} seconds"
168 ))
169 })?;
170
171 Ok(())
172 }
173
174 pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
179 let credential = self.credential.as_ref().ok_or_else(|| {
180 KrakenWsError::AuthenticationError("API credentials required".to_string())
181 })?;
182
183 let api_key = credential.api_key().to_string();
184 let (tx, rx) = tokio::sync::oneshot::channel();
185
186 self.cmd_tx
187 .read()
188 .await
189 .send(HandlerCommand::RequestChallenge {
190 api_key: api_key.clone(),
191 response_tx: tx,
192 })
193 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
194
195 let challenge = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx)
196 .await
197 .map_err(|_| {
198 KrakenWsError::AuthenticationError("Timeout waiting for challenge".to_string())
199 })?
200 .map_err(|_| {
201 KrakenWsError::AuthenticationError("Challenge channel closed".to_string())
202 })?;
203
204 let signed_challenge = credential.sign_ws_challenge(&challenge).map_err(|e| {
205 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
206 })?;
207
208 *self.original_challenge.write().await = Some(challenge.clone());
209 *self.signed_challenge.write().await = Some(signed_challenge.clone());
210
211 self.cmd_tx
212 .read()
213 .await
214 .send(HandlerCommand::SetAuthCredentials {
215 api_key,
216 original_challenge: challenge,
217 signed_challenge,
218 })
219 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
220
221 tracing::debug!("Futures WebSocket authentication successful");
222 Ok(())
223 }
224
225 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
229 if let Ok(tx) = self.cmd_tx.try_read()
230 && let Err(e) = tx.send(HandlerCommand::InitializeInstruments(instruments))
231 {
232 tracing::debug!("Failed to send instruments to handler: {e}");
233 }
234 }
235
236 pub fn cache_instrument(&self, instrument: InstrumentAny) {
240 if let Ok(tx) = self.cmd_tx.try_read()
241 && let Err(e) = tx.send(HandlerCommand::UpdateInstrument(instrument))
242 {
243 tracing::debug!("Failed to send instrument update to handler: {e}");
244 }
245 }
246
247 pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
249 tracing::debug!("Connecting to Futures WebSocket: {}", self.url);
250
251 self.signal.store(false, Ordering::Relaxed);
252
253 let (raw_handler, raw_rx) = channel_message_handler();
254
255 let ws_config = WebSocketConfig {
256 url: self.url.clone(),
257 headers: vec![],
258 heartbeat: self.heartbeat_secs,
259 heartbeat_msg: Some(WS_PING_MSG.to_string()),
260 reconnect_timeout_ms: Some(5_000),
261 reconnect_delay_initial_ms: Some(500),
262 reconnect_delay_max_ms: Some(5_000),
263 reconnect_backoff_factor: Some(1.5),
264 reconnect_jitter_ms: Some(250),
265 reconnect_max_attempts: None,
266 };
267
268 let ws_client =
269 WebSocketClient::connect(ws_config, Some(raw_handler), None, None, vec![], None)
270 .await
271 .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
272
273 self.connection_mode
274 .store(ws_client.connection_mode_atomic());
275
276 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<KrakenFuturesWsMessage>();
277 self.out_rx = Some(Arc::new(out_rx));
278
279 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
280 *self.cmd_tx.write().await = cmd_tx.clone();
281
282 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(ws_client)) {
283 return Err(KrakenWsError::ConnectionError(format!(
284 "Failed to send WebSocketClient to handler: {e}"
285 )));
286 }
287
288 let signal = self.signal.clone();
289 let subscriptions = self.subscriptions.clone();
290 let cmd_tx_for_reconnect = cmd_tx.clone();
291 let credential_for_reconnect = self.credential.clone();
292
293 let stream_handle = get_runtime().spawn(async move {
294 let mut handler =
295 FuturesFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
296
297 loop {
298 match handler.next().await {
299 Some(KrakenFuturesWsMessage::Reconnected) => {
300 if signal.load(Ordering::Relaxed) {
301 continue;
302 }
303 tracing::info!("WebSocket reconnected, resubscribing");
304
305 let confirmed_topics = subscriptions.all_topics();
307 for topic in &confirmed_topics {
308 subscriptions.mark_failure(topic);
309 }
310
311 let topics = subscriptions.all_topics();
312 if topics.is_empty() {
313 tracing::debug!("No subscriptions to restore after reconnection");
314 } else {
315 let has_private_subs = topics.iter().any(|t| {
317 t == "open_orders"
318 || t == "fills"
319 || t.starts_with("open_orders:")
320 || t.starts_with("fills:")
321 });
322
323 if has_private_subs {
324 if let Some(ref cred) = credential_for_reconnect {
325 let (tx, rx) = tokio::sync::oneshot::channel();
327 if let Err(e) = cmd_tx_for_reconnect.send(
328 HandlerCommand::RequestChallenge {
329 api_key: cred.api_key().to_string(),
330 response_tx: tx,
331 },
332 ) {
333 tracing::error!(
334 error = %e,
335 "Failed to request challenge for reconnect"
336 );
337 } else {
338 match tokio::time::timeout(
339 tokio::time::Duration::from_secs(10),
340 rx,
341 )
342 .await
343 {
344 Ok(Ok(challenge)) => {
345 match cred.sign_ws_challenge(&challenge) {
346 Ok(signed) => {
347 if let Err(e) = cmd_tx_for_reconnect.send(
348 HandlerCommand::SetAuthCredentials {
349 api_key: cred.api_key().to_string(),
350 original_challenge: challenge,
351 signed_challenge: signed,
352 },
353 ) {
354 tracing::error!(
355 error = %e,
356 "Failed to set auth credentials"
357 );
358 } else {
359 tracing::debug!(
360 "Re-authenticated after reconnect"
361 );
362 }
363 }
364 Err(e) => {
365 tracing::error!(
366 error = %e,
367 "Failed to sign challenge for reconnect"
368 );
369 }
370 }
371 }
372 Ok(Err(_)) => {
373 tracing::error!(
374 "Challenge channel closed during reconnect"
375 );
376 }
377 Err(_) => {
378 tracing::error!(
379 "Timeout waiting for challenge during reconnect"
380 );
381 }
382 }
383 }
384 } else {
385 tracing::warn!(
386 "Private subscriptions exist but no credentials available"
387 );
388 }
389 }
390
391 tracing::info!(
392 count = topics.len(),
393 "Resubscribing after reconnection"
394 );
395
396 for topic in &topics {
397 let cmd =
398 if let Some((feed_str, symbol_str)) = topic.split_once(':') {
399 let symbol = Symbol::from(symbol_str);
400 match feed_str.parse::<KrakenFuturesFeed>() {
401 Ok(KrakenFuturesFeed::Trade) => {
402 Some(HandlerCommand::SubscribeTrade(symbol))
403 }
404 Ok(KrakenFuturesFeed::Book) => {
405 Some(HandlerCommand::SubscribeBook(symbol))
406 }
407 Ok(KrakenFuturesFeed::Ticker) => {
408 Some(HandlerCommand::SubscribeTicker(symbol))
409 }
410 Ok(KrakenFuturesFeed::OpenOrders) => {
411 Some(HandlerCommand::SubscribeOpenOrders)
412 }
413 Ok(KrakenFuturesFeed::Fills) => {
414 Some(HandlerCommand::SubscribeFills)
415 }
416 Ok(_) | Err(_) => None,
417 }
418 } else {
419 match topic.parse::<KrakenFuturesFeed>() {
420 Ok(KrakenFuturesFeed::OpenOrders) => {
421 Some(HandlerCommand::SubscribeOpenOrders)
422 }
423 Ok(KrakenFuturesFeed::Fills) => {
424 Some(HandlerCommand::SubscribeFills)
425 }
426 Ok(_) | Err(_) => None,
427 }
428 };
429
430 if let Some(cmd) = cmd
431 && let Err(e) = cmd_tx_for_reconnect.send(cmd)
432 {
433 tracing::error!(
434 error = %e, topic,
435 "Failed to send resubscribe command"
436 );
437 }
438
439 subscriptions.mark_subscribe(topic);
440 }
441 }
442
443 if let Err(e) = out_tx.send(KrakenFuturesWsMessage::Reconnected) {
444 tracing::debug!("Output channel closed: {e}");
445 break;
446 }
447 continue;
448 }
449 Some(msg) => {
450 if let Err(e) = out_tx.send(msg) {
451 tracing::debug!("Output channel closed: {e}");
452 break;
453 }
454 }
455 None => {
456 tracing::debug!("Handler stream ended");
457 break;
458 }
459 }
460 }
461
462 tracing::debug!("Futures handler task exiting");
463 });
464
465 self.task_handle = Some(Arc::new(stream_handle));
466
467 tracing::debug!("Futures WebSocket connected successfully");
468 Ok(())
469 }
470
471 pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
473 tracing::debug!("Disconnecting Futures WebSocket");
474
475 self.signal.store(true, Ordering::Relaxed);
476
477 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
478 tracing::debug!(
479 "Failed to send disconnect command (handler may already be shut down): {e}"
480 );
481 }
482
483 if let Some(task_handle) = self.task_handle.take() {
484 match Arc::try_unwrap(task_handle) {
485 Ok(handle) => {
486 match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
487 Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
488 Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
489 Err(_) => {
490 tracing::warn!("Timeout waiting for task handle");
491 }
492 }
493 }
494 Err(arc_handle) => {
495 tracing::debug!("Cannot take ownership of task handle, aborting");
496 arc_handle.abort();
497 }
498 }
499 }
500
501 self.subscriptions.clear();
502 self.auth_tracker.fail("Disconnected");
503 Ok(())
504 }
505
506 pub async fn close(&mut self) -> Result<(), KrakenWsError> {
508 self.disconnect().await
509 }
510
511 pub async fn subscribe_mark_price(
513 &self,
514 instrument_id: InstrumentId,
515 ) -> Result<(), KrakenWsError> {
516 let symbol = instrument_id.symbol;
517 let key = format!("mark:{symbol}");
518
519 if !self.subscriptions.add_reference(&key) {
520 return Ok(());
521 }
522
523 self.subscriptions.mark_subscribe(&key);
524 self.subscriptions.confirm_subscribe(&key);
525 self.ensure_ticker_subscribed(symbol).await
526 }
527
528 pub async fn unsubscribe_mark_price(
530 &self,
531 instrument_id: InstrumentId,
532 ) -> Result<(), KrakenWsError> {
533 let symbol = instrument_id.symbol;
534 let key = format!("mark:{symbol}");
535
536 if !self.subscriptions.remove_reference(&key) {
537 return Ok(());
538 }
539
540 self.subscriptions.mark_unsubscribe(&key);
541 self.subscriptions.confirm_unsubscribe(&key);
542 self.maybe_unsubscribe_ticker(symbol).await
543 }
544
545 pub async fn subscribe_index_price(
547 &self,
548 instrument_id: InstrumentId,
549 ) -> Result<(), KrakenWsError> {
550 let symbol = instrument_id.symbol;
551 let key = format!("index:{symbol}");
552
553 if !self.subscriptions.add_reference(&key) {
554 return Ok(());
555 }
556
557 self.subscriptions.mark_subscribe(&key);
558 self.subscriptions.confirm_subscribe(&key);
559 self.ensure_ticker_subscribed(symbol).await
560 }
561
562 pub async fn unsubscribe_index_price(
564 &self,
565 instrument_id: InstrumentId,
566 ) -> Result<(), KrakenWsError> {
567 let symbol = instrument_id.symbol;
568 let key = format!("index:{symbol}");
569
570 if !self.subscriptions.remove_reference(&key) {
571 return Ok(());
572 }
573
574 self.subscriptions.mark_unsubscribe(&key);
575 self.subscriptions.confirm_unsubscribe(&key);
576 self.maybe_unsubscribe_ticker(symbol).await
577 }
578
579 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
583 let symbol = instrument_id.symbol;
584 let key = format!("quotes:{symbol}");
585
586 if !self.subscriptions.add_reference(&key) {
587 return Ok(());
588 }
589
590 self.subscriptions.mark_subscribe(&key);
591 self.subscriptions.confirm_subscribe(&key);
592
593 self.ensure_book_subscribed(symbol).await
595 }
596
597 pub async fn unsubscribe_quotes(
599 &self,
600 instrument_id: InstrumentId,
601 ) -> Result<(), KrakenWsError> {
602 let symbol = instrument_id.symbol;
603 let key = format!("quotes:{symbol}");
604
605 if !self.subscriptions.remove_reference(&key) {
606 return Ok(());
607 }
608
609 self.subscriptions.mark_unsubscribe(&key);
610 self.subscriptions.confirm_unsubscribe(&key);
611 self.maybe_unsubscribe_book(symbol).await
612 }
613
614 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
616 let symbol = instrument_id.symbol;
617 let key = format!("trades:{symbol}");
618
619 if !self.subscriptions.add_reference(&key) {
620 return Ok(());
621 }
622
623 self.subscriptions.mark_subscribe(&key);
624
625 self.cmd_tx
626 .read()
627 .await
628 .send(HandlerCommand::SubscribeTrade(symbol))
629 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
630
631 self.subscriptions.confirm_subscribe(&key);
632 Ok(())
633 }
634
635 pub async fn unsubscribe_trades(
637 &self,
638 instrument_id: InstrumentId,
639 ) -> Result<(), KrakenWsError> {
640 let symbol = instrument_id.symbol;
641 let key = format!("trades:{symbol}");
642
643 if !self.subscriptions.remove_reference(&key) {
644 return Ok(());
645 }
646
647 self.subscriptions.mark_unsubscribe(&key);
648
649 self.cmd_tx
650 .read()
651 .await
652 .send(HandlerCommand::UnsubscribeTrade(symbol))
653 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
654
655 self.subscriptions.confirm_unsubscribe(&key);
656 Ok(())
657 }
658
659 pub async fn subscribe_book(
664 &self,
665 instrument_id: InstrumentId,
666 _depth: Option<u32>,
667 ) -> Result<(), KrakenWsError> {
668 let symbol = instrument_id.symbol;
669 let key = format!("book:{symbol}");
670
671 if !self.subscriptions.add_reference(&key) {
672 return Ok(());
673 }
674
675 self.subscriptions.mark_subscribe(&key);
676
677 self.cmd_tx
678 .read()
679 .await
680 .send(HandlerCommand::SubscribeBook(symbol))
681 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
682
683 self.subscriptions.confirm_subscribe(&key);
684 Ok(())
685 }
686
687 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
689 let symbol = instrument_id.symbol;
690 let key = format!("book:{symbol}");
691
692 if !self.subscriptions.remove_reference(&key) {
693 return Ok(());
694 }
695
696 self.subscriptions.mark_unsubscribe(&key);
697
698 self.cmd_tx
699 .read()
700 .await
701 .send(HandlerCommand::UnsubscribeBook(symbol))
702 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
703
704 self.subscriptions.confirm_unsubscribe(&key);
705 Ok(())
706 }
707
708 async fn ensure_ticker_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
710 let ticker_key = format!("ticker:{symbol}");
711
712 if !self.subscriptions.add_reference(&ticker_key) {
713 return Ok(());
714 }
715
716 self.subscriptions.mark_subscribe(&ticker_key);
717 self.cmd_tx
718 .read()
719 .await
720 .send(HandlerCommand::SubscribeTicker(symbol))
721 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
722 self.subscriptions.confirm_subscribe(&ticker_key);
723 Ok(())
724 }
725
726 async fn maybe_unsubscribe_ticker(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
728 let ticker_key = format!("ticker:{symbol}");
729
730 if !self.subscriptions.remove_reference(&ticker_key) {
731 return Ok(());
732 }
733
734 self.subscriptions.mark_unsubscribe(&ticker_key);
735 self.cmd_tx
736 .read()
737 .await
738 .send(HandlerCommand::UnsubscribeTicker(symbol))
739 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
740 self.subscriptions.confirm_unsubscribe(&ticker_key);
741 Ok(())
742 }
743
744 async fn ensure_book_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
746 let book_key = format!("book:{symbol}");
747
748 if !self.subscriptions.add_reference(&book_key) {
749 return Ok(());
750 }
751
752 self.subscriptions.mark_subscribe(&book_key);
753 self.cmd_tx
754 .read()
755 .await
756 .send(HandlerCommand::SubscribeBook(symbol))
757 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
758 self.subscriptions.confirm_subscribe(&book_key);
759 Ok(())
760 }
761
762 async fn maybe_unsubscribe_book(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
764 let book_key = format!("book:{symbol}");
765
766 if !self.subscriptions.remove_reference(&book_key) {
767 return Ok(());
768 }
769
770 self.subscriptions.mark_unsubscribe(&book_key);
771 self.cmd_tx
772 .read()
773 .await
774 .send(HandlerCommand::UnsubscribeBook(symbol))
775 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
776 self.subscriptions.confirm_unsubscribe(&book_key);
777 Ok(())
778 }
779
780 pub fn take_output_rx(
782 &mut self,
783 ) -> Option<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>> {
784 self.out_rx.take().and_then(|arc| Arc::try_unwrap(arc).ok())
785 }
786
787 pub fn set_account_id(&self, account_id: AccountId) {
792 if let Ok(tx) = self.cmd_tx.try_read()
793 && let Err(e) = tx.send(HandlerCommand::SetAccountId(account_id))
794 {
795 tracing::debug!("Failed to send account_id to handler: {e}");
796 }
797 }
798
799 pub fn cache_client_order(
805 &self,
806 client_order_id: ClientOrderId,
807 venue_order_id: Option<VenueOrderId>,
808 instrument_id: InstrumentId,
809 trader_id: TraderId,
810 strategy_id: StrategyId,
811 ) {
812 if let Ok(tx) = self.cmd_tx.try_read()
813 && let Err(e) = tx.send(HandlerCommand::CacheClientOrder {
814 client_order_id,
815 venue_order_id,
816 instrument_id,
817 trader_id,
818 strategy_id,
819 })
820 {
821 tracing::debug!("Failed to cache client order: {e}");
822 }
823 }
824
825 pub async fn request_challenge(&self) -> Result<(), KrakenWsError> {
830 let credential = self.credential.as_ref().ok_or_else(|| {
831 KrakenWsError::AuthenticationError(
832 "API credentials required for authentication".to_string(),
833 )
834 })?;
835
836 tracing::debug!(
839 "Challenge request prepared for API key: {}",
840 credential.api_key_masked()
841 );
842
843 Ok(())
844 }
845
846 pub async fn set_auth_credentials(
848 &self,
849 original_challenge: String,
850 signed_challenge: String,
851 ) -> Result<(), KrakenWsError> {
852 let credential = self.credential.as_ref().ok_or_else(|| {
853 KrakenWsError::AuthenticationError("API credentials required".to_string())
854 })?;
855
856 *self.original_challenge.write().await = Some(original_challenge.clone());
857 *self.signed_challenge.write().await = Some(signed_challenge.clone());
858
859 self.cmd_tx
860 .read()
861 .await
862 .send(HandlerCommand::SetAuthCredentials {
863 api_key: credential.api_key().to_string(),
864 original_challenge,
865 signed_challenge,
866 })
867 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
868
869 Ok(())
870 }
871
872 pub fn sign_challenge(&self, challenge: &str) -> Result<String, KrakenWsError> {
876 let credential = self.credential.as_ref().ok_or_else(|| {
877 KrakenWsError::AuthenticationError("API credentials required".to_string())
878 })?;
879
880 credential.sign_ws_challenge(challenge).map_err(|e| {
881 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
882 })
883 }
884
885 pub async fn authenticate_with_challenge(&self, challenge: &str) -> Result<(), KrakenWsError> {
887 let credential = self.credential.as_ref().ok_or_else(|| {
888 KrakenWsError::AuthenticationError("API credentials required".to_string())
889 })?;
890
891 let signed_challenge = credential.sign_ws_challenge(challenge).map_err(|e| {
892 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
893 })?;
894
895 self.set_auth_credentials(challenge.to_string(), signed_challenge)
896 .await
897 }
898
899 pub async fn subscribe_open_orders(&self) -> Result<(), KrakenWsError> {
901 if self.original_challenge.read().await.is_none() {
902 return Err(KrakenWsError::AuthenticationError(
903 "Must authenticate before subscribing to private feeds".to_string(),
904 ));
905 }
906
907 let key = "open_orders";
908 if !self.subscriptions.add_reference(key) {
909 return Ok(());
910 }
911
912 self.subscriptions.mark_subscribe(key);
913
914 self.cmd_tx
915 .read()
916 .await
917 .send(HandlerCommand::SubscribeOpenOrders)
918 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
919
920 self.subscriptions.confirm_subscribe(key);
921 Ok(())
922 }
923
924 pub async fn subscribe_fills(&self) -> Result<(), KrakenWsError> {
926 if self.original_challenge.read().await.is_none() {
927 return Err(KrakenWsError::AuthenticationError(
928 "Must authenticate before subscribing to private feeds".to_string(),
929 ));
930 }
931
932 let key = "fills";
933 if !self.subscriptions.add_reference(key) {
934 return Ok(());
935 }
936
937 self.subscriptions.mark_subscribe(key);
938
939 self.cmd_tx
940 .read()
941 .await
942 .send(HandlerCommand::SubscribeFills)
943 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
944
945 self.subscriptions.confirm_subscribe(key);
946 Ok(())
947 }
948
949 pub async fn subscribe_executions(&self) -> Result<(), KrakenWsError> {
951 self.subscribe_open_orders().await?;
952 self.subscribe_fills().await?;
953 Ok(())
954 }
955}