1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21};
22
23use arc_swap::ArcSwap;
24use nautilus_common::live::get_runtime;
25use nautilus_model::{
26 identifiers::{
27 AccountId, ClientOrderId, InstrumentId, StrategyId, Symbol, TraderId, VenueOrderId,
28 },
29 instruments::InstrumentAny,
30};
31use nautilus_network::{
32 mode::ConnectionMode,
33 websocket::{
34 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
35 },
36};
37use tokio_util::sync::CancellationToken;
38
39use super::{
40 handler::{FuturesFeedHandler, HandlerCommand},
41 messages::{KrakenFuturesFeed, KrakenFuturesWsMessage},
42};
43use crate::{common::credential::KrakenCredential, websocket::error::KrakenWsError};
44
/// Delimiter handed to `SubscriptionState::new` when this client builds its subscription tracker.
pub const KRAKEN_FUTURES_WS_TOPIC_DELIMITER: char = ':';

/// JSON text configured as the heartbeat message of the underlying WebSocket client.
const WS_PING_MSG: &str = r#"{"event":"ping"}"#;
51
/// Client-side handle for the Kraken Futures WebSocket feed.
///
/// The handle itself only forwards `HandlerCommand`s over a channel; the messages are produced by
/// a task spawned in `connect`, which drives a `FuturesFeedHandler`.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.kraken")
)]
pub struct KrakenFuturesWebSocketClient {
    /// WebSocket endpoint used by `connect`.
    url: String,
    /// Value passed as the `heartbeat` setting of the WebSocket configuration.
    heartbeat_secs: Option<u64>,
    /// Flag set to `true` by `disconnect` and reset to `false` by `connect`; the spawned task
    /// skips resubscription while it is set.
    signal: Arc<AtomicBool>,
    /// Connection mode atomic; replaced in `connect` with the one exposed by the live client.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the handler; swapped for a fresh channel on every `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for handler output, populated by `connect` and handed out by `take_output_rx`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>>>,
    /// Handle of the task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Tracker for the topic keys this client registers (e.g. `trades:{symbol}`, `fills`).
    subscriptions: SubscriptionState,
    /// Authentication tracker; within this file it is only failed on `disconnect`.
    auth_tracker: AuthTracker,
    /// Token created at construction; NOTE(review): not cancelled anywhere in this file.
    cancellation_token: CancellationToken,
    /// Optional API credentials used to sign WebSocket challenges.
    credential: Option<KrakenCredential>,
    /// Last challenge string stored by `authenticate` / `set_auth_credentials`.
    original_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
    /// Signed form of `original_challenge`, stored alongside it.
    signed_challenge: Arc<tokio::sync::RwLock<Option<String>>>,
}
73
74impl Clone for KrakenFuturesWebSocketClient {
75 fn clone(&self) -> Self {
76 Self {
77 url: self.url.clone(),
78 heartbeat_secs: self.heartbeat_secs,
79 signal: Arc::clone(&self.signal),
80 connection_mode: Arc::clone(&self.connection_mode),
81 cmd_tx: Arc::clone(&self.cmd_tx),
82 out_rx: self.out_rx.clone(),
83 task_handle: self.task_handle.clone(),
84 subscriptions: self.subscriptions.clone(),
85 auth_tracker: self.auth_tracker.clone(),
86 cancellation_token: self.cancellation_token.clone(),
87 credential: self.credential.clone(),
88 original_challenge: Arc::clone(&self.original_challenge),
89 signed_challenge: Arc::clone(&self.signed_challenge),
90 }
91 }
92}
93
94impl KrakenFuturesWebSocketClient {
95 #[must_use]
97 pub fn new(url: String, heartbeat_secs: Option<u64>) -> Self {
98 Self::with_credentials(url, heartbeat_secs, None)
99 }
100
101 #[must_use]
103 pub fn with_credentials(
104 url: String,
105 heartbeat_secs: Option<u64>,
106 credential: Option<KrakenCredential>,
107 ) -> Self {
108 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
109 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
110 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
111
112 Self {
113 url,
114 heartbeat_secs,
115 signal: Arc::new(AtomicBool::new(false)),
116 connection_mode,
117 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
118 out_rx: None,
119 task_handle: None,
120 subscriptions: SubscriptionState::new(KRAKEN_FUTURES_WS_TOPIC_DELIMITER),
121 auth_tracker: AuthTracker::new(),
122 cancellation_token: CancellationToken::new(),
123 credential,
124 original_challenge: Arc::new(tokio::sync::RwLock::new(None)),
125 signed_challenge: Arc::new(tokio::sync::RwLock::new(None)),
126 }
127 }
128
129 #[must_use]
131 pub fn has_credentials(&self) -> bool {
132 self.credential.is_some()
133 }
134
135 #[must_use]
137 pub fn url(&self) -> &str {
138 &self.url
139 }
140
141 #[must_use]
143 pub fn is_closed(&self) -> bool {
144 ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
145 == ConnectionMode::Closed
146 }
147
148 #[must_use]
150 pub fn is_active(&self) -> bool {
151 ConnectionMode::from_u8(self.connection_mode.load().load(Ordering::Relaxed))
152 == ConnectionMode::Active
153 }
154
155 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), KrakenWsError> {
157 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
158
159 tokio::time::timeout(timeout, async {
160 while !self.is_active() {
161 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
162 }
163 })
164 .await
165 .map_err(|_| {
166 KrakenWsError::ConnectionError(format!(
167 "WebSocket connection timeout after {timeout_secs} seconds"
168 ))
169 })?;
170
171 Ok(())
172 }
173
174 pub async fn authenticate(&self) -> Result<(), KrakenWsError> {
179 let credential = self.credential.as_ref().ok_or_else(|| {
180 KrakenWsError::AuthenticationError("API credentials required".to_string())
181 })?;
182
183 let api_key = credential.api_key().to_string();
184 let (tx, rx) = tokio::sync::oneshot::channel();
185
186 self.cmd_tx
187 .read()
188 .await
189 .send(HandlerCommand::RequestChallenge {
190 api_key: api_key.clone(),
191 response_tx: tx,
192 })
193 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
194
195 let challenge = tokio::time::timeout(tokio::time::Duration::from_secs(10), rx)
196 .await
197 .map_err(|_| {
198 KrakenWsError::AuthenticationError("Timeout waiting for challenge".to_string())
199 })?
200 .map_err(|_| {
201 KrakenWsError::AuthenticationError("Challenge channel closed".to_string())
202 })?;
203
204 let signed_challenge = credential.sign_ws_challenge(&challenge).map_err(|e| {
205 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
206 })?;
207
208 *self.original_challenge.write().await = Some(challenge.clone());
209 *self.signed_challenge.write().await = Some(signed_challenge.clone());
210
211 self.cmd_tx
212 .read()
213 .await
214 .send(HandlerCommand::SetAuthCredentials {
215 api_key,
216 original_challenge: challenge,
217 signed_challenge,
218 })
219 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
220
221 log::debug!("Futures WebSocket authentication successful");
222 Ok(())
223 }
224
225 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
229 if let Ok(tx) = self.cmd_tx.try_read()
230 && let Err(e) = tx.send(HandlerCommand::InitializeInstruments(instruments))
231 {
232 log::debug!("Failed to send instruments to handler: {e}");
233 }
234 }
235
236 pub fn cache_instrument(&self, instrument: InstrumentAny) {
240 if let Ok(tx) = self.cmd_tx.try_read()
241 && let Err(e) = tx.send(HandlerCommand::UpdateInstrument(instrument))
242 {
243 log::debug!("Failed to send instrument update to handler: {e}");
244 }
245 }
246
    /// Opens the WebSocket connection and spawns the task that drives the feed handler.
    ///
    /// Fresh command and output channels are created on every call; the output receiver can be
    /// obtained afterwards through `take_output_rx`. The spawned task forwards every handler
    /// message to the output channel and, on `Reconnected`, re-authenticates (when private
    /// topics are tracked) and re-issues the tracked subscriptions.
    ///
    /// # Errors
    ///
    /// Returns `KrakenWsError::ConnectionError` when the underlying WebSocket connect fails or
    /// when the new client cannot be handed to the handler channel.
    ///
    /// NOTE(review): a task handle left from an earlier `connect` is overwritten here without
    /// being awaited or aborted — confirm callers always `disconnect` first.
    pub async fn connect(&mut self) -> Result<(), KrakenWsError> {
        log::debug!("Connecting to Futures WebSocket: {}", self.url);

        // Clear the flag that `disconnect` sets, so reconnect handling is active again.
        self.signal.store(false, Ordering::Relaxed);

        let (raw_handler, raw_rx) = channel_message_handler();

        let ws_config = WebSocketConfig {
            url: self.url.clone(),
            headers: vec![],
            heartbeat: self.heartbeat_secs,
            heartbeat_msg: Some(WS_PING_MSG.to_string()),
            reconnect_timeout_ms: Some(5_000),
            reconnect_delay_initial_ms: Some(500),
            reconnect_delay_max_ms: Some(5_000),
            reconnect_backoff_factor: Some(1.5),
            reconnect_jitter_ms: Some(250),
            reconnect_max_attempts: None,
        };

        let ws_client =
            WebSocketClient::connect(ws_config, Some(raw_handler), None, None, vec![], None)
                .await
                .map_err(|e| KrakenWsError::ConnectionError(e.to_string()))?;

        // From here on `is_active` / `is_closed` read the live client's mode atomic.
        self.connection_mode
            .store(ws_client.connection_mode_atomic());

        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<KrakenFuturesWsMessage>();
        self.out_rx = Some(Arc::new(out_rx));

        // Replace the shared command sender so every clone of this client reaches the new handler.
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
        *self.cmd_tx.write().await = cmd_tx.clone();

        // Ownership of the WebSocket client moves to the handler via the command channel.
        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(ws_client)) {
            return Err(KrakenWsError::ConnectionError(format!(
                "Failed to send WebSocketClient to handler: {e}"
            )));
        }

        let signal = self.signal.clone();
        let subscriptions = self.subscriptions.clone();
        let cmd_tx_for_reconnect = cmd_tx.clone();
        let credential_for_reconnect = self.credential.clone();

        let stream_handle = get_runtime().spawn(async move {
            let mut handler =
                FuturesFeedHandler::new(signal.clone(), cmd_rx, raw_rx, subscriptions.clone());

            loop {
                match handler.next().await {
                    Some(KrakenFuturesWsMessage::Reconnected) => {
                        // A disconnect was requested: do not restore anything.
                        if signal.load(Ordering::Relaxed) {
                            continue;
                        }
                        log::info!("WebSocket reconnected, resubscribing");

                        let confirmed_topics = subscriptions.all_topics();
                        // Presumably moves every tracked topic out of the confirmed state so it
                        // is treated as needing a resubscribe — confirm against `SubscriptionState`.
                        for topic in &confirmed_topics {
                            subscriptions.mark_failure(topic);
                        }

                        let topics = subscriptions.all_topics();
                        if topics.is_empty() {
                            log::debug!("No subscriptions to restore after reconnection");
                        } else {
                            // Private feeds are recognised by topic name, with or without a
                            // `:`-separated suffix.
                            let has_private_subs = topics.iter().any(|t| {
                                t == "open_orders"
                                    || t == "fills"
                                    || t.starts_with("open_orders:")
                                    || t.starts_with("fills:")
                            });

                            if has_private_subs {
                                if let Some(ref cred) = credential_for_reconnect {
                                    let (tx, rx) = tokio::sync::oneshot::channel();
                                    // NOTE(review): this task also owns `handler`, which is not
                                    // polled while the challenge is awaited below — confirm the
                                    // challenge request can be served without `handler.next()`
                                    // being driven, otherwise this wait always times out.
                                    if let Err(e) = cmd_tx_for_reconnect.send(
                                        HandlerCommand::RequestChallenge {
                                            api_key: cred.api_key().to_string(),
                                            response_tx: tx,
                                        },
                                    ) {
                                        log::error!(
                                            "Failed to request challenge for reconnect: {e}"
                                        );
                                    } else {
                                        match tokio::time::timeout(
                                            tokio::time::Duration::from_secs(10),
                                            rx,
                                        )
                                        .await
                                        {
                                            Ok(Ok(challenge)) => {
                                                match cred.sign_ws_challenge(&challenge) {
                                                    Ok(signed) => {
                                                        if let Err(e) = cmd_tx_for_reconnect.send(
                                                            HandlerCommand::SetAuthCredentials {
                                                                api_key: cred.api_key().to_string(),
                                                                original_challenge: challenge,
                                                                signed_challenge: signed,
                                                            },
                                                        ) {
                                                            log::error!(
                                                                "Failed to set auth credentials: {e}"
                                                            );
                                                        } else {
                                                            log::debug!(
                                                                "Re-authenticated after reconnect"
                                                            );
                                                        }
                                                    }
                                                    Err(e) => {
                                                        log::error!(
                                                            "Failed to sign challenge for reconnect: {e}"
                                                        );
                                                    }
                                                }
                                            }
                                            Ok(Err(_)) => {
                                                log::error!(
                                                    "Challenge channel closed during reconnect"
                                                );
                                            }
                                            Err(_) => {
                                                log::error!(
                                                    "Timeout waiting for challenge during reconnect"
                                                );
                                            }
                                        }
                                    }
                                } else {
                                    log::warn!(
                                        "Private subscriptions exist but no credentials available"
                                    );
                                }
                            }

                            // Authentication failures above are only logged; the resubscribe
                            // pass runs regardless.
                            log::info!(
                                "Resubscribing after reconnection: count={}",
                                topics.len()
                            );

                            for topic in &topics {
                                // Map the topic key back to a handler command; keys whose feed
                                // part does not parse to a handled feed yield `None`.
                                let cmd =
                                    if let Some((feed_str, symbol_str)) = topic.split_once(':') {
                                        let symbol = Symbol::from(symbol_str);
                                        match feed_str.parse::<KrakenFuturesFeed>() {
                                            Ok(KrakenFuturesFeed::Trade) => {
                                                Some(HandlerCommand::SubscribeTrade(symbol))
                                            }
                                            Ok(KrakenFuturesFeed::Book) => {
                                                Some(HandlerCommand::SubscribeBook(symbol))
                                            }
                                            Ok(KrakenFuturesFeed::Ticker) => {
                                                Some(HandlerCommand::SubscribeTicker(symbol))
                                            }
                                            Ok(KrakenFuturesFeed::OpenOrders) => {
                                                Some(HandlerCommand::SubscribeOpenOrders)
                                            }
                                            Ok(KrakenFuturesFeed::Fills) => {
                                                Some(HandlerCommand::SubscribeFills)
                                            }
                                            Ok(_) | Err(_) => None,
                                        }
                                    } else {
                                        match topic.parse::<KrakenFuturesFeed>() {
                                            Ok(KrakenFuturesFeed::OpenOrders) => {
                                                Some(HandlerCommand::SubscribeOpenOrders)
                                            }
                                            Ok(KrakenFuturesFeed::Fills) => {
                                                Some(HandlerCommand::SubscribeFills)
                                            }
                                            Ok(_) | Err(_) => None,
                                        }
                                    };

                                if let Some(cmd) = cmd
                                    && let Err(e) = cmd_tx_for_reconnect.send(cmd)
                                {
                                    log::error!(
                                        "Failed to send resubscribe command: error={e}, \
                                         topic={topic}"
                                    );
                                }

                                // NOTE(review): runs for every topic, including those for which
                                // no command was built or the send failed — confirm such keys
                                // (e.g. `mark:`, `index:`, `quotes:`) are meant to be re-marked.
                                subscriptions.mark_subscribe(topic);
                            }
                        }

                        // Let downstream consumers observe the reconnection as well.
                        if let Err(e) = out_tx.send(KrakenFuturesWsMessage::Reconnected) {
                            log::debug!("Output channel closed: {e}");
                            break;
                        }
                        continue;
                    }
                    Some(msg) => {
                        if let Err(e) = out_tx.send(msg) {
                            log::debug!("Output channel closed: {e}");
                            break;
                        }
                    }
                    None => {
                        log::debug!("Handler stream ended");
                        break;
                    }
                }
            }

            log::debug!("Futures handler task exiting");
        });

        self.task_handle = Some(Arc::new(stream_handle));

        log::debug!("Futures WebSocket connected successfully");
        Ok(())
    }
467
468 pub async fn disconnect(&mut self) -> Result<(), KrakenWsError> {
470 log::debug!("Disconnecting Futures WebSocket");
471
472 self.signal.store(true, Ordering::Relaxed);
473
474 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
475 log::debug!(
476 "Failed to send disconnect command (handler may already be shut down): {e}"
477 );
478 }
479
480 if let Some(task_handle) = self.task_handle.take() {
481 match Arc::try_unwrap(task_handle) {
482 Ok(handle) => {
483 match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
484 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
485 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
486 Err(_) => {
487 log::warn!("Timeout waiting for task handle");
488 }
489 }
490 }
491 Err(arc_handle) => {
492 log::debug!("Cannot take ownership of task handle, aborting");
493 arc_handle.abort();
494 }
495 }
496 }
497
498 self.subscriptions.clear();
499 self.auth_tracker.fail("Disconnected");
500 Ok(())
501 }
502
503 pub async fn close(&mut self) -> Result<(), KrakenWsError> {
505 self.disconnect().await
506 }
507
508 pub async fn subscribe_mark_price(
510 &self,
511 instrument_id: InstrumentId,
512 ) -> Result<(), KrakenWsError> {
513 let symbol = instrument_id.symbol;
514 let key = format!("mark:{symbol}");
515
516 if !self.subscriptions.add_reference(&key) {
517 return Ok(());
518 }
519
520 self.subscriptions.mark_subscribe(&key);
521 self.subscriptions.confirm_subscribe(&key);
522 self.ensure_ticker_subscribed(symbol).await
523 }
524
525 pub async fn unsubscribe_mark_price(
527 &self,
528 instrument_id: InstrumentId,
529 ) -> Result<(), KrakenWsError> {
530 let symbol = instrument_id.symbol;
531 let key = format!("mark:{symbol}");
532
533 if !self.subscriptions.remove_reference(&key) {
534 return Ok(());
535 }
536
537 self.subscriptions.mark_unsubscribe(&key);
538 self.subscriptions.confirm_unsubscribe(&key);
539 self.maybe_unsubscribe_ticker(symbol).await
540 }
541
542 pub async fn subscribe_index_price(
544 &self,
545 instrument_id: InstrumentId,
546 ) -> Result<(), KrakenWsError> {
547 let symbol = instrument_id.symbol;
548 let key = format!("index:{symbol}");
549
550 if !self.subscriptions.add_reference(&key) {
551 return Ok(());
552 }
553
554 self.subscriptions.mark_subscribe(&key);
555 self.subscriptions.confirm_subscribe(&key);
556 self.ensure_ticker_subscribed(symbol).await
557 }
558
559 pub async fn unsubscribe_index_price(
561 &self,
562 instrument_id: InstrumentId,
563 ) -> Result<(), KrakenWsError> {
564 let symbol = instrument_id.symbol;
565 let key = format!("index:{symbol}");
566
567 if !self.subscriptions.remove_reference(&key) {
568 return Ok(());
569 }
570
571 self.subscriptions.mark_unsubscribe(&key);
572 self.subscriptions.confirm_unsubscribe(&key);
573 self.maybe_unsubscribe_ticker(symbol).await
574 }
575
576 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
580 let symbol = instrument_id.symbol;
581 let key = format!("quotes:{symbol}");
582
583 if !self.subscriptions.add_reference(&key) {
584 return Ok(());
585 }
586
587 self.subscriptions.mark_subscribe(&key);
588 self.subscriptions.confirm_subscribe(&key);
589
590 self.ensure_book_subscribed(symbol).await
592 }
593
594 pub async fn unsubscribe_quotes(
596 &self,
597 instrument_id: InstrumentId,
598 ) -> Result<(), KrakenWsError> {
599 let symbol = instrument_id.symbol;
600 let key = format!("quotes:{symbol}");
601
602 if !self.subscriptions.remove_reference(&key) {
603 return Ok(());
604 }
605
606 self.subscriptions.mark_unsubscribe(&key);
607 self.subscriptions.confirm_unsubscribe(&key);
608 self.maybe_unsubscribe_book(symbol).await
609 }
610
611 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
613 let symbol = instrument_id.symbol;
614 let key = format!("trades:{symbol}");
615
616 if !self.subscriptions.add_reference(&key) {
617 return Ok(());
618 }
619
620 self.subscriptions.mark_subscribe(&key);
621
622 self.cmd_tx
623 .read()
624 .await
625 .send(HandlerCommand::SubscribeTrade(symbol))
626 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
627
628 self.subscriptions.confirm_subscribe(&key);
629 Ok(())
630 }
631
632 pub async fn unsubscribe_trades(
634 &self,
635 instrument_id: InstrumentId,
636 ) -> Result<(), KrakenWsError> {
637 let symbol = instrument_id.symbol;
638 let key = format!("trades:{symbol}");
639
640 if !self.subscriptions.remove_reference(&key) {
641 return Ok(());
642 }
643
644 self.subscriptions.mark_unsubscribe(&key);
645
646 self.cmd_tx
647 .read()
648 .await
649 .send(HandlerCommand::UnsubscribeTrade(symbol))
650 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
651
652 self.subscriptions.confirm_unsubscribe(&key);
653 Ok(())
654 }
655
656 pub async fn subscribe_book(
661 &self,
662 instrument_id: InstrumentId,
663 _depth: Option<u32>,
664 ) -> Result<(), KrakenWsError> {
665 let symbol = instrument_id.symbol;
666 let key = format!("book:{symbol}");
667
668 if !self.subscriptions.add_reference(&key) {
669 return Ok(());
670 }
671
672 self.subscriptions.mark_subscribe(&key);
673
674 self.cmd_tx
675 .read()
676 .await
677 .send(HandlerCommand::SubscribeBook(symbol))
678 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
679
680 self.subscriptions.confirm_subscribe(&key);
681 Ok(())
682 }
683
684 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), KrakenWsError> {
686 let symbol = instrument_id.symbol;
687 let key = format!("book:{symbol}");
688
689 if !self.subscriptions.remove_reference(&key) {
690 return Ok(());
691 }
692
693 self.subscriptions.mark_unsubscribe(&key);
694
695 self.cmd_tx
696 .read()
697 .await
698 .send(HandlerCommand::UnsubscribeBook(symbol))
699 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
700
701 self.subscriptions.confirm_unsubscribe(&key);
702 Ok(())
703 }
704
705 async fn ensure_ticker_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
707 let ticker_key = format!("ticker:{symbol}");
708
709 if !self.subscriptions.add_reference(&ticker_key) {
710 return Ok(());
711 }
712
713 self.subscriptions.mark_subscribe(&ticker_key);
714 self.cmd_tx
715 .read()
716 .await
717 .send(HandlerCommand::SubscribeTicker(symbol))
718 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
719 self.subscriptions.confirm_subscribe(&ticker_key);
720 Ok(())
721 }
722
723 async fn maybe_unsubscribe_ticker(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
725 let ticker_key = format!("ticker:{symbol}");
726
727 if !self.subscriptions.remove_reference(&ticker_key) {
728 return Ok(());
729 }
730
731 self.subscriptions.mark_unsubscribe(&ticker_key);
732 self.cmd_tx
733 .read()
734 .await
735 .send(HandlerCommand::UnsubscribeTicker(symbol))
736 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
737 self.subscriptions.confirm_unsubscribe(&ticker_key);
738 Ok(())
739 }
740
741 async fn ensure_book_subscribed(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
743 let book_key = format!("book:{symbol}");
744
745 if !self.subscriptions.add_reference(&book_key) {
746 return Ok(());
747 }
748
749 self.subscriptions.mark_subscribe(&book_key);
750 self.cmd_tx
751 .read()
752 .await
753 .send(HandlerCommand::SubscribeBook(symbol))
754 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
755 self.subscriptions.confirm_subscribe(&book_key);
756 Ok(())
757 }
758
759 async fn maybe_unsubscribe_book(&self, symbol: Symbol) -> Result<(), KrakenWsError> {
761 let book_key = format!("book:{symbol}");
762
763 if !self.subscriptions.remove_reference(&book_key) {
764 return Ok(());
765 }
766
767 self.subscriptions.mark_unsubscribe(&book_key);
768 self.cmd_tx
769 .read()
770 .await
771 .send(HandlerCommand::UnsubscribeBook(symbol))
772 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
773 self.subscriptions.confirm_unsubscribe(&book_key);
774 Ok(())
775 }
776
777 pub fn take_output_rx(
779 &mut self,
780 ) -> Option<tokio::sync::mpsc::UnboundedReceiver<KrakenFuturesWsMessage>> {
781 self.out_rx.take().and_then(|arc| Arc::try_unwrap(arc).ok())
782 }
783
784 pub fn set_account_id(&self, account_id: AccountId) {
789 if let Ok(tx) = self.cmd_tx.try_read()
790 && let Err(e) = tx.send(HandlerCommand::SetAccountId(account_id))
791 {
792 log::debug!("Failed to send account_id to handler: {e}");
793 }
794 }
795
796 pub fn cache_client_order(
802 &self,
803 client_order_id: ClientOrderId,
804 venue_order_id: Option<VenueOrderId>,
805 instrument_id: InstrumentId,
806 trader_id: TraderId,
807 strategy_id: StrategyId,
808 ) {
809 if let Ok(tx) = self.cmd_tx.try_read()
810 && let Err(e) = tx.send(HandlerCommand::CacheClientOrder {
811 client_order_id,
812 venue_order_id,
813 instrument_id,
814 trader_id,
815 strategy_id,
816 })
817 {
818 log::debug!("Failed to cache client order: {e}");
819 }
820 }
821
822 pub async fn request_challenge(&self) -> Result<(), KrakenWsError> {
827 let credential = self.credential.as_ref().ok_or_else(|| {
828 KrakenWsError::AuthenticationError(
829 "API credentials required for authentication".to_string(),
830 )
831 })?;
832
833 log::debug!(
836 "Challenge request prepared for API key: {}",
837 credential.api_key_masked()
838 );
839
840 Ok(())
841 }
842
843 pub async fn set_auth_credentials(
845 &self,
846 original_challenge: String,
847 signed_challenge: String,
848 ) -> Result<(), KrakenWsError> {
849 let credential = self.credential.as_ref().ok_or_else(|| {
850 KrakenWsError::AuthenticationError("API credentials required".to_string())
851 })?;
852
853 *self.original_challenge.write().await = Some(original_challenge.clone());
854 *self.signed_challenge.write().await = Some(signed_challenge.clone());
855
856 self.cmd_tx
857 .read()
858 .await
859 .send(HandlerCommand::SetAuthCredentials {
860 api_key: credential.api_key().to_string(),
861 original_challenge,
862 signed_challenge,
863 })
864 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
865
866 Ok(())
867 }
868
869 pub fn sign_challenge(&self, challenge: &str) -> Result<String, KrakenWsError> {
873 let credential = self.credential.as_ref().ok_or_else(|| {
874 KrakenWsError::AuthenticationError("API credentials required".to_string())
875 })?;
876
877 credential.sign_ws_challenge(challenge).map_err(|e| {
878 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
879 })
880 }
881
882 pub async fn authenticate_with_challenge(&self, challenge: &str) -> Result<(), KrakenWsError> {
884 let credential = self.credential.as_ref().ok_or_else(|| {
885 KrakenWsError::AuthenticationError("API credentials required".to_string())
886 })?;
887
888 let signed_challenge = credential.sign_ws_challenge(challenge).map_err(|e| {
889 KrakenWsError::AuthenticationError(format!("Failed to sign challenge: {e}"))
890 })?;
891
892 self.set_auth_credentials(challenge.to_string(), signed_challenge)
893 .await
894 }
895
896 pub async fn subscribe_open_orders(&self) -> Result<(), KrakenWsError> {
898 if self.original_challenge.read().await.is_none() {
899 return Err(KrakenWsError::AuthenticationError(
900 "Must authenticate before subscribing to private feeds".to_string(),
901 ));
902 }
903
904 let key = "open_orders";
905 if !self.subscriptions.add_reference(key) {
906 return Ok(());
907 }
908
909 self.subscriptions.mark_subscribe(key);
910
911 self.cmd_tx
912 .read()
913 .await
914 .send(HandlerCommand::SubscribeOpenOrders)
915 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
916
917 self.subscriptions.confirm_subscribe(key);
918 Ok(())
919 }
920
921 pub async fn subscribe_fills(&self) -> Result<(), KrakenWsError> {
923 if self.original_challenge.read().await.is_none() {
924 return Err(KrakenWsError::AuthenticationError(
925 "Must authenticate before subscribing to private feeds".to_string(),
926 ));
927 }
928
929 let key = "fills";
930 if !self.subscriptions.add_reference(key) {
931 return Ok(());
932 }
933
934 self.subscriptions.mark_subscribe(key);
935
936 self.cmd_tx
937 .read()
938 .await
939 .send(HandlerCommand::SubscribeFills)
940 .map_err(|e| KrakenWsError::ChannelError(e.to_string()))?;
941
942 self.subscriptions.confirm_subscribe(key);
943 Ok(())
944 }
945
946 pub async fn subscribe_executions(&self) -> Result<(), KrakenWsError> {
948 self.subscribe_open_orders().await?;
949 self.subscribe_fills().await?;
950 Ok(())
951 }
952}