1use std::{
24 sync::{
25 Arc,
26 atomic::{AtomicBool, AtomicU8, Ordering},
27 },
28 time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::live::get_runtime;
35use nautilus_core::{
36 consts::NAUTILUS_USER_AGENT,
37 env::{get_env_var, get_or_env_var_opt},
38};
39use nautilus_model::{
40 data::bar::BarType,
41 enums::OrderType,
42 identifiers::{AccountId, ClientOrderId, InstrumentId},
43 instruments::{Instrument, InstrumentAny},
44};
45use nautilus_network::{
46 http::USER_AGENT,
47 mode::ConnectionMode,
48 websocket::{
49 AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, WebSocketClient,
50 WebSocketConfig, channel_message_handler,
51 },
52};
53use tokio_tungstenite::tungstenite::Message;
54use ustr::Ustr;
55
56use super::{
57 enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
58 error::BitmexWsError,
59 handler::{FeedHandler, HandlerCommand},
60 messages::{BitmexAuthentication, BitmexSubscription, NautilusWsMessage},
61 parse::{is_index_symbol, topic_from_bar_spec},
62};
63use crate::common::{
64 consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
65 credential::Credential,
66};
67
/// WebSocket client for the BitMEX realtime API.
///
/// The struct is `Clone`; all mutable state that must be shared between clones
/// is held behind `Arc`s (or types that are themselves cloneable handles).
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.bitmex", from_py_object)
)]
pub struct BitmexWebSocketClient {
    /// WebSocket endpoint; `new` falls back to `BITMEX_WS_URL` when none is given.
    url: String,
    /// API credential; `None` means no authentication is attempted.
    credential: Option<Credential>,
    /// Heartbeat setting forwarded to the underlying `WebSocketConfig`
    /// (units are defined by that config — presumably seconds; verify there).
    heartbeat: Option<u64>,
    /// Account identifier handed to the feed handler on `connect`.
    account_id: AccountId,
    /// Tracks the outcome of an in-flight authentication request.
    auth_tracker: AuthTracker,
    /// Stop flag: set to `true` by `close` and reset to `false` by `connect`.
    signal: Arc<AtomicBool>,
    /// Connection-mode atomic; swapped for the underlying client's atomic on `connect`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Command channel to the feed handler; replaced with a fresh sender on every `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver of parsed messages; populated by `connect` and taken by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Handle of the spawned handler task; populated by `connect` and taken by `close`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Subscription state shared with the feed handler.
    subscriptions: SubscriptionState,
    /// Topics requested through `subscribe` (removed again by `unsubscribe`).
    tracked_subscriptions: Arc<DashMap<String, ()>>,
    /// Instruments keyed by raw symbol; replayed to the handler on `connect`.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Order types keyed by client order ID; shared with the feed handler.
    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Order symbols keyed by client order ID; shared with the feed handler.
    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
}
97
98impl BitmexWebSocketClient {
99 pub fn new(
105 url: Option<String>,
106 api_key: Option<String>,
107 api_secret: Option<String>,
108 account_id: Option<AccountId>,
109 heartbeat: Option<u64>,
110 ) -> anyhow::Result<Self> {
111 let credential = match (api_key, api_secret) {
112 (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
113 (None, None) => None,
114 _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
115 };
116
117 let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
118
119 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
120 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
121
122 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
126
127 Ok(Self {
128 url: url.unwrap_or(BITMEX_WS_URL.to_string()),
129 credential,
130 heartbeat,
131 account_id,
132 auth_tracker: AuthTracker::new(),
133 signal: Arc::new(AtomicBool::new(false)),
134 connection_mode,
135 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
136 out_rx: None,
137 task_handle: None,
138 subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
139 tracked_subscriptions: Arc::new(DashMap::new()),
140 instruments_cache: Arc::new(DashMap::new()),
141 order_type_cache: Arc::new(DashMap::new()),
142 order_symbol_cache: Arc::new(DashMap::new()),
143 })
144 }
145
146 pub fn new_with_env(
157 url: Option<String>,
158 api_key: Option<String>,
159 api_secret: Option<String>,
160 account_id: Option<AccountId>,
161 heartbeat: Option<u64>,
162 testnet: bool,
163 ) -> anyhow::Result<Self> {
164 let (api_key_env, api_secret_env) = if testnet {
165 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
166 } else {
167 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
168 };
169
170 let key = get_or_env_var_opt(api_key, api_key_env);
171 let secret = get_or_env_var_opt(api_secret, api_secret_env);
172
173 Self::new(url, key, secret, account_id, heartbeat)
174 }
175
176 pub fn from_env() -> anyhow::Result<Self> {
182 let url = get_env_var("BITMEX_WS_URL")?;
183 let api_key = get_env_var("BITMEX_API_KEY")?;
184 let api_secret = get_env_var("BITMEX_API_SECRET")?;
185
186 Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
187 }
188
    /// Returns the WebSocket URL this client was configured with.
    #[must_use]
    pub const fn url(&self) -> &str {
        self.url.as_str()
    }
194
195 #[must_use]
197 pub fn api_key(&self) -> Option<&str> {
198 self.credential.as_ref().map(|c| c.api_key.as_str())
199 }
200
201 #[must_use]
203 pub fn api_key_masked(&self) -> Option<String> {
204 self.credential.as_ref().map(|c| c.api_key_masked())
205 }
206
207 #[must_use]
209 pub fn is_active(&self) -> bool {
210 let connection_mode_arc = self.connection_mode.load();
211 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
212 && !self.signal.load(Ordering::Relaxed)
213 }
214
215 #[must_use]
217 pub fn is_closed(&self) -> bool {
218 let connection_mode_arc = self.connection_mode.load();
219 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
220 || self.signal.load(Ordering::Relaxed)
221 }
222
    /// Replaces the account ID stored on this client.
    ///
    /// The value is copied into the handler task when `connect` runs, so a
    /// change made after connecting does not reach an already-running handler.
    pub fn set_account_id(&mut self, account_id: AccountId) {
        self.account_id = account_id;
    }
227
228 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
232 self.instruments_cache.clear();
233 let mut count = 0;
234
235 log::debug!("Initializing BitMEX instrument cache");
236
237 for inst in instruments {
238 let symbol = inst.symbol().inner();
239 self.instruments_cache.insert(symbol, inst.clone());
240 log::debug!("Cached instrument: {symbol}");
241 count += 1;
242 }
243
244 log::info!("BitMEX instrument cache initialized with {count} instruments");
245 }
246
247 pub fn cache_instrument(&self, instrument: InstrumentAny) {
251 self.instruments_cache
252 .insert(instrument.symbol().inner(), instrument.clone());
253
254 if let Ok(cmd_tx) = self.cmd_tx.try_read()
257 && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument))
258 {
259 log::debug!("Failed to send instrument update to handler: {e}");
260 }
261 }
262
    /// Connects to the WebSocket server, spawns the message-handling task,
    /// authenticates when a credential is configured, and subscribes to the
    /// instrument channel.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying connection cannot be established, if
    /// the new client cannot be handed to the handler, or if authentication fails.
    pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
        let (client, raw_rx) = self.connect_inner().await?;

        // Clear any stop signal left over from a previous `close`
        self.signal.store(false, Ordering::Relaxed);

        // Adopt the underlying client's mode atomic so `is_active`/`is_closed` track it
        self.connection_mode.store(client.connection_mode_atomic());

        // Channel carrying parsed messages out to the consumer of `stream`
        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
        self.out_rx = Some(Arc::new(out_rx));

        // Fresh command channel for this connection; replaces the previous sender
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
        *self.cmd_tx.write().await = cmd_tx.clone();

        // Queue the client first so the handler receives it before any other command
        if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
            return Err(BitmexWsError::ClientError(format!(
                "Failed to send WebSocketClient to handler: {e}"
            )));
        }

        // Replay instruments cached before connecting
        if !self.instruments_cache.is_empty() {
            let cached_instruments: Vec<InstrumentAny> = self
                .instruments_cache
                .iter()
                .map(|entry| entry.value().clone())
                .collect();
            if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
                log::error!("Failed to replay instruments to handler: {e}");
            }
        }

        // Clones moved into the spawned task
        let signal = self.signal.clone();
        let account_id = self.account_id;
        let credential = self.credential.clone();
        let auth_tracker = self.auth_tracker.clone();
        let subscriptions = self.subscriptions.clone();
        let order_type_cache = self.order_type_cache.clone();
        let order_symbol_cache = self.order_symbol_cache.clone();
        let cmd_tx_for_reconnect = cmd_tx.clone();

        let stream_handle = get_runtime().spawn(async move {
            let mut handler = FeedHandler::new(
                signal.clone(),
                cmd_rx,
                raw_rx,
                out_tx,
                account_id,
                auth_tracker.clone(),
                subscriptions.clone(),
                order_type_cache,
                order_symbol_cache,
            );

            // Re-marks every topic reported by `all_topics` as subscribing and
            // queues one subscribe payload per topic for the handler.
            let resubscribe_all = || {
                let topics = subscriptions.all_topics();

                if topics.is_empty() {
                    return;
                }

                log::debug!(
                    "Resubscribing to confirmed subscriptions: count={}",
                    topics.len()
                );

                for topic in &topics {
                    subscriptions.mark_subscribe(topic.as_str());
                }

                let mut payloads = Vec::with_capacity(topics.len());
                for topic in &topics {
                    let message = BitmexSubscription {
                        op: BitmexWsOperation::Subscribe,
                        args: vec![Ustr::from(topic.as_ref())],
                    };
                    // NOTE: a topic whose message fails to serialize is silently skipped
                    if let Ok(payload) = serde_json::to_string(&message) {
                        payloads.push(payload);
                    }
                }

                if let Err(e) =
                    cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads })
                {
                    log::error!("Failed to send resubscribe command: {e}");
                }
            };

            loop {
                match handler.next().await {
                    Some(NautilusWsMessage::Reconnected) => {
                        // Ignore reconnect events once a stop has been requested
                        if signal.load(Ordering::Relaxed) {
                            continue;
                        }

                        log::info!("WebSocket reconnected");

                        // Flatten confirmed subscriptions into topic strings, skipping
                        // the instrument channel; an empty symbol denotes a channel-level topic.
                        let confirmed_topics: Vec<String> = {
                            let confirmed = subscriptions.confirmed();
                            let mut topics = Vec::new();

                            for entry in confirmed.iter() {
                                let (channel, symbols) = entry.pair();

                                if *channel == BitmexWsTopic::Instrument.as_ref() {
                                    continue;
                                }

                                for symbol in symbols {
                                    if symbol.is_empty() {
                                        topics.push(channel.to_string());
                                    } else {
                                        topics.push(format!("{channel}:{symbol}"));
                                    }
                                }
                            }

                            topics
                        };

                        if !confirmed_topics.is_empty() {
                            log::debug!(
                                "Marking confirmed subscriptions as pending for replay: count={}",
                                confirmed_topics.len()
                            );
                            // Presumably moves each topic back to a pending state (per the
                            // log message above) — confirm against `SubscriptionState::mark_failure`
                            for topic in confirmed_topics {
                                subscriptions.mark_failure(&topic);
                            }
                        }

                        if let Some(cred) = &credential {
                            log::debug!("Re-authenticating after reconnection");

                            // Signed auth request expiring 30 seconds from now
                            let expires =
                                (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
                            let signature = cred.sign("GET", "/realtime", expires, "");

                            let auth_message = BitmexAuthentication {
                                op: BitmexWsAuthAction::AuthKeyExpires,
                                args: (cred.api_key.to_string(), expires, signature),
                            };

                            if let Ok(payload) = serde_json::to_string(&auth_message) {
                                if let Err(e) = cmd_tx_for_reconnect
                                    .send(HandlerCommand::Authenticate { payload })
                                {
                                    log::error!("Failed to send reconnection auth command: {e}");
                                }
                            } else {
                                log::error!("Failed to serialize reconnection auth message");
                            }
                        }

                        // Without credentials there is no `Authenticated` event to wait
                        // for, so resubscribe right away
                        if credential.is_none() {
                            log::debug!("No authentication required, resubscribing immediately");
                            resubscribe_all();
                        }

                        // Forward the reconnect event to the consumer
                        if handler.send(NautilusWsMessage::Reconnected).is_err() {
                            log::error!("Failed to forward reconnect event (receiver dropped)");
                            break;
                        }

                        continue;
                    }
                    Some(NautilusWsMessage::Authenticated) => {
                        // This arm handles every `Authenticated` message, not only those
                        // following a reconnect; `resubscribe_all` is a no-op when there
                        // are no topics. The event itself is not forwarded.
                        log::debug!("Authenticated after reconnection, resubscribing");
                        resubscribe_all();
                        continue;
                    }
                    Some(msg) => {
                        if handler.send(msg).is_err() {
                            log::error!("Failed to send message (receiver dropped)");
                            break;
                        }
                    }
                    None => {
                        if handler.is_stopped() {
                            log::debug!("Stop signal received, ending message processing");
                            break;
                        }
                        log::warn!("WebSocket stream ended unexpectedly");
                        break;
                    }
                }
            }

            log::debug!("Handler task exiting");
        });

        self.task_handle = Some(Arc::new(stream_handle));

        // Authenticate when credentials exist; on failure abort the handler task
        // and raise the stop signal before returning the error
        if self.credential.is_some()
            && let Err(e) = self.authenticate().await
        {
            if let Some(handle) = self.task_handle.take() {
                handle.abort();
            }
            self.signal.store(true, Ordering::Relaxed);
            return Err(e);
        }

        // Always subscribe to the instrument channel on connection
        let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
        self.subscriptions.mark_subscribe(&instrument_topic);
        self.tracked_subscriptions.insert(instrument_topic, ());

        let subscribe_msg = BitmexSubscription {
            op: BitmexWsOperation::Subscribe,
            args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
        };

        // Failures here are logged only; `connect` still returns `Ok`
        match serde_json::to_string(&subscribe_msg) {
            Ok(subscribe_json) => {
                if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
                    topics: vec![subscribe_json],
                }) {
                    log::error!("Failed to send subscribe command for instruments: {e}");
                } else {
                    log::debug!("Subscribed to all instruments");
                }
            }
            Err(e) => {
                log::error!("Failed to serialize subscribe message: {e}");
            }
        }

        Ok(())
    }
512
513 async fn connect_inner(
519 &mut self,
520 ) -> Result<
521 (
522 WebSocketClient,
523 tokio::sync::mpsc::UnboundedReceiver<Message>,
524 ),
525 BitmexWsError,
526 > {
527 let (message_handler, rx) = channel_message_handler();
528
529 let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
532 });
534
535 let config = WebSocketConfig {
536 url: self.url.clone(),
537 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
538 heartbeat: self.heartbeat,
539 heartbeat_msg: None,
540 reconnect_timeout_ms: Some(5_000),
541 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, reconnect_max_attempts: None,
546 };
547
548 let keyed_quotas = vec![];
549 let client = WebSocketClient::connect(
550 config,
551 Some(message_handler),
552 Some(ping_handler),
553 None, keyed_quotas,
555 None, )
557 .await
558 .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
559
560 Ok((client, rx))
561 }
562
563 async fn authenticate(&self) -> Result<(), BitmexWsError> {
570 let credential = match &self.credential {
571 Some(credential) => credential,
572 None => {
573 return Err(BitmexWsError::AuthenticationError(
574 "API credentials not available to authenticate".to_string(),
575 ));
576 }
577 };
578
579 let receiver = self.auth_tracker.begin();
580
581 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
582 let signature = credential.sign("GET", "/realtime", expires, "");
583
584 let auth_message = BitmexAuthentication {
585 op: BitmexWsAuthAction::AuthKeyExpires,
586 args: (credential.api_key.to_string(), expires, signature),
587 };
588
589 let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
590 let msg = format!("Failed to serialize auth message: {e}");
591 self.auth_tracker.fail(msg.clone());
592 BitmexWsError::AuthenticationError(msg)
593 })?;
594
595 self.cmd_tx
597 .read()
598 .await
599 .send(HandlerCommand::Authenticate { payload: auth_json })
600 .map_err(|e| {
601 let msg = format!("Failed to send authenticate command: {e}");
602 self.auth_tracker.fail(msg.clone());
603 BitmexWsError::AuthenticationError(msg)
604 })?;
605
606 self.auth_tracker
607 .wait_for_result::<BitmexWsError>(
608 Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
609 receiver,
610 )
611 .await
612 }
613
614 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
620 let timeout = Duration::from_secs_f64(timeout_secs);
621
622 tokio::time::timeout(timeout, async {
623 while !self.is_active() {
624 tokio::time::sleep(Duration::from_millis(10)).await;
625 }
626 })
627 .await
628 .map_err(|_| {
629 BitmexWsError::ClientError(format!(
630 "WebSocket connection timeout after {timeout_secs} seconds"
631 ))
632 })?;
633
634 Ok(())
635 }
636
637 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
645 let rx = self
646 .out_rx
647 .take()
648 .expect("Stream receiver already taken or not connected");
649 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
650 async_stream::stream! {
651 while let Some(msg) = rx.recv().await {
652 yield msg;
653 }
654 }
655 }
656
657 pub async fn close(&mut self) -> Result<(), BitmexWsError> {
667 log::debug!("Starting close process");
668
669 self.signal.store(true, Ordering::Relaxed);
670
671 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
673 log::debug!(
674 "Failed to send disconnect command (handler may already be shut down): {e}"
675 );
676 }
677
678 if let Some(task_handle) = self.task_handle.take() {
680 match Arc::try_unwrap(task_handle) {
681 Ok(handle) => {
682 log::debug!("Waiting for task handle to complete");
683 match tokio::time::timeout(Duration::from_secs(2), handle).await {
684 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
685 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
686 Err(_) => {
687 log::warn!(
688 "Timeout waiting for task handle, task may still be running"
689 );
690 }
692 }
693 }
694 Err(arc_handle) => {
695 log::debug!(
696 "Cannot take ownership of task handle - other references exist, aborting task"
697 );
698 arc_handle.abort();
699 }
700 }
701 } else {
702 log::debug!("No task handle to await");
703 }
704
705 log::debug!("Closed");
706
707 Ok(())
708 }
709
710 pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
720 log::debug!("Subscribing to topics: {topics:?}");
721
722 for topic in &topics {
723 self.subscriptions.mark_subscribe(topic.as_str());
724 self.tracked_subscriptions.insert(topic.clone(), ());
725 }
726
727 let mut payloads = Vec::with_capacity(topics.len());
729 for topic in &topics {
730 let message = BitmexSubscription {
731 op: BitmexWsOperation::Subscribe,
732 args: vec![Ustr::from(topic.as_ref())],
733 };
734 let payload = serde_json::to_string(&message).map_err(|e| {
735 BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
736 })?;
737 payloads.push(payload);
738 }
739
740 let cmd = HandlerCommand::Subscribe { topics: payloads };
742
743 self.send_cmd(cmd).await.map_err(|e| {
744 BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
745 })
746 }
747
748 async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
754 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
755
756 if self.signal.load(Ordering::Relaxed) {
757 log::debug!("Shutdown signal detected, skipping unsubscribe");
758 return Ok(());
759 }
760
761 for topic in &topics {
762 self.subscriptions.mark_unsubscribe(topic.as_str());
763 self.tracked_subscriptions.remove(topic);
764 }
765
766 let mut payloads = Vec::with_capacity(topics.len());
768 for topic in &topics {
769 let message = BitmexSubscription {
770 op: BitmexWsOperation::Unsubscribe,
771 args: vec![Ustr::from(topic.as_ref())],
772 };
773 if let Ok(payload) = serde_json::to_string(&message) {
774 payloads.push(payload);
775 }
776 }
777
778 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
780
781 if let Err(e) = self.send_cmd(cmd).await {
782 log::debug!("Failed to send unsubscribe command: {e}");
783 }
784
785 Ok(())
786 }
787
    /// Returns the length reported by the subscription state.
    #[must_use]
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
793
794 pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
795 let symbol = instrument_id.symbol.inner();
796 let confirmed = self.subscriptions.confirmed();
797 let mut channels = Vec::with_capacity(confirmed.len());
798
799 for entry in confirmed.iter() {
800 let (channel, symbols) = entry.pair();
801 if symbols.contains(&symbol) {
802 channels.push(format!("{channel}:{symbol}"));
804 } else {
805 let has_channel_marker = symbols.iter().any(|s| s.is_empty());
806 if has_channel_marker
807 && (*channel == BitmexWsAuthChannel::Execution.as_ref()
808 || *channel == BitmexWsAuthChannel::Order.as_ref())
809 {
810 channels.push(channel.to_string());
812 }
813 }
814 }
815
816 channels
817 }
818
    /// No-op: the instrument channel is subscribed during `connect`, so this
    /// only logs and returns `Ok`.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
        log::debug!("Already subscribed to all instruments on connection, skipping");
        Ok(())
    }
829
    /// No-op for a single instrument: the instrument channel subscribed during
    /// `connect` already covers it, so this only logs and returns `Ok`.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn subscribe_instrument(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        log::debug!(
            "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
        );
        Ok(())
    }
845
846 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
852 let topic = BitmexWsTopic::OrderBookL2;
853 let symbol = instrument_id.symbol.inner();
854 self.subscribe(vec![format!("{topic}:{symbol}")]).await
855 }
856
857 pub async fn subscribe_book_25(
863 &self,
864 instrument_id: InstrumentId,
865 ) -> Result<(), BitmexWsError> {
866 let topic = BitmexWsTopic::OrderBookL2_25;
867 let symbol = instrument_id.symbol.inner();
868 self.subscribe(vec![format!("{topic}:{symbol}")]).await
869 }
870
871 pub async fn subscribe_book_depth10(
877 &self,
878 instrument_id: InstrumentId,
879 ) -> Result<(), BitmexWsError> {
880 let topic = BitmexWsTopic::OrderBook10;
881 let symbol = instrument_id.symbol.inner();
882 self.subscribe(vec![format!("{topic}:{symbol}")]).await
883 }
884
885 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
893 let symbol = instrument_id.symbol.inner();
894
895 if is_index_symbol(&instrument_id.symbol.inner()) {
897 log::warn!("Ignoring quote subscription for index symbol: {symbol}");
898 return Ok(());
899 }
900
901 let topic = BitmexWsTopic::Quote;
902 self.subscribe(vec![format!("{topic}:{symbol}")]).await
903 }
904
905 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
913 let symbol = instrument_id.symbol.inner();
914
915 if is_index_symbol(&symbol) {
917 log::warn!("Ignoring trade subscription for index symbol: {symbol}");
918 return Ok(());
919 }
920
921 let topic = BitmexWsTopic::Trade;
922 self.subscribe(vec![format!("{topic}:{symbol}")]).await
923 }
924
    /// Subscribes to mark prices by delegating to [`Self::subscribe_instrument`].
    ///
    /// # Errors
    ///
    /// Returns whatever [`Self::subscribe_instrument`] returns.
    pub async fn subscribe_mark_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        self.subscribe_instrument(instrument_id).await
    }
936
    /// Subscribes to index prices by delegating to [`Self::subscribe_instrument`].
    ///
    /// # Errors
    ///
    /// Returns whatever [`Self::subscribe_instrument`] returns.
    pub async fn subscribe_index_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        self.subscribe_instrument(instrument_id).await
    }
948
949 pub async fn subscribe_funding_rates(
955 &self,
956 instrument_id: InstrumentId,
957 ) -> Result<(), BitmexWsError> {
958 let topic = BitmexWsTopic::Funding;
959 let symbol = instrument_id.symbol.inner();
960 self.subscribe(vec![format!("{topic}:{symbol}")]).await
961 }
962
963 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
969 let topic = topic_from_bar_spec(bar_type.spec());
970 let symbol = bar_type.instrument_id().symbol.inner();
971 self.subscribe(vec![format!("{topic}:{symbol}")]).await
972 }
973
    /// No-op: the instrument subscription is kept for the lifetime of the
    /// connection, so this only logs and returns `Ok`.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
        log::debug!(
            "Instruments subscription maintained for proper operation, skipping unsubscribe"
        );
        Ok(())
    }
986
    /// No-op for a single instrument: the shared instrument subscription is
    /// kept, so this only logs and returns `Ok`.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn unsubscribe_instrument(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        log::debug!(
            "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
        );
        Ok(())
    }
1002
1003 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
1009 let topic = BitmexWsTopic::OrderBookL2;
1010 let symbol = instrument_id.symbol.inner();
1011 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1012 }
1013
1014 pub async fn unsubscribe_book_25(
1020 &self,
1021 instrument_id: InstrumentId,
1022 ) -> Result<(), BitmexWsError> {
1023 let topic = BitmexWsTopic::OrderBookL2_25;
1024 let symbol = instrument_id.symbol.inner();
1025 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1026 }
1027
1028 pub async fn unsubscribe_book_depth10(
1034 &self,
1035 instrument_id: InstrumentId,
1036 ) -> Result<(), BitmexWsError> {
1037 let topic = BitmexWsTopic::OrderBook10;
1038 let symbol = instrument_id.symbol.inner();
1039 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1040 }
1041
1042 pub async fn unsubscribe_quotes(
1048 &self,
1049 instrument_id: InstrumentId,
1050 ) -> Result<(), BitmexWsError> {
1051 let symbol = instrument_id.symbol.inner();
1052
1053 if is_index_symbol(&symbol) {
1055 return Ok(());
1056 }
1057
1058 let topic = BitmexWsTopic::Quote;
1059 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1060 }
1061
1062 pub async fn unsubscribe_trades(
1068 &self,
1069 instrument_id: InstrumentId,
1070 ) -> Result<(), BitmexWsError> {
1071 let symbol = instrument_id.symbol.inner();
1072
1073 if is_index_symbol(&symbol) {
1075 return Ok(());
1076 }
1077
1078 let topic = BitmexWsTopic::Trade;
1079 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1080 }
1081
    /// No-op: mark prices arrive on the shared instrument channel, so this only
    /// logs and returns `Ok`.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn unsubscribe_mark_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        log::debug!(
            "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
        );
        Ok(())
    }
1097
    /// No-op: index prices arrive on the shared instrument channel, so this
    /// only logs and returns `Ok`.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn unsubscribe_index_prices(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        log::debug!(
            "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
        );
        Ok(())
    }
1113
    /// No-op: logs and returns `Ok` without sending an unsubscribe request.
    ///
    /// NOTE(review): `subscribe_funding_rates` does subscribe to the funding
    /// topic, so the subscription stays active after this call; the log message
    /// attributes this to avoiding a shutdown race — confirm this is intended.
    ///
    /// # Errors
    ///
    /// Never returns an error.
    pub async fn unsubscribe_funding_rates(
        &self,
        instrument_id: InstrumentId,
    ) -> Result<(), BitmexWsError> {
        log::debug!(
            "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
        );
        Ok(())
    }
1129
1130 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1136 let topic = topic_from_bar_spec(bar_type.spec());
1137 let symbol = bar_type.instrument_id().symbol.inner();
1138 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1139 }
1140
1141 pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1147 if self.credential.is_none() {
1148 return Err(BitmexWsError::MissingCredentials);
1149 }
1150 self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1151 .await
1152 }
1153
1154 pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1160 if self.credential.is_none() {
1161 return Err(BitmexWsError::MissingCredentials);
1162 }
1163 self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1164 .await
1165 }
1166
1167 pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1173 if self.credential.is_none() {
1174 return Err(BitmexWsError::MissingCredentials);
1175 }
1176 self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1177 .await
1178 }
1179
1180 pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1186 if self.credential.is_none() {
1187 return Err(BitmexWsError::MissingCredentials);
1188 }
1189 self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1190 .await
1191 }
1192
1193 pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1199 if self.credential.is_none() {
1200 return Err(BitmexWsError::MissingCredentials);
1201 }
1202 self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1203 .await
1204 }
1205
1206 pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1212 self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1213 .await
1214 }
1215
1216 pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1222 self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1223 .await
1224 }
1225
1226 pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1232 self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1233 .await
1234 }
1235
1236 pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1242 self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1243 .await
1244 }
1245
1246 pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1252 self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1253 .await
1254 }
1255
1256 async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1258 self.cmd_tx
1259 .read()
1260 .await
1261 .send(cmd)
1262 .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1263 }
1264}
1265
1266#[cfg(test)]
1267mod tests {
1268 use ahash::AHashSet;
1269 use rstest::rstest;
1270 use ustr::Ustr;
1271
1272 use super::*;
1273
1274 #[rstest]
1275 fn test_reconnect_topics_restoration_logic() {
1276 let client = BitmexWebSocketClient::new(
1278 Some("ws://test.com".to_string()),
1279 Some("test_key".to_string()),
1280 Some("test_secret".to_string()),
1281 Some(AccountId::new("BITMEX-TEST")),
1282 None,
1283 )
1284 .unwrap();
1285
1286 let subs = client.subscriptions.confirmed();
1288 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1289 let mut set = AHashSet::new();
1290 set.insert(Ustr::from("XBTUSD"));
1291 set.insert(Ustr::from("ETHUSD"));
1292 set
1293 });
1294
1295 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1296 let mut set = AHashSet::new();
1297 set.insert(Ustr::from("XBTUSD"));
1298 set
1299 });
1300
1301 subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1303 let mut set = AHashSet::new();
1304 set.insert(Ustr::from(""));
1305 set
1306 });
1307 subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1308 let mut set = AHashSet::new();
1309 set.insert(Ustr::from(""));
1310 set
1311 });
1312
1313 let mut topics_to_restore = Vec::new();
1315 for entry in subs.iter() {
1316 let (channel, symbols) = entry.pair();
1317 for symbol in symbols {
1318 if symbol.is_empty() {
1319 topics_to_restore.push(channel.to_string());
1320 } else {
1321 topics_to_restore.push(format!("{channel}:{symbol}"));
1322 }
1323 }
1324 }
1325
1326 assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1328 assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1329 assert!(
1330 topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1331 );
1332 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1333 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1334 assert_eq!(topics_to_restore.len(), 5);
1335 }
1336
1337 #[rstest]
1338 fn test_reconnect_auth_message_building() {
1339 let client_with_creds = BitmexWebSocketClient::new(
1341 Some("ws://test.com".to_string()),
1342 Some("test_key".to_string()),
1343 Some("test_secret".to_string()),
1344 Some(AccountId::new("BITMEX-TEST")),
1345 None,
1346 )
1347 .unwrap();
1348
1349 if let Some(cred) = &client_with_creds.credential {
1351 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1352 let signature = cred.sign("GET", "/realtime", expires, "");
1353
1354 let auth_message = BitmexAuthentication {
1355 op: BitmexWsAuthAction::AuthKeyExpires,
1356 args: (cred.api_key.to_string(), expires, signature),
1357 };
1358
1359 assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1361 assert_eq!(auth_message.args.0, "test_key");
1362 assert!(auth_message.args.1 > 0); assert!(!auth_message.args.2.is_empty()); } else {
1365 panic!("Client should have credentials");
1366 }
1367
1368 let client_no_creds = BitmexWebSocketClient::new(
1370 Some("ws://test.com".to_string()),
1371 None,
1372 None,
1373 Some(AccountId::new("BITMEX-TEST")),
1374 None,
1375 )
1376 .unwrap();
1377
1378 assert!(client_no_creds.credential.is_none());
1379 }
1380
1381 #[rstest]
1382 fn test_subscription_state_after_unsubscribe() {
1383 let client = BitmexWebSocketClient::new(
1384 Some("ws://test.com".to_string()),
1385 Some("test_key".to_string()),
1386 Some("test_secret".to_string()),
1387 Some(AccountId::new("BITMEX-TEST")),
1388 None,
1389 )
1390 .unwrap();
1391
1392 let subs = client.subscriptions.confirmed();
1394 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1395 let mut set = AHashSet::new();
1396 set.insert(Ustr::from("XBTUSD"));
1397 set.insert(Ustr::from("ETHUSD"));
1398 set
1399 });
1400
1401 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1402 let mut set = AHashSet::new();
1403 set.insert(Ustr::from("XBTUSD"));
1404 set
1405 });
1406
1407 let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1409 if let Some((channel, symbol)) = topic.split_once(':')
1410 && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1411 {
1412 entry.remove(&Ustr::from(symbol));
1413 if entry.is_empty() {
1414 drop(entry);
1415 subs.remove(&Ustr::from(channel));
1416 }
1417 }
1418
1419 let mut topics_to_restore = Vec::new();
1421 for entry in subs.iter() {
1422 let (channel, symbols) = entry.pair();
1423 for symbol in symbols {
1424 if symbol.is_empty() {
1425 topics_to_restore.push(channel.to_string());
1426 } else {
1427 topics_to_restore.push(format!("{channel}:{symbol}"));
1428 }
1429 }
1430 }
1431
1432 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1434 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1435 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1436
1437 assert!(topics_to_restore.contains(&trade_xbt));
1438 assert!(!topics_to_restore.contains(&trade_eth));
1439 assert!(topics_to_restore.contains(&book_xbt));
1440 assert_eq!(topics_to_restore.len(), 2);
1441 }
1442
1443 #[rstest]
1444 fn test_race_unsubscribe_failure_recovery() {
1445 let client = BitmexWebSocketClient::new(
1451 Some("ws://test.com".to_string()),
1452 None,
1453 None,
1454 Some(AccountId::new("BITMEX-TEST")),
1455 None,
1456 )
1457 .unwrap();
1458
1459 let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1460
1461 client.subscriptions.mark_subscribe(&topic);
1463 client.subscriptions.confirm_subscribe(&topic);
1464 assert_eq!(client.subscriptions.len(), 1);
1465
1466 client.subscriptions.mark_unsubscribe(&topic);
1468 assert_eq!(client.subscriptions.len(), 0);
1469 assert_eq!(
1470 client.subscriptions.pending_unsubscribe_topics(),
1471 vec![topic.clone()]
1472 );
1473
1474 client.subscriptions.confirm_unsubscribe(&topic); client.subscriptions.mark_subscribe(&topic); client.subscriptions.confirm_subscribe(&topic); assert_eq!(client.subscriptions.len(), 1);
1482 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1483 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1484
1485 let all = client.subscriptions.all_topics();
1487 assert_eq!(all.len(), 1);
1488 assert!(all.contains(&topic));
1489 }
1490
1491 #[rstest]
1492 fn test_race_resubscribe_before_unsubscribe_ack() {
1493 let client = BitmexWebSocketClient::new(
1497 Some("ws://test.com".to_string()),
1498 None,
1499 None,
1500 Some(AccountId::new("BITMEX-TEST")),
1501 None,
1502 )
1503 .unwrap();
1504
1505 let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1506
1507 client.subscriptions.mark_subscribe(&topic);
1509 client.subscriptions.confirm_subscribe(&topic);
1510 assert_eq!(client.subscriptions.len(), 1);
1511
1512 client.subscriptions.mark_unsubscribe(&topic);
1514 assert_eq!(client.subscriptions.len(), 0);
1515 assert_eq!(
1516 client.subscriptions.pending_unsubscribe_topics(),
1517 vec![topic.clone()]
1518 );
1519
1520 client.subscriptions.mark_subscribe(&topic);
1522 assert_eq!(
1523 client.subscriptions.pending_subscribe_topics(),
1524 vec![topic.clone()]
1525 );
1526
1527 client.subscriptions.confirm_unsubscribe(&topic);
1529 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1530 assert_eq!(
1531 client.subscriptions.pending_subscribe_topics(),
1532 vec![topic.clone()]
1533 );
1534
1535 client.subscriptions.confirm_subscribe(&topic);
1537 assert_eq!(client.subscriptions.len(), 1);
1538 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1539
1540 let all = client.subscriptions.all_topics();
1542 assert_eq!(all.len(), 1);
1543 assert!(all.contains(&topic));
1544 }
1545
1546 #[rstest]
1547 fn test_race_channel_level_reconnection_with_pending_states() {
1548 let client = BitmexWebSocketClient::new(
1550 Some("ws://test.com".to_string()),
1551 Some("test_key".to_string()),
1552 Some("test_secret".to_string()),
1553 Some(AccountId::new("BITMEX-TEST")),
1554 None,
1555 )
1556 .unwrap();
1557
1558 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1561 client.subscriptions.mark_subscribe(&trade_xbt);
1562 client.subscriptions.confirm_subscribe(&trade_xbt);
1563
1564 let order_channel = BitmexWsAuthChannel::Order.as_ref();
1566 client.subscriptions.mark_subscribe(order_channel);
1567 client.subscriptions.confirm_subscribe(order_channel);
1568
1569 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1571 client.subscriptions.mark_subscribe(&trade_eth);
1572
1573 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1575 client.subscriptions.mark_subscribe(&book_xbt);
1576 client.subscriptions.confirm_subscribe(&book_xbt);
1577 client.subscriptions.mark_unsubscribe(&book_xbt);
1578
1579 let topics_to_restore = client.subscriptions.all_topics();
1581
1582 assert_eq!(topics_to_restore.len(), 3);
1584 assert!(topics_to_restore.contains(&trade_xbt));
1585 assert!(topics_to_restore.contains(&order_channel.to_string()));
1586 assert!(topics_to_restore.contains(&trade_eth));
1587 assert!(!topics_to_restore.contains(&book_xbt)); for topic in &topics_to_restore {
1592 if topic == order_channel {
1593 assert!(
1594 !topic.contains(':'),
1595 "Channel-level topic should not have delimiter"
1596 );
1597 }
1598 }
1599 }
1600}