1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::runtime::get_runtime;
25use nautilus_model::{
26 identifiers::{
27 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
28 },
29 instruments::InstrumentAny,
30};
31use nautilus_network::{
32 mode::ConnectionMode,
33 websocket::{
34 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35 },
36};
37use tokio_util::sync::CancellationToken;
38
39use super::{
40 handler::{FuturesFeedHandler, HandlerCommand},
41 messages::{KrakenFuturesFeed, KrakenFuturesWsMessage},
42};
43use crate::{common::credential::KrakenCredential, websocket::error::KrakenWsError};
44
45pub const KRAKEN_FUTURES_WS_TOPIC_DELIMITER: char = ':';
49
50const WS_PING_MSG: &str = r#"{"event":"ping"}"#;
51
52#[derive(Debug)]
54#[cfg_attr(
55 feature = "python",
56 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
57)]
58pub struct KrakenFuturesWebSocketClient {
59 url: String,
60 heartbeat_secs: Option<u64>,
61 signal: Arc<AtomicBool>,
62 connection_mode: Arc<ArcSwap<AtomicU8>>,
63 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
64 out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>>>,
65 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
66 subscriptions: SubscriptionState,
67 auth_tracker: AuthTracker,
68 cancellation_token: CancellationToken,
69 credential: Option<KrakenCredential>,
70 original_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
71 signed_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
72}
73
74impl Clone for KrakenFuturesWebSocketClient {
75 fn clone(&self) -> Self {
76 Self {
77 url: self.url.clone(),
78 heartbeat_secs: self.heartbeat_secs,
79 signal: Arc::clone(&self.signal),
80 connection_mode: Arc::clone(&self.connection_mode),
81 cmd_tx: Arc::clone(&self.cmd_tx),
82 out_rx: self.out_rx.clone(),
83 task_handle: self.task_handle.clone(),
84 subscriptions: self.subscriptions.clone(),
85 auth_tracker: self.auth_tracker.clone(),
86 cancellation_token: self.cancellation_token.clone(),
87 credential: self.credential.clone(),
88 original_challenge: Arc::clone(&self.original_challenge),
89 signed_challenge: Arc::clone(&self.signed_challenge),
90 }
91 }
92}
93
94impl KrakenFuturesWebSocketClient {
95 #[must_use]
97 pub fn new(url: String, heartbeat_secs: Option<u64>) -> Self {
98 Self::with_credentials(url, heartbeat_secs, None)
99 }
100
101 #[must_use]
103 pub fn with_credentials(
104 url: String,
105 heartbeat_secs: Option<u64>,
106 credential: Option<KrakenCredential>,
107 ) -> Self {
108 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
109 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
110 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
111
112 Self {
113 url,
114 heartbeat_secs,
115 signal: Arc::new(AtomicBool::new(false)),
116 connection_mode,
117 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
118 out_rx: None,
119 task_handle: None,
120 subscriptions: SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER),
121 auth_tracker: AuthTracker::new(),
122 cancellation_token: CancellationToken::new(),
123 credential,
124 original_challenge: Arc::new(tokio::sync::RwLock::new(None)),
125 signed_challenge: Arc::new(tokio::sync::RwLock::new(None)),
126 }
127 }
128
129 #[must_use]
131 pub fn has_credentials(&self) -> bool {
132 self.credential.is_some()
133 }
134
135 #[must_use]
137 pub fn url(&self) -> &str {
138 &self.url
139 }
140
141 #[must_use]
143 pub fn is_closed(&self) -> bool {
144 ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
145 == ConnectionMode::Closed
146 }
147
148 #[must_use]
150 pub fn is_active(&self) -> bool {
151 ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
152 == ConnectionMode::Active
153 }
154
155 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
157 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
158
159 tokio::time::timeout(timeout, async {
160 while !self.is_active() {
161 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
162 }
163 })
164 .await
165 .map_err(|_| {
166 KrakenWsError::ConnectionError(format!(
167 "WebSocket connection timeout after {timeout_secs} seconds"
168 ))
169 })?;
170
171 Ok(())
172 }
173
174 pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
179 let credential = self.credential.as_ref().ok_or_else(|| {
180 KrakenWsError::AuthenticationError("API credentials required".to_string())
181 })?;
182
183 let api_key = credential.api_key().to_string();
184 let (tx, rx) = tokio::sync::oneshot::channel();
185
186 self.cmd_tx
187 .read()
188 .await
189 .send(HandlerCommand::RequestChallenge {
190 api_key: api_key.clone(),
191 response_tx: tx,
192 })
193 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
194
195 let challenge = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx)
196 .await
197 .map_err(|_| {
198 KrakenWsError::AuthenticationError("Timeout waiting for challenge".to_string())
199 })?
200 .map_err(|_| {
201 KrakenWsError::AuthenticationError("Challenge channel closed".to_string())
202 })?;
203
204 let signed_challenge = credential.sign_ws_challenge(&challenge).map_err(|e| {
205 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
206 })?;
207
208 *self.original_challenge.write().await = Some(challenge.clone());
209 *self.signed_challenge.write().await = Some(signed_challenge.clone());
210
211 self.cmd_tx
212 .read()
213 .await
214 .send(HandlerCommand::SetAuthCredentials {
215 api_key,
216 original_challenge: challenge,
217 signed_challenge,
218 })
219 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
220
221 tracing::debug!("Futures WebSocket authentication successful");
222 Ok(())
223 }
224
225 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
229 if let Ok(tx) = self.cmd_tx.try_read()
230 && let Err(e) = tx.send(HandlerCommand::InitializeInstruments(instruments))
231 {
232 tracing::debug!("Failed to send instruments to handler: {e}");
233 }
234 }
235
236 pub fn cache_instrument(&self, instrument: InstrumentAny) {
240 if let Ok(tx) = self.cmd_tx.try_read()
241 && let Err(e) = tx.send(HandlerCommand::UpdateInstrument(instrument))
242 {
243 tracing::debug!("Failed to send instrument update to handler: {e}");
244 }
245 }
246
247 pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
249 tracing::debug!("Connecting to Futures WebSocket: {}", self.url);
250
251 self.signal.store(false, Ordering::Relaxed);
252
253 let (raw_handler, raw_rx) = channel_message_handler();
254
255 let ws_config = WebSocketConfig {
256 url: self.url.clone(),
257 headers: vec![],
258 message_handler: Some(raw_handler),
259 ping_handler: None,
260 heartbeat: self.heartbeat_secs,
261 heartbeat_msg: Some(WS_PING_MSG.to_string()),
262 reconnect_timeout_ms: Some(5_000),
263 reconnect_delay_initial_ms: Some(500),
264 reconnect_delay_max_ms: Some(5_000),
265 reconnect_backoff_factor: Some(1.5),
266 reconnect_jitter_ms: Some(250),
267 reconnect_max_attempts: None,
268 };
269
270 let ws_client = WebSocketClient::connect(ws_config, None, vec![], None)
271 .await
272 .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;
273
274 self.connection_mode
275 .store(ws_client.connection_mode_atomic());
276
277 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<KrakenFuturesWsMessage>();
278 self.out_rx = Some(Arc::new(out_rx));
279
280 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
281 *self.cmd_tx.write().await = cmd_tx.clone();
282
283 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(ws_client)) {
284 return Err(KrakenWsError::ConnectionError(format!(
285 "Failed to send WebSocketClient to handler: {e}"
286 )));
287 }
288
289 let signal = self.signal.clone();
290 let subscriptions = self.subscriptions.clone();
291 let cmd_tx_for_reconnect = cmd_tx.clone();
292 let credential_for_reconnect = self.credential.clone();
293
294 let stream_handle = get_runtime().spawn(async move {
295 let mut handler =
296 FuturesFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());
297
298 loop {
299 match handler.next().await {
300 Some(KrakenFuturesWsMessage::Reconnected) => {
301 if signal.load(Ordering::Relaxed) {
302 continue;
303 }
304 tracing::info!("WebSocket reconnected, resubscribing");
305
306 let confirmed_topics = subscriptions.all_topics();
308 for topic in &confirmed_topics {
309 subscriptions.mark_failure(topic);
310 }
311
312 let topics = subscriptions.all_topics();
313 if topics.is_empty() {
314 tracing::debug!("No subscriptions to restore after reconnection");
315 } else {
316 let has_private_subs = topics.iter().any(|t| {
318 t == "open_orders"
319 || t == "fills"
320 || t.starts_with("open_orders:")
321 || t.starts_with("fills:")
322 });
323
324 if has_private_subs {
325 if let Some(ref cred) = credential_for_reconnect {
326 let (tx, rx) = tokio::sync::oneshot::channel();
328 if let Err(e) = cmd_tx_for_reconnect.send(
329 HandlerCommand::RequestChallenge {
330 api_key: cred.api_key().to_string(),
331 response_tx: tx,
332 },
333 ) {
334 tracing::error!(
335 error = %e,
336 "Failed to request challenge for reconnect"
337 );
338 } else {
339 match tokio::time::timeout(
340 tokio::time::Duration::from_secs(10),
341 rx,
342 )
343 .await
344 {
345 Ok(Ok(challenge)) => {
346 match cred.sign_ws_challenge(&challenge) {
347 Ok(signed) => {
348 if let Err(e) = cmd_tx_for_reconnect.send(
349 HandlerCommand::SetAuthCredentials {
350 api_key: cred.api_key().to_string(),
351 original_challenge: challenge,
352 signed_challenge: signed,
353 },
354 ) {
355 tracing::error!(
356 error = %e,
357 "Failed to set auth credentials"
358 );
359 } else {
360 tracing::debug!(
361 "Re-authenticated after reconnect"
362 );
363 }
364 }
365 Err(e) => {
366 tracing::error!(
367 error = %e,
368 "Failed to sign challenge for reconnect"
369 );
370 }
371 }
372 }
373 Ok(Err(_)) => {
374 tracing::error!(
375 "Challenge channel closed during reconnect"
376 );
377 }
378 Err(_) => {
379 tracing::error!(
380 "Timeout waiting for challenge during reconnect"
381 );
382 }
383 }
384 }
385 } else {
386 tracing::warn!(
387 "Private subscriptions exist but no credentials available"
388 );
389 }
390 }
391
392 tracing::info!(
393 count = topics.len(),
394 "Resubscribing after reconnection"
395 );
396
397 for topic in &topics {
398 let cmd =
399 if let Some((feed_str, symbol_str)) = topic.split_once(':') {
400 let symbol = Symbol::from(symbol_str);
401 match feed_str.parse::<KrakenFuturesFeed>() {
402 Ok(KrakenFuturesFeed::Trade) => {
403 Some(HandlerCommand::SubscribeTrade(symbol))
404 }
405 Ok(KrakenFuturesFeed::Book) => {
406 Some(HandlerCommand::SubscribeBook(symbol))
407 }
408 Ok(KrakenFuturesFeed::Ticker) => {
409 Some(HandlerCommand::SubscribeTicker(symbol))
410 }
411 Ok(KrakenFuturesFeed::OpenOrders) => {
412 Some(HandlerCommand::SubscribeOpenOrders)
413 }
414 Ok(KrakenFuturesFeed::Fills) => {
415 Some(HandlerCommand::SubscribeFills)
416 }
417 Ok(_) | Err(_) => None,
418 }
419 } else {
420 match topic.parse::<KrakenFuturesFeed>() {
421 Ok(KrakenFuturesFeed::OpenOrders) => {
422 Some(HandlerCommand::SubscribeOpenOrders)
423 }
424 Ok(KrakenFuturesFeed::Fills) => {
425 Some(HandlerCommand::SubscribeFills)
426 }
427 Ok(_) | Err(_) => None,
428 }
429 };
430
431 if let Some(cmd) = cmd
432 && let Err(e) = cmd_tx_for_reconnect.send(cmd)
433 {
434 tracing::error!(
435 error = %e, topic,
436 "Failed to send resubscribe command"
437 );
438 }
439
440 subscriptions.mark_subscribe(topic);
441 }
442 }
443
444 if let Err(e) = out_tx.send(KrakenFuturesWsMessage::Reconnected) {
445 tracing::debug!("Output channel closed: {e}");
446 break;
447 }
448 continue;
449 }
450 Some(msg) => {
451 if let Err(e) = out_tx.send(msg) {
452 tracing::debug!("Output channel closed: {e}");
453 break;
454 }
455 }
456 None => {
457 tracing::debug!("Handler stream ended");
458 break;
459 }
460 }
461 }
462
463 tracing::debug!("Futures handler task exiting");
464 });
465
466 self.task_handle = Some(Arc::new(stream_handle));
467
468 tracing::debug!("Futures WebSocket connected successfully");
469 Ok(())
470 }
471
472 pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
474 tracing::debug!("Disconnecting Futures WebSocket");
475
476 self.signal.store(true, Ordering::Relaxed);
477
478 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
479 tracing::debug!(
480 "Failed to send disconnect command (handler may already be shut down): {e}"
481 );
482 }
483
484 if let Some(task_handle) = self.task_handle.take() {
485 match Arc::try_unwrap(task_handle) {
486 Ok(handle) => {
487 match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
488 Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
489 Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
490 Err(_) => {
491 tracing::warn!("Timeout waiting for task handle");
492 }
493 }
494 }
495 Err(arc_handle) => {
496 tracing::debug!("Cannot take ownership of task handle, aborting");
497 arc_handle.abort();
498 }
499 }
500 }
501
502 self.subscriptions.clear();
503 self.auth_tracker.fail("Disconnected");
504 Ok(())
505 }
506
507 pub async fn close(&mut self) -> Result<(), KrakenWsError> {
509 self.disconnect().await
510 }
511
512 pub async fn subscribe_mark_price(
514 &self,
515 instrument_id: InstrumentId,
516 ) -> Result<(), KrakenWsError> {
517 let symbol = instrument_id.symbol;
518 let key = format!("mark:{symbol}");
519
520 if !self.subscriptions.add_reference(&key) {
521 return Ok(());
522 }
523
524 self.subscriptions.mark_subscribe(&key);
525 self.subscriptions.confirm_subscribe(&key);
526 self.ensure_ticker_subscribed(symbol).await
527 }
528
529 pub async fn unsubscribe_mark_price(
531 &self,
532 instrument_id: InstrumentId,
533 ) -> Result<(), KrakenWsError> {
534 let symbol = instrument_id.symbol;
535 let key = format!("mark:{symbol}");
536
537 if !self.subscriptions.remove_reference(&key) {
538 return Ok(());
539 }
540
541 self.subscriptions.mark_unsubscribe(&key);
542 self.subscriptions.confirm_unsubscribe(&key);
543 self.maybe_unsubscribe_ticker(symbol).await
544 }
545
546 pub async fn subscribe_index_price(
548 &self,
549 instrument_id: InstrumentId,
550 ) -> Result<(), KrakenWsError> {
551 let symbol = instrument_id.symbol;
552 let key = format!("index:{symbol}");
553
554 if !self.subscriptions.add_reference(&key) {
555 return Ok(());
556 }
557
558 self.subscriptions.mark_subscribe(&key);
559 self.subscriptions.confirm_subscribe(&key);
560 self.ensure_ticker_subscribed(symbol).await
561 }
562
563 pub async fn unsubscribe_index_price(
565 &self,
566 instrument_id: InstrumentId,
567 ) -> Result<(), KrakenWsError> {
568 let symbol = instrument_id.symbol;
569 let key = format!("index:{symbol}");
570
571 if !self.subscriptions.remove_reference(&key) {
572 return Ok(());
573 }
574
575 self.subscriptions.mark_unsubscribe(&key);
576 self.subscriptions.confirm_unsubscribe(&key);
577 self.maybe_unsubscribe_ticker(symbol).await
578 }
579
580 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
584 let symbol = instrument_id.symbol;
585 let key = format!("quotes:{symbol}");
586
587 if !self.subscriptions.add_reference(&key) {
588 return Ok(());
589 }
590
591 self.subscriptions.mark_subscribe(&key);
592 self.subscriptions.confirm_subscribe(&key);
593
594 self.ensure_book_subscribed(symbol).await
596 }
597
598 pub async fn unsubscribe_quotes(
600 &self,
601 instrument_id: InstrumentId,
602 ) -> Result<(), KrakenWsError> {
603 let symbol = instrument_id.symbol;
604 let key = format!("quotes:{symbol}");
605
606 if !self.subscriptions.remove_reference(&key) {
607 return Ok(());
608 }
609
610 self.subscriptions.mark_unsubscribe(&key);
611 self.subscriptions.confirm_unsubscribe(&key);
612 self.maybe_unsubscribe_book(symbol).await
613 }
614
615 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
617 let symbol = instrument_id.symbol;
618 let key = format!("trades:{symbol}");
619
620 if !self.subscriptions.add_reference(&key) {
621 return Ok(());
622 }
623
624 self.subscriptions.mark_subscribe(&key);
625
626 self.cmd_tx
627 .read()
628 .await
629 .send(HandlerCommand::SubscribeTrade(symbol))
630 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
631
632 self.subscriptions.confirm_subscribe(&key);
633 Ok(())
634 }
635
636 pub async fn unsubscribe_trades(
638 &self,
639 instrument_id: InstrumentId,
640 ) -> Result<(), KrakenWsError> {
641 let symbol = instrument_id.symbol;
642 let key = format!("trades:{symbol}");
643
644 if !self.subscriptions.remove_reference(&key) {
645 return Ok(());
646 }
647
648 self.subscriptions.mark_unsubscribe(&key);
649
650 self.cmd_tx
651 .read()
652 .await
653 .send(HandlerCommand::UnsubscribeTrade(symbol))
654 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
655
656 self.subscriptions.confirm_unsubscribe(&key);
657 Ok(())
658 }
659
660 pub async fn subscribe_book(
665 &self,
666 instrument_id: InstrumentId,
667 _depth: Option<u32>,
668 ) -> Result<(), KrakenWsError> {
669 let symbol = instrument_id.symbol;
670 let key = format!("book:{symbol}");
671
672 if !self.subscriptions.add_reference(&key) {
673 return Ok(());
674 }
675
676 self.subscriptions.mark_subscribe(&key);
677
678 self.cmd_tx
679 .read()
680 .await
681 .send(HandlerCommand::SubscribeBook(symbol))
682 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
683
684 self.subscriptions.confirm_subscribe(&key);
685 Ok(())
686 }
687
688 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
690 let symbol = instrument_id.symbol;
691 let key = format!("book:{symbol}");
692
693 if !self.subscriptions.remove_reference(&key) {
694 return Ok(());
695 }
696
697 self.subscriptions.mark_unsubscribe(&key);
698
699 self.cmd_tx
700 .read()
701 .await
702 .send(HandlerCommand::UnsubscribeBook(symbol))
703 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
704
705 self.subscriptions.confirm_unsubscribe(&key);
706 Ok(())
707 }
708
709 async fn ensure_ticker_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
711 let ticker_key = format!("ticker:{symbol}");
712
713 if !self.subscriptions.add_reference(&ticker_key) {
714 return Ok(());
715 }
716
717 self.subscriptions.mark_subscribe(&ticker_key);
718 self.cmd_tx
719 .read()
720 .await
721 .send(HandlerCommand::SubscribeTicker(symbol))
722 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
723 self.subscriptions.confirm_subscribe(&ticker_key);
724 Ok(())
725 }
726
727 async fn maybe_unsubscribe_ticker(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
729 let ticker_key = format!("ticker:{symbol}");
730
731 if !self.subscriptions.remove_reference(&ticker_key) {
732 return Ok(());
733 }
734
735 self.subscriptions.mark_unsubscribe(&ticker_key);
736 self.cmd_tx
737 .read()
738 .await
739 .send(HandlerCommand::UnsubscribeTicker(symbol))
740 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
741 self.subscriptions.confirm_unsubscribe(&ticker_key);
742 Ok(())
743 }
744
745 async fn ensure_book_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
747 let book_key = format!("book:{symbol}");
748
749 if !self.subscriptions.add_reference(&book_key) {
750 return Ok(());
751 }
752
753 self.subscriptions.mark_subscribe(&book_key);
754 self.cmd_tx
755 .read()
756 .await
757 .send(HandlerCommand::SubscribeBook(symbol))
758 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
759 self.subscriptions.confirm_subscribe(&book_key);
760 Ok(())
761 }
762
763 async fn maybe_unsubscribe_book(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
765 let book_key = format!("book:{symbol}");
766
767 if !self.subscriptions.remove_reference(&book_key) {
768 return Ok(());
769 }
770
771 self.subscriptions.mark_unsubscribe(&book_key);
772 self.cmd_tx
773 .read()
774 .await
775 .send(HandlerCommand::UnsubscribeBook(symbol))
776 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
777 self.subscriptions.confirm_unsubscribe(&book_key);
778 Ok(())
779 }
780
781 pub fn take_output_rx(
783 &mut self,
784 ) -> Option<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>> {
785 self.out_rx.take().and_then(|arc| Arc::try_unwrap(arc).ok())
786 }
787
788 pub fn set_account_id(&self, account_id: AccountId) {
793 if let Ok(tx) = self.cmd_tx.try_read()
794 && let Err(e) = tx.send(HandlerCommand::SetAccountId(account_id))
795 {
796 tracing::debug!("Failed to send account_id to handler: {e}");
797 }
798 }
799
800 pub fn cache_client_order(
806 &self,
807 client_order_id: ClientOrderId,
808 venue_order_id: Option<VenueOrderId>,
809 instrument_id: InstrumentId,
810 trader_id: TraderId,
811 strategy_id: StrategyId,
812 ) {
813 if let Ok(tx) = self.cmd_tx.try_read()
814 && let Err(e) = tx.send(HandlerCommand::CacheClientOrder {
815 client_order_id,
816 venue_order_id,
817 instrument_id,
818 trader_id,
819 strategy_id,
820 })
821 {
822 tracing::debug!("Failed to cache client order: {e}");
823 }
824 }
825
826 pub async fn request_challenge(&self) -> Result<(), KrakenWsError> {
831 let credential = self.credential.as_ref().ok_or_else(|| {
832 KrakenWsError::AuthenticationError(
833 "API credentials required for authentication".to_string(),
834 )
835 })?;
836
837 tracing::debug!(
840 "Challenge request prepared for API key: {}",
841 credential.api_key_masked()
842 );
843
844 Ok(())
845 }
846
847 pub async fn set_auth_credentials(
849 &self,
850 original_challenge: String,
851 signed_challenge: String,
852 ) -> Result<(), KrakenWsError> {
853 let credential = self.credential.as_ref().ok_or_else(|| {
854 KrakenWsError::AuthenticationError("API credentials required".to_string())
855 })?;
856
857 *self.original_challenge.write().await = Some(original_challenge.clone());
858 *self.signed_challenge.write().await = Some(signed_challenge.clone());
859
860 self.cmd_tx
861 .read()
862 .await
863 .send(HandlerCommand::SetAuthCredentials {
864 api_key: credential.api_key().to_string(),
865 original_challenge,
866 signed_challenge,
867 })
868 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
869
870 Ok(())
871 }
872
873 pub fn sign_challenge(&self, challenge: &str) -> Result<String, KrakenWsError> {
877 let credential = self.credential.as_ref().ok_or_else(|| {
878 KrakenWsError::AuthenticationError("API credentials required".to_string())
879 })?;
880
881 credential.sign_ws_challenge(challenge).map_err(|e| {
882 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
883 })
884 }
885
886 pub async fn authenticate_with_challenge(&self, challenge: &str) -> Result<(), KrakenWsError> {
888 let credential = self.credential.as_ref().ok_or_else(|| {
889 KrakenWsError::AuthenticationError("API credentials required".to_string())
890 })?;
891
892 let signed_challenge = credential.sign_ws_challenge(challenge).map_err(|e| {
893 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
894 })?;
895
896 self.set_auth_credentials(challenge.to_string(), signed_challenge)
897 .await
898 }
899
900 pub async fn subscribe_open_orders(&self) -> Result<(), KrakenWsError> {
902 if self.original_challenge.read().await.is_none() {
903 return Err(KrakenWsError::AuthenticationError(
904 "Must authenticate before subscribing to private feeds".to_string(),
905 ));
906 }
907
908 let key = "open_orders";
909 if !self.subscriptions.add_reference(key) {
910 return Ok(());
911 }
912
913 self.subscriptions.mark_subscribe(key);
914
915 self.cmd_tx
916 .read()
917 .await
918 .send(HandlerCommand::SubscribeOpenOrders)
919 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
920
921 self.subscriptions.confirm_subscribe(key);
922 Ok(())
923 }
924
925 pub async fn subscribe_fills(&self) -> Result<(), KrakenWsError> {
927 if self.original_challenge.read().await.is_none() {
928 return Err(KrakenWsError::AuthenticationError(
929 "Must authenticate before subscribing to private feeds".to_string(),
930 ));
931 }
932
933 let key = "fills";
934 if !self.subscriptions.add_reference(key) {
935 return Ok(());
936 }
937
938 self.subscriptions.mark_subscribe(key);
939
940 self.cmd_tx
941 .read()
942 .await
943 .send(HandlerCommand::SubscribeFills)
944 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
945
946 self.subscriptions.confirm_subscribe(key);
947 Ok(())
948 }
949
950 pub async fn subscribe_executions(&self) -> Result<(), KrakenWsError> {
952 self.subscribe_open_orders().await?;
953 self.subscribe_fills().await?;
954 Ok(())
955 }
956}