1use std::{
24 sync::{
25 Arc,
26 atomic::{AtomicBool, AtomicU8, Ordering},
27 },
28 time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::live::get_runtime;
35use nautilus_core::{
36 consts::NAUTILUS_USER_AGENT,
37 env::{get_env_var, get_or_env_var_opt},
38};
39use nautilus_model::{
40 data::bar::BarType,
41 enums::OrderType,
42 identifiers::{AccountId, ClientOrderId, InstrumentId},
43 instruments::{Instrument, InstrumentAny},
44};
45use nautilus_network::{
46 http::USER_AGENT,
47 mode::ConnectionMode,
48 websocket::{
49 AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, WebSocketClient,
50 WebSocketConfig, channel_message_handler,
51 },
52};
53use tokio_tungstenite::tungstenite::Message;
54use ustr::Ustr;
55
56use super::{
57 enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
58 error::BitmexWsError,
59 handler::{FeedHandler, HandlerCommand},
60 messages::{BitmexAuthentication, BitmexSubscription, NautilusWsMessage},
61 parse::{is_index_symbol, topic_from_bar_spec},
62};
63use crate::common::{
64 consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
65 credential::Credential,
66};
67
68#[derive(Clone, Debug)]
76#[cfg_attr(
77 feature = "python",
78 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex")
79)]
80pub struct BitmexWebSocketClient {
81 url: String,
82 credential: Option<Credential>,
83 heartbeat: Option<u64>,
84 account_id: AccountId,
85 auth_tracker: AuthTracker,
86 signal: Arc<AtomicBool>,
87 connection_mode: Arc<ArcSwap<AtomicU8>>,
88 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
89 out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
90 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
91 subscriptions: SubscriptionState,
92 tracked_subscriptions: Arc<DashMap<String, ()>>,
93 instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
94 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
95 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
96}
97
98impl BitmexWebSocketClient {
99 pub fn new(
105 url: Option<String>,
106 api_key: Option<String>,
107 api_secret: Option<String>,
108 account_id: Option<AccountId>,
109 heartbeat: Option<u64>,
110 ) -> anyhow::Result<Self> {
111 let credential = match (api_key, api_secret) {
112 (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
113 (None, None) => None,
114 _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
115 };
116
117 let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
118
119 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
120 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
121
122 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
126
127 Ok(Self {
128 url: url.unwrap_or(BITMEX_WS_URL.to_string()),
129 credential,
130 heartbeat,
131 account_id,
132 auth_tracker: AuthTracker::new(),
133 signal: Arc::new(AtomicBool::new(false)),
134 connection_mode,
135 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
136 out_rx: None,
137 task_handle: None,
138 subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
139 tracked_subscriptions: Arc::new(DashMap::new()),
140 instruments_cache: Arc::new(DashMap::new()),
141 order_type_cache: Arc::new(DashMap::new()),
142 order_symbol_cache: Arc::new(DashMap::new()),
143 })
144 }
145
146 pub fn new_with_env(
157 url: Option<String>,
158 api_key: Option<String>,
159 api_secret: Option<String>,
160 account_id: Option<AccountId>,
161 heartbeat: Option<u64>,
162 testnet: bool,
163 ) -> anyhow::Result<Self> {
164 let (api_key_env, api_secret_env) = if testnet {
165 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
166 } else {
167 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
168 };
169
170 let key = get_or_env_var_opt(api_key, api_key_env);
171 let secret = get_or_env_var_opt(api_secret, api_secret_env);
172
173 Self::new(url, key, secret, account_id, heartbeat)
174 }
175
176 pub fn from_env() -> anyhow::Result<Self> {
182 let url = get_env_var("BITMEX_WS_URL")?;
183 let api_key = get_env_var("BITMEX_API_KEY")?;
184 let api_secret = get_env_var("BITMEX_API_SECRET")?;
185
186 Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
187 }
188
189 #[must_use]
191 pub const fn url(&self) -> &str {
192 self.url.as_str()
193 }
194
195 #[must_use]
197 pub fn api_key(&self) -> Option<&str> {
198 self.credential.as_ref().map(|c| c.api_key.as_str())
199 }
200
201 #[must_use]
203 pub fn api_key_masked(&self) -> Option<String> {
204 self.credential.as_ref().map(|c| c.api_key_masked())
205 }
206
207 #[must_use]
209 pub fn is_active(&self) -> bool {
210 let connection_mode_arc = self.connection_mode.load();
211 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
212 && !self.signal.load(Ordering::Relaxed)
213 }
214
215 #[must_use]
217 pub fn is_closed(&self) -> bool {
218 let connection_mode_arc = self.connection_mode.load();
219 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
220 || self.signal.load(Ordering::Relaxed)
221 }
222
223 pub fn set_account_id(&mut self, account_id: AccountId) {
225 self.account_id = account_id;
226 }
227
228 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
232 self.instruments_cache.clear();
233 let mut count = 0;
234
235 log::debug!("Initializing BitMEX instrument cache");
236
237 for inst in instruments {
238 let symbol = inst.symbol().inner();
239 self.instruments_cache.insert(symbol, inst.clone());
240 log::debug!("Cached instrument: {symbol}");
241 count += 1;
242 }
243
244 log::info!("BitMEX instrument cache initialized with {count} instruments");
245 }
246
247 pub fn cache_instrument(&self, instrument: InstrumentAny) {
251 self.instruments_cache
252 .insert(instrument.symbol().inner(), instrument.clone());
253
254 if let Ok(cmd_tx) = self.cmd_tx.try_read()
257 && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument))
258 {
259 log::debug!("Failed to send instrument update to handler: {e}");
260 }
261 }
262
263 pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
273 let (client, raw_rx) = self.connect_inner().await?;
274
275 self.connection_mode.store(client.connection_mode_atomic());
277
278 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
279 self.out_rx = Some(Arc::new(out_rx));
280
281 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
282 *self.cmd_tx.write().await = cmd_tx.clone();
283
284 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
286 return Err(BitmexWsError::ClientError(format!(
287 "Failed to send WebSocketClient to handler: {e}"
288 )));
289 }
290
291 if !self.instruments_cache.is_empty() {
293 let cached_instruments: Vec<InstrumentAny> = self
294 .instruments_cache
295 .iter()
296 .map(|entry| entry.value().clone())
297 .collect();
298 if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
299 log::error!("Failed to replay instruments to handler: {e}");
300 }
301 }
302
303 let signal = self.signal.clone();
304 let account_id = self.account_id;
305 let credential = self.credential.clone();
306 let auth_tracker = self.auth_tracker.clone();
307 let subscriptions = self.subscriptions.clone();
308 let order_type_cache = self.order_type_cache.clone();
309 let order_symbol_cache = self.order_symbol_cache.clone();
310 let cmd_tx_for_reconnect = cmd_tx.clone();
311
312 let stream_handle = get_runtime().spawn(async move {
313 let mut handler = FeedHandler::new(
314 signal.clone(),
315 cmd_rx,
316 raw_rx,
317 out_tx,
318 account_id,
319 auth_tracker.clone(),
320 subscriptions.clone(),
321 order_type_cache,
322 order_symbol_cache,
323 );
324
325 let resubscribe_all = || {
327 let topics = subscriptions.all_topics();
329
330 if topics.is_empty() {
331 return;
332 }
333
334 log::debug!(
335 "Resubscribing to confirmed subscriptions: count={}",
336 topics.len()
337 );
338
339 for topic in &topics {
340 subscriptions.mark_subscribe(topic.as_str());
341 }
342
343 let mut payloads = Vec::with_capacity(topics.len());
345 for topic in &topics {
346 let message = BitmexSubscription {
347 op: BitmexWsOperation::Subscribe,
348 args: vec![Ustr::from(topic.as_ref())],
349 };
350 if let Ok(payload) = serde_json::to_string(&message) {
351 payloads.push(payload);
352 }
353 }
354
355 if let Err(e) =
356 cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads })
357 {
358 log::error!("Failed to send resubscribe command: {e}");
359 }
360 };
361
362 loop {
364 match handler.next().await {
365 Some(NautilusWsMessage::Reconnected) => {
366 if signal.load(Ordering::Relaxed) {
367 continue;
368 }
369
370 log::info!("WebSocket reconnected");
371
372 let confirmed_topics: Vec<String> = {
374 let confirmed = subscriptions.confirmed();
375 let mut topics = Vec::new();
376
377 for entry in confirmed.iter() {
378 let (channel, symbols) = entry.pair();
379
380 if *channel == BitmexWsTopic::Instrument.as_ref() {
381 continue;
382 }
383
384 for symbol in symbols {
385 if symbol.is_empty() {
386 topics.push(channel.to_string());
387 } else {
388 topics.push(format!("{channel}:{symbol}"));
389 }
390 }
391 }
392
393 topics
394 };
395
396 if !confirmed_topics.is_empty() {
397 log::debug!(
398 "Marking confirmed subscriptions as pending for replay: count={}",
399 confirmed_topics.len()
400 );
401 for topic in confirmed_topics {
402 subscriptions.mark_failure(&topic);
403 }
404 }
405
406 if let Some(cred) = &credential {
407 log::debug!("Re-authenticating after reconnection");
408
409 let expires =
410 (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
411 let signature = cred.sign("GET", "/realtime", expires, "");
412
413 let auth_message = BitmexAuthentication {
414 op: BitmexWsAuthAction::AuthKeyExpires,
415 args: (cred.api_key.to_string(), expires, signature),
416 };
417
418 if let Ok(payload) = serde_json::to_string(&auth_message) {
419 if let Err(e) = cmd_tx_for_reconnect
420 .send(HandlerCommand::Authenticate { payload })
421 {
422 log::error!("Failed to send reconnection auth command: {e}");
423 }
424 } else {
425 log::error!("Failed to serialize reconnection auth message");
426 }
427 }
428
429 if credential.is_none() {
432 log::debug!("No authentication required, resubscribing immediately");
433 resubscribe_all();
434 }
435
436 continue;
441 }
442 Some(NautilusWsMessage::Authenticated) => {
443 log::debug!("Authenticated after reconnection, resubscribing");
444 resubscribe_all();
445 continue;
446 }
447 Some(msg) => {
448 if handler.send(msg).is_err() {
449 log::error!("Failed to send message (receiver dropped)");
450 break;
451 }
452 }
453 None => {
454 if handler.is_stopped() {
456 log::debug!("Stop signal received, ending message processing");
457 break;
458 }
459 log::warn!("WebSocket stream ended unexpectedly");
461 break;
462 }
463 }
464 }
465
466 log::debug!("Handler task exiting");
467 });
468
469 self.task_handle = Some(Arc::new(stream_handle));
470
471 if self.credential.is_some()
472 && let Err(e) = self.authenticate().await
473 {
474 return Err(e);
475 }
476
477 let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
479 self.subscriptions.mark_subscribe(&instrument_topic);
480 self.tracked_subscriptions.insert(instrument_topic, ());
481
482 let subscribe_msg = BitmexSubscription {
483 op: BitmexWsOperation::Subscribe,
484 args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
485 };
486
487 match serde_json::to_string(&subscribe_msg) {
488 Ok(subscribe_json) => {
489 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
490 topics: vec![subscribe_json],
491 }) {
492 log::error!("Failed to send subscribe command for instruments: {e}");
493 } else {
494 log::debug!("Subscribed to all instruments");
495 }
496 }
497 Err(e) => {
498 log::error!("Failed to serialize subscribe message: {e}");
499 }
500 }
501
502 Ok(())
503 }
504
505 async fn connect_inner(
511 &mut self,
512 ) -> Result<
513 (
514 WebSocketClient,
515 tokio::sync::mpsc::UnboundedReceiver<Message>,
516 ),
517 BitmexWsError,
518 > {
519 let (message_handler, rx) = channel_message_handler();
520
521 let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
524 });
526
527 let config = WebSocketConfig {
528 url: self.url.clone(),
529 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
530 heartbeat: self.heartbeat,
531 heartbeat_msg: None,
532 reconnect_timeout_ms: Some(5_000),
533 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, reconnect_max_attempts: None,
538 };
539
540 let keyed_quotas = vec![];
541 let client = WebSocketClient::connect(
542 config,
543 Some(message_handler),
544 Some(ping_handler),
545 None, keyed_quotas,
547 None, )
549 .await
550 .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
551
552 Ok((client, rx))
553 }
554
555 async fn authenticate(&self) -> Result<(), BitmexWsError> {
562 let credential = match &self.credential {
563 Some(credential) => credential,
564 None => {
565 return Err(BitmexWsError::AuthenticationError(
566 "API credentials not available to authenticate".to_string(),
567 ));
568 }
569 };
570
571 let receiver = self.auth_tracker.begin();
572
573 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
574 let signature = credential.sign("GET", "/realtime", expires, "");
575
576 let auth_message = BitmexAuthentication {
577 op: BitmexWsAuthAction::AuthKeyExpires,
578 args: (credential.api_key.to_string(), expires, signature),
579 };
580
581 let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
582 let msg = format!("Failed to serialize auth message: {e}");
583 self.auth_tracker.fail(msg.clone());
584 BitmexWsError::AuthenticationError(msg)
585 })?;
586
587 self.cmd_tx
589 .read()
590 .await
591 .send(HandlerCommand::Authenticate { payload: auth_json })
592 .map_err(|e| {
593 let msg = format!("Failed to send authenticate command: {e}");
594 self.auth_tracker.fail(msg.clone());
595 BitmexWsError::AuthenticationError(msg)
596 })?;
597
598 self.auth_tracker
599 .wait_for_result::<BitmexWsError>(
600 Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
601 receiver,
602 )
603 .await
604 }
605
606 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
612 let timeout = Duration::from_secs_f64(timeout_secs);
613
614 tokio::time::timeout(timeout, async {
615 while !self.is_active() {
616 tokio::time::sleep(Duration::from_millis(10)).await;
617 }
618 })
619 .await
620 .map_err(|_| {
621 BitmexWsError::ClientError(format!(
622 "WebSocket connection timeout after {timeout_secs} seconds"
623 ))
624 })?;
625
626 Ok(())
627 }
628
629 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
637 let rx = self
638 .out_rx
639 .take()
640 .expect("Stream receiver already taken or not connected");
641 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
642 async_stream::stream! {
643 while let Some(msg) = rx.recv().await {
644 yield msg;
645 }
646 }
647 }
648
649 pub async fn close(&mut self) -> Result<(), BitmexWsError> {
659 log::debug!("Starting close process");
660
661 self.signal.store(true, Ordering::Relaxed);
662
663 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
665 log::debug!(
666 "Failed to send disconnect command (handler may already be shut down): {e}"
667 );
668 }
669
670 if let Some(task_handle) = self.task_handle.take() {
672 match Arc::try_unwrap(task_handle) {
673 Ok(handle) => {
674 log::debug!("Waiting for task handle to complete");
675 match tokio::time::timeout(Duration::from_secs(2), handle).await {
676 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
677 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
678 Err(_) => {
679 log::warn!(
680 "Timeout waiting for task handle, task may still be running"
681 );
682 }
684 }
685 }
686 Err(arc_handle) => {
687 log::debug!(
688 "Cannot take ownership of task handle - other references exist, aborting task"
689 );
690 arc_handle.abort();
691 }
692 }
693 } else {
694 log::debug!("No task handle to await");
695 }
696
697 log::debug!("Closed");
698
699 Ok(())
700 }
701
702 pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
712 log::debug!("Subscribing to topics: {topics:?}");
713
714 for topic in &topics {
715 self.subscriptions.mark_subscribe(topic.as_str());
716 self.tracked_subscriptions.insert(topic.clone(), ());
717 }
718
719 let mut payloads = Vec::with_capacity(topics.len());
721 for topic in &topics {
722 let message = BitmexSubscription {
723 op: BitmexWsOperation::Subscribe,
724 args: vec![Ustr::from(topic.as_ref())],
725 };
726 let payload = serde_json::to_string(&message).map_err(|e| {
727 BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
728 })?;
729 payloads.push(payload);
730 }
731
732 let cmd = HandlerCommand::Subscribe { topics: payloads };
734
735 self.send_cmd(cmd).await.map_err(|e| {
736 BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
737 })
738 }
739
740 async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
746 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
747
748 if self.signal.load(Ordering::Relaxed) {
749 log::debug!("Shutdown signal detected, skipping unsubscribe");
750 return Ok(());
751 }
752
753 for topic in &topics {
754 self.subscriptions.mark_unsubscribe(topic.as_str());
755 self.tracked_subscriptions.remove(topic);
756 }
757
758 let mut payloads = Vec::with_capacity(topics.len());
760 for topic in &topics {
761 let message = BitmexSubscription {
762 op: BitmexWsOperation::Unsubscribe,
763 args: vec![Ustr::from(topic.as_ref())],
764 };
765 if let Ok(payload) = serde_json::to_string(&message) {
766 payloads.push(payload);
767 }
768 }
769
770 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
772
773 if let Err(e) = self.send_cmd(cmd).await {
774 log::debug!("Failed to send unsubscribe command: {e}");
775 }
776
777 Ok(())
778 }
779
780 #[must_use]
782 pub fn subscription_count(&self) -> usize {
783 self.subscriptions.len()
784 }
785
786 pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
787 let symbol = instrument_id.symbol.inner();
788 let confirmed = self.subscriptions.confirmed();
789 let mut channels = Vec::with_capacity(confirmed.len());
790
791 for entry in confirmed.iter() {
792 let (channel, symbols) = entry.pair();
793 if symbols.contains(&symbol) {
794 channels.push(format!("{channel}:{symbol}"));
796 } else {
797 let has_channel_marker = symbols.iter().any(|s| s.is_empty());
798 if has_channel_marker
799 && (*channel == BitmexWsAuthChannel::Execution.as_ref()
800 || *channel == BitmexWsAuthChannel::Order.as_ref())
801 {
802 channels.push(channel.to_string());
804 }
805 }
806 }
807
808 channels
809 }
810
811 pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
817 log::debug!("Already subscribed to all instruments on connection, skipping");
819 Ok(())
820 }
821
822 pub async fn subscribe_instrument(
828 &self,
829 instrument_id: InstrumentId,
830 ) -> Result<(), BitmexWsError> {
831 log::debug!(
833 "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
834 );
835 Ok(())
836 }
837
838 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
844 let topic = BitmexWsTopic::OrderBookL2;
845 let symbol = instrument_id.symbol.inner();
846 self.subscribe(vec![format!("{topic}:{symbol}")]).await
847 }
848
849 pub async fn subscribe_book_25(
855 &self,
856 instrument_id: InstrumentId,
857 ) -> Result<(), BitmexWsError> {
858 let topic = BitmexWsTopic::OrderBookL2_25;
859 let symbol = instrument_id.symbol.inner();
860 self.subscribe(vec![format!("{topic}:{symbol}")]).await
861 }
862
863 pub async fn subscribe_book_depth10(
869 &self,
870 instrument_id: InstrumentId,
871 ) -> Result<(), BitmexWsError> {
872 let topic = BitmexWsTopic::OrderBook10;
873 let symbol = instrument_id.symbol.inner();
874 self.subscribe(vec![format!("{topic}:{symbol}")]).await
875 }
876
877 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
885 let symbol = instrument_id.symbol.inner();
886
887 if is_index_symbol(&instrument_id.symbol.inner()) {
889 log::warn!("Ignoring quote subscription for index symbol: {symbol}");
890 return Ok(());
891 }
892
893 let topic = BitmexWsTopic::Quote;
894 self.subscribe(vec![format!("{topic}:{symbol}")]).await
895 }
896
897 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
905 let symbol = instrument_id.symbol.inner();
906
907 if is_index_symbol(&symbol) {
909 log::warn!("Ignoring trade subscription for index symbol: {symbol}");
910 return Ok(());
911 }
912
913 let topic = BitmexWsTopic::Trade;
914 self.subscribe(vec![format!("{topic}:{symbol}")]).await
915 }
916
917 pub async fn subscribe_mark_prices(
923 &self,
924 instrument_id: InstrumentId,
925 ) -> Result<(), BitmexWsError> {
926 self.subscribe_instrument(instrument_id).await
927 }
928
929 pub async fn subscribe_index_prices(
935 &self,
936 instrument_id: InstrumentId,
937 ) -> Result<(), BitmexWsError> {
938 self.subscribe_instrument(instrument_id).await
939 }
940
941 pub async fn subscribe_funding_rates(
947 &self,
948 instrument_id: InstrumentId,
949 ) -> Result<(), BitmexWsError> {
950 let topic = BitmexWsTopic::Funding;
951 let symbol = instrument_id.symbol.inner();
952 self.subscribe(vec![format!("{topic}:{symbol}")]).await
953 }
954
955 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
961 let topic = topic_from_bar_spec(bar_type.spec());
962 let symbol = bar_type.instrument_id().symbol.inner();
963 self.subscribe(vec![format!("{topic}:{symbol}")]).await
964 }
965
966 pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
972 log::debug!(
974 "Instruments subscription maintained for proper operation, skipping unsubscribe"
975 );
976 Ok(())
977 }
978
979 pub async fn unsubscribe_instrument(
985 &self,
986 instrument_id: InstrumentId,
987 ) -> Result<(), BitmexWsError> {
988 log::debug!(
990 "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
991 );
992 Ok(())
993 }
994
995 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
1001 let topic = BitmexWsTopic::OrderBookL2;
1002 let symbol = instrument_id.symbol.inner();
1003 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1004 }
1005
1006 pub async fn unsubscribe_book_25(
1012 &self,
1013 instrument_id: InstrumentId,
1014 ) -> Result<(), BitmexWsError> {
1015 let topic = BitmexWsTopic::OrderBookL2_25;
1016 let symbol = instrument_id.symbol.inner();
1017 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1018 }
1019
1020 pub async fn unsubscribe_book_depth10(
1026 &self,
1027 instrument_id: InstrumentId,
1028 ) -> Result<(), BitmexWsError> {
1029 let topic = BitmexWsTopic::OrderBook10;
1030 let symbol = instrument_id.symbol.inner();
1031 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1032 }
1033
1034 pub async fn unsubscribe_quotes(
1040 &self,
1041 instrument_id: InstrumentId,
1042 ) -> Result<(), BitmexWsError> {
1043 let symbol = instrument_id.symbol.inner();
1044
1045 if is_index_symbol(&symbol) {
1047 return Ok(());
1048 }
1049
1050 let topic = BitmexWsTopic::Quote;
1051 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1052 }
1053
1054 pub async fn unsubscribe_trades(
1060 &self,
1061 instrument_id: InstrumentId,
1062 ) -> Result<(), BitmexWsError> {
1063 let symbol = instrument_id.symbol.inner();
1064
1065 if is_index_symbol(&symbol) {
1067 return Ok(());
1068 }
1069
1070 let topic = BitmexWsTopic::Trade;
1071 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1072 }
1073
1074 pub async fn unsubscribe_mark_prices(
1080 &self,
1081 instrument_id: InstrumentId,
1082 ) -> Result<(), BitmexWsError> {
1083 log::debug!(
1085 "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1086 );
1087 Ok(())
1088 }
1089
1090 pub async fn unsubscribe_index_prices(
1096 &self,
1097 instrument_id: InstrumentId,
1098 ) -> Result<(), BitmexWsError> {
1099 log::debug!(
1101 "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1102 );
1103 Ok(())
1104 }
1105
1106 pub async fn unsubscribe_funding_rates(
1112 &self,
1113 instrument_id: InstrumentId,
1114 ) -> Result<(), BitmexWsError> {
1115 log::debug!(
1117 "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1118 );
1119 Ok(())
1120 }
1121
1122 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1128 let topic = topic_from_bar_spec(bar_type.spec());
1129 let symbol = bar_type.instrument_id().symbol.inner();
1130 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1131 }
1132
1133 pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1139 if self.credential.is_none() {
1140 return Err(BitmexWsError::MissingCredentials);
1141 }
1142 self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1143 .await
1144 }
1145
1146 pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1152 if self.credential.is_none() {
1153 return Err(BitmexWsError::MissingCredentials);
1154 }
1155 self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1156 .await
1157 }
1158
1159 pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1165 if self.credential.is_none() {
1166 return Err(BitmexWsError::MissingCredentials);
1167 }
1168 self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1169 .await
1170 }
1171
1172 pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1178 if self.credential.is_none() {
1179 return Err(BitmexWsError::MissingCredentials);
1180 }
1181 self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1182 .await
1183 }
1184
1185 pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1191 if self.credential.is_none() {
1192 return Err(BitmexWsError::MissingCredentials);
1193 }
1194 self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1195 .await
1196 }
1197
1198 pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1204 self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1205 .await
1206 }
1207
1208 pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1214 self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1215 .await
1216 }
1217
1218 pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1224 self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1225 .await
1226 }
1227
1228 pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1234 self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1235 .await
1236 }
1237
1238 pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1244 self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1245 .await
1246 }
1247
1248 async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1250 self.cmd_tx
1251 .read()
1252 .await
1253 .send(cmd)
1254 .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1255 }
1256}
1257
1258#[cfg(test)]
1259mod tests {
1260 use ahash::AHashSet;
1261 use rstest::rstest;
1262 use ustr::Ustr;
1263
1264 use super::*;
1265
1266 #[rstest]
1267 fn test_reconnect_topics_restoration_logic() {
1268 let client = BitmexWebSocketClient::new(
1270 Some("ws://test.com".to_string()),
1271 Some("test_key".to_string()),
1272 Some("test_secret".to_string()),
1273 Some(AccountId::new("BITMEX-TEST")),
1274 None,
1275 )
1276 .unwrap();
1277
1278 let subs = client.subscriptions.confirmed();
1280 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1281 let mut set = AHashSet::new();
1282 set.insert(Ustr::from("XBTUSD"));
1283 set.insert(Ustr::from("ETHUSD"));
1284 set
1285 });
1286
1287 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1288 let mut set = AHashSet::new();
1289 set.insert(Ustr::from("XBTUSD"));
1290 set
1291 });
1292
1293 subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1295 let mut set = AHashSet::new();
1296 set.insert(Ustr::from(""));
1297 set
1298 });
1299 subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1300 let mut set = AHashSet::new();
1301 set.insert(Ustr::from(""));
1302 set
1303 });
1304
1305 let mut topics_to_restore = Vec::new();
1307 for entry in subs.iter() {
1308 let (channel, symbols) = entry.pair();
1309 for symbol in symbols {
1310 if symbol.is_empty() {
1311 topics_to_restore.push(channel.to_string());
1312 } else {
1313 topics_to_restore.push(format!("{channel}:{symbol}"));
1314 }
1315 }
1316 }
1317
1318 assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1320 assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1321 assert!(
1322 topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1323 );
1324 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1325 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1326 assert_eq!(topics_to_restore.len(), 5);
1327 }
1328
1329 #[rstest]
1330 fn test_reconnect_auth_message_building() {
1331 let client_with_creds = BitmexWebSocketClient::new(
1333 Some("ws://test.com".to_string()),
1334 Some("test_key".to_string()),
1335 Some("test_secret".to_string()),
1336 Some(AccountId::new("BITMEX-TEST")),
1337 None,
1338 )
1339 .unwrap();
1340
1341 if let Some(cred) = &client_with_creds.credential {
1343 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1344 let signature = cred.sign("GET", "/realtime", expires, "");
1345
1346 let auth_message = BitmexAuthentication {
1347 op: BitmexWsAuthAction::AuthKeyExpires,
1348 args: (cred.api_key.to_string(), expires, signature),
1349 };
1350
1351 assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1353 assert_eq!(auth_message.args.0, "test_key");
1354 assert!(auth_message.args.1 > 0); assert!(!auth_message.args.2.is_empty()); } else {
1357 panic!("Client should have credentials");
1358 }
1359
1360 let client_no_creds = BitmexWebSocketClient::new(
1362 Some("ws://test.com".to_string()),
1363 None,
1364 None,
1365 Some(AccountId::new("BITMEX-TEST")),
1366 None,
1367 )
1368 .unwrap();
1369
1370 assert!(client_no_creds.credential.is_none());
1371 }
1372
1373 #[rstest]
1374 fn test_subscription_state_after_unsubscribe() {
1375 let client = BitmexWebSocketClient::new(
1376 Some("ws://test.com".to_string()),
1377 Some("test_key".to_string()),
1378 Some("test_secret".to_string()),
1379 Some(AccountId::new("BITMEX-TEST")),
1380 None,
1381 )
1382 .unwrap();
1383
1384 let subs = client.subscriptions.confirmed();
1386 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1387 let mut set = AHashSet::new();
1388 set.insert(Ustr::from("XBTUSD"));
1389 set.insert(Ustr::from("ETHUSD"));
1390 set
1391 });
1392
1393 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1394 let mut set = AHashSet::new();
1395 set.insert(Ustr::from("XBTUSD"));
1396 set
1397 });
1398
1399 let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1401 if let Some((channel, symbol)) = topic.split_once(':')
1402 && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1403 {
1404 entry.remove(&Ustr::from(symbol));
1405 if entry.is_empty() {
1406 drop(entry);
1407 subs.remove(&Ustr::from(channel));
1408 }
1409 }
1410
1411 let mut topics_to_restore = Vec::new();
1413 for entry in subs.iter() {
1414 let (channel, symbols) = entry.pair();
1415 for symbol in symbols {
1416 if symbol.is_empty() {
1417 topics_to_restore.push(channel.to_string());
1418 } else {
1419 topics_to_restore.push(format!("{channel}:{symbol}"));
1420 }
1421 }
1422 }
1423
1424 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1426 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1427 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1428
1429 assert!(topics_to_restore.contains(&trade_xbt));
1430 assert!(!topics_to_restore.contains(&trade_eth));
1431 assert!(topics_to_restore.contains(&book_xbt));
1432 assert_eq!(topics_to_restore.len(), 2);
1433 }
1434
1435 #[rstest]
1436 fn test_race_unsubscribe_failure_recovery() {
1437 let client = BitmexWebSocketClient::new(
1443 Some("ws://test.com".to_string()),
1444 None,
1445 None,
1446 Some(AccountId::new("BITMEX-TEST")),
1447 None,
1448 )
1449 .unwrap();
1450
1451 let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1452
1453 client.subscriptions.mark_subscribe(&topic);
1455 client.subscriptions.confirm_subscribe(&topic);
1456 assert_eq!(client.subscriptions.len(), 1);
1457
1458 client.subscriptions.mark_unsubscribe(&topic);
1460 assert_eq!(client.subscriptions.len(), 0);
1461 assert_eq!(
1462 client.subscriptions.pending_unsubscribe_topics(),
1463 vec![topic.clone()]
1464 );
1465
1466 client.subscriptions.confirm_unsubscribe(&topic); client.subscriptions.mark_subscribe(&topic); client.subscriptions.confirm_subscribe(&topic); assert_eq!(client.subscriptions.len(), 1);
1474 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1475 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1476
1477 let all = client.subscriptions.all_topics();
1479 assert_eq!(all.len(), 1);
1480 assert!(all.contains(&topic));
1481 }
1482
1483 #[rstest]
1484 fn test_race_resubscribe_before_unsubscribe_ack() {
1485 let client = BitmexWebSocketClient::new(
1489 Some("ws://test.com".to_string()),
1490 None,
1491 None,
1492 Some(AccountId::new("BITMEX-TEST")),
1493 None,
1494 )
1495 .unwrap();
1496
1497 let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1498
1499 client.subscriptions.mark_subscribe(&topic);
1501 client.subscriptions.confirm_subscribe(&topic);
1502 assert_eq!(client.subscriptions.len(), 1);
1503
1504 client.subscriptions.mark_unsubscribe(&topic);
1506 assert_eq!(client.subscriptions.len(), 0);
1507 assert_eq!(
1508 client.subscriptions.pending_unsubscribe_topics(),
1509 vec![topic.clone()]
1510 );
1511
1512 client.subscriptions.mark_subscribe(&topic);
1514 assert_eq!(
1515 client.subscriptions.pending_subscribe_topics(),
1516 vec![topic.clone()]
1517 );
1518
1519 client.subscriptions.confirm_unsubscribe(&topic);
1521 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1522 assert_eq!(
1523 client.subscriptions.pending_subscribe_topics(),
1524 vec![topic.clone()]
1525 );
1526
1527 client.subscriptions.confirm_subscribe(&topic);
1529 assert_eq!(client.subscriptions.len(), 1);
1530 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1531
1532 let all = client.subscriptions.all_topics();
1534 assert_eq!(all.len(), 1);
1535 assert!(all.contains(&topic));
1536 }
1537
1538 #[rstest]
1539 fn test_race_channel_level_reconnection_with_pending_states() {
1540 let client = BitmexWebSocketClient::new(
1542 Some("ws://test.com".to_string()),
1543 Some("test_key".to_string()),
1544 Some("test_secret".to_string()),
1545 Some(AccountId::new("BITMEX-TEST")),
1546 None,
1547 )
1548 .unwrap();
1549
1550 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1553 client.subscriptions.mark_subscribe(&trade_xbt);
1554 client.subscriptions.confirm_subscribe(&trade_xbt);
1555
1556 let order_channel = BitmexWsAuthChannel::Order.as_ref();
1558 client.subscriptions.mark_subscribe(order_channel);
1559 client.subscriptions.confirm_subscribe(order_channel);
1560
1561 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1563 client.subscriptions.mark_subscribe(&trade_eth);
1564
1565 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1567 client.subscriptions.mark_subscribe(&book_xbt);
1568 client.subscriptions.confirm_subscribe(&book_xbt);
1569 client.subscriptions.mark_unsubscribe(&book_xbt);
1570
1571 let topics_to_restore = client.subscriptions.all_topics();
1573
1574 assert_eq!(topics_to_restore.len(), 3);
1576 assert!(topics_to_restore.contains(&trade_xbt));
1577 assert!(topics_to_restore.contains(&order_channel.to_string()));
1578 assert!(topics_to_restore.contains(&trade_eth));
1579 assert!(!topics_to_restore.contains(&book_xbt)); for topic in &topics_to_restore {
1584 if topic == order_channel {
1585 assert!(
1586 !topic.contains(':'),
1587 "Channel-level topic should not have delimiter"
1588 );
1589 }
1590 }
1591 }
1592}