1use std::{
24 sync::{
25 Arc,
26 atomic::{AtomicBool, AtomicU8, Ordering},
27 },
28 time::Duration,
29};
30
31use arc_swap::ArcSwap;
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::live::runtime::get_runtime;
35use nautilus_core::{
36 consts::NAUTILUS_USER_AGENT,
37 env::{get_env_var, get_or_env_var_opt},
38};
39use nautilus_model::{
40 data::bar::BarType,
41 enums::OrderType,
42 identifiers::{AccountId, ClientOrderId, InstrumentId},
43 instruments::{Instrument, InstrumentAny},
44};
45use nautilus_network::{
46 http::USER_AGENT,
47 mode::ConnectionMode,
48 websocket::{
49 AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, WebSocketClient,
50 WebSocketConfig, channel_message_handler,
51 },
52};
53use tokio_tungstenite::tungstenite::Message;
54use ustr::Ustr;
55
56use super::{
57 enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
58 error::BitmexWsError,
59 handler::{FeedHandler, HandlerCommand},
60 messages::{BitmexAuthentication, BitmexSubscription, NautilusWsMessage},
61 parse::{is_index_symbol, topic_from_bar_spec},
62};
63use crate::common::{
64 consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
65 credential::Credential,
66};
67
/// WebSocket client for the BitMEX real-time feed.
///
/// The client owns the connection configuration and the shared state
/// (subscriptions, caches, command channel) that is handed to the background
/// `FeedHandler` task spawned by `connect`. All shared members are reference
/// counted so that the derived `Clone` yields handles onto the same state.
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct BitmexWebSocketClient {
    /// WebSocket endpoint; `new` falls back to `BITMEX_WS_URL` when none is given.
    url: String,
    /// API credential used to sign authentication requests, if any.
    credential: Option<Credential>,
    /// Heartbeat setting forwarded to `WebSocketConfig`
    /// (presumably an interval in seconds — TODO confirm).
    heartbeat: Option<u64>,
    /// Account identifier passed to the feed handler.
    account_id: AccountId,
    /// Tracks the outcome of in-flight authentication requests.
    auth_tracker: AuthTracker,
    /// Shutdown flag: set to `true` by `close` and consulted by
    /// `is_active` / `is_closed` and the handler task.
    signal: Arc<AtomicBool>,
    /// Connection mode; swapped for the underlying client's atomic in `connect`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Command channel to the handler task; replaced on every `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiving end of parsed messages; taken (once) by `stream`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Handle of the spawned handler task; awaited or aborted in `close`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Pending/confirmed subscription state shared with the handler.
    subscriptions: SubscriptionState,
    /// Topics requested through `subscribe` and not yet removed by `unsubscribe`.
    tracked_subscriptions: Arc<DashMap<String, ()>>,
    /// Instruments keyed by symbol, replayed to the handler on `connect`.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Order types keyed by client order ID, shared with the handler.
    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Order symbols keyed by client order ID, shared with the handler.
    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
}
97
98impl BitmexWebSocketClient {
99 pub fn new(
105 url: Option<String>,
106 api_key: Option<String>,
107 api_secret: Option<String>,
108 account_id: Option<AccountId>,
109 heartbeat: Option<u64>,
110 ) -> anyhow::Result<Self> {
111 let credential = match (api_key, api_secret) {
112 (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
113 (None, None) => None,
114 _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
115 };
116
117 let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
118
119 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
120 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
121
122 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
126
127 Ok(Self {
128 url: url.unwrap_or(BITMEX_WS_URL.to_string()),
129 credential,
130 heartbeat,
131 account_id,
132 auth_tracker: AuthTracker::new(),
133 signal: Arc::new(AtomicBool::new(false)),
134 connection_mode,
135 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
136 out_rx: None,
137 task_handle: None,
138 subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
139 tracked_subscriptions: Arc::new(DashMap::new()),
140 instruments_cache: Arc::new(DashMap::new()),
141 order_type_cache: Arc::new(DashMap::new()),
142 order_symbol_cache: Arc::new(DashMap::new()),
143 })
144 }
145
146 pub fn new_with_env(
157 url: Option<String>,
158 api_key: Option<String>,
159 api_secret: Option<String>,
160 account_id: Option<AccountId>,
161 heartbeat: Option<u64>,
162 testnet: bool,
163 ) -> anyhow::Result<Self> {
164 let (api_key_env, api_secret_env) = if testnet {
165 ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
166 } else {
167 ("BITMEX_API_KEY", "BITMEX_API_SECRET")
168 };
169
170 let key = get_or_env_var_opt(api_key, api_key_env);
171 let secret = get_or_env_var_opt(api_secret, api_secret_env);
172
173 Self::new(url, key, secret, account_id, heartbeat)
174 }
175
176 pub fn from_env() -> anyhow::Result<Self> {
182 let url = get_env_var("BITMEX_WS_URL")?;
183 let api_key = get_env_var("BITMEX_API_KEY")?;
184 let api_secret = get_env_var("BITMEX_API_SECRET")?;
185
186 Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
187 }
188
189 #[must_use]
191 pub const fn url(&self) -> &str {
192 self.url.as_str()
193 }
194
195 #[must_use]
197 pub fn api_key(&self) -> Option<&str> {
198 self.credential.as_ref().map(|c| c.api_key.as_str())
199 }
200
201 #[must_use]
203 pub fn api_key_masked(&self) -> Option<String> {
204 self.credential.as_ref().map(|c| c.api_key_masked())
205 }
206
207 #[must_use]
209 pub fn is_active(&self) -> bool {
210 let connection_mode_arc = self.connection_mode.load();
211 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
212 && !self.signal.load(Ordering::Relaxed)
213 }
214
215 #[must_use]
217 pub fn is_closed(&self) -> bool {
218 let connection_mode_arc = self.connection_mode.load();
219 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
220 || self.signal.load(Ordering::Relaxed)
221 }
222
223 pub fn set_account_id(&mut self, account_id: AccountId) {
225 self.account_id = account_id;
226 }
227
228 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
232 self.instruments_cache.clear();
233 let mut count = 0;
234
235 log::debug!("Initializing BitMEX instrument cache");
236
237 for inst in instruments {
238 let symbol = inst.symbol().inner();
239 self.instruments_cache.insert(symbol, inst.clone());
240 log::debug!("Cached instrument: {symbol}");
241 count += 1;
242 }
243
244 log::info!("BitMEX instrument cache initialized with {count} instruments");
245 }
246
247 pub fn cache_instrument(&self, instrument: InstrumentAny) {
251 self.instruments_cache
252 .insert(instrument.symbol().inner(), instrument.clone());
253
254 if let Ok(cmd_tx) = self.cmd_tx.try_read()
257 && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument))
258 {
259 log::debug!("Failed to send instrument update to handler: {e}");
260 }
261 }
262
263 pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
273 let (client, raw_rx) = self.connect_inner().await?;
274
275 self.connection_mode.store(client.connection_mode_atomic());
277
278 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
279 self.out_rx = Some(Arc::new(out_rx));
280
281 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
282 *self.cmd_tx.write().await = cmd_tx.clone();
283
284 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
286 return Err(BitmexWsError::ClientError(format!(
287 "Failed to send WebSocketClient to handler: {e}"
288 )));
289 }
290
291 if !self.instruments_cache.is_empty() {
293 let cached_instruments: Vec<InstrumentAny> = self
294 .instruments_cache
295 .iter()
296 .map(|entry| entry.value().clone())
297 .collect();
298 if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
299 tracing::error!("Failed to replay instruments to handler: {e}");
300 }
301 }
302
303 let signal = self.signal.clone();
304 let account_id = self.account_id;
305 let credential = self.credential.clone();
306 let auth_tracker = self.auth_tracker.clone();
307 let subscriptions = self.subscriptions.clone();
308 let order_type_cache = self.order_type_cache.clone();
309 let order_symbol_cache = self.order_symbol_cache.clone();
310 let cmd_tx_for_reconnect = cmd_tx.clone();
311
312 let stream_handle = get_runtime().spawn(async move {
313 let mut handler = FeedHandler::new(
314 signal.clone(),
315 cmd_rx,
316 raw_rx,
317 out_tx,
318 account_id,
319 auth_tracker.clone(),
320 subscriptions.clone(),
321 order_type_cache,
322 order_symbol_cache,
323 );
324
325 let resubscribe_all = || {
327 let topics = subscriptions.all_topics();
329
330 if topics.is_empty() {
331 return;
332 }
333
334 tracing::debug!(count = topics.len(), "Resubscribing to confirmed subscriptions");
335
336 for topic in &topics {
337 subscriptions.mark_subscribe(topic.as_str());
338 }
339
340 let mut payloads = Vec::with_capacity(topics.len());
342 for topic in &topics {
343 let message = BitmexSubscription {
344 op: BitmexWsOperation::Subscribe,
345 args: vec![Ustr::from(topic.as_ref())],
346 };
347 if let Ok(payload) = serde_json::to_string(&message) {
348 payloads.push(payload);
349 }
350 }
351
352 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads }) {
353 tracing::error!(error = %e, "Failed to send resubscribe command");
354 }
355 };
356
357 loop {
359 match handler.next().await {
360 Some(NautilusWsMessage::Reconnected) => {
361 if signal.load(Ordering::Relaxed) {
362 continue;
363 }
364
365 log::info!("WebSocket reconnected");
366
367 let confirmed_topics: Vec<String> = {
369 let confirmed = subscriptions.confirmed();
370 let mut topics = Vec::new();
371
372 for entry in confirmed.iter() {
373 let (channel, symbols) = entry.pair();
374
375 if *channel == BitmexWsTopic::Instrument.as_ref() {
376 continue;
377 }
378
379 for symbol in symbols {
380 if symbol.is_empty() {
381 topics.push(channel.to_string());
382 } else {
383 topics.push(format!("{channel}:{symbol}"));
384 }
385 }
386 }
387
388 topics
389 };
390
391 if !confirmed_topics.is_empty() {
392 tracing::debug!(count = confirmed_topics.len(), "Marking confirmed subscriptions as pending for replay");
393 for topic in confirmed_topics {
394 subscriptions.mark_failure(&topic);
395 }
396 }
397
398 if let Some(cred) = &credential {
399 tracing::debug!("Re-authenticating after reconnection");
400
401 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
402 let signature = cred.sign("GET", "/realtime", expires, "");
403
404 let auth_message = BitmexAuthentication {
405 op: BitmexWsAuthAction::AuthKeyExpires,
406 args: (cred.api_key.to_string(), expires, signature),
407 };
408
409 if let Ok(payload) = serde_json::to_string(&auth_message) {
410 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Authenticate { payload }) {
411 tracing::error!(error = %e, "Failed to send reconnection auth command");
412 }
413 } else {
414 tracing::error!("Failed to serialize reconnection auth message");
415 }
416 }
417
418 if credential.is_none() {
421 tracing::debug!("No authentication required, resubscribing immediately");
422 resubscribe_all();
423 }
424
425 continue;
430 }
431 Some(NautilusWsMessage::Authenticated) => {
432 tracing::debug!("Authenticated after reconnection, resubscribing");
433 resubscribe_all();
434 continue;
435 }
436 Some(msg) => {
437 if handler.send(msg).is_err() {
438 tracing::error!("Failed to send message (receiver dropped)");
439 break;
440 }
441 }
442 None => {
443 if handler.is_stopped() {
445 tracing::debug!("Stop signal received, ending message processing");
446 break;
447 }
448 tracing::warn!("WebSocket stream ended unexpectedly");
450 break;
451 }
452 }
453 }
454
455 tracing::debug!("Handler task exiting");
456 });
457
458 self.task_handle = Some(Arc::new(stream_handle));
459
460 if self.credential.is_some()
461 && let Err(e) = self.authenticate().await
462 {
463 return Err(e);
464 }
465
466 let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
468 self.subscriptions.mark_subscribe(&instrument_topic);
469 self.tracked_subscriptions.insert(instrument_topic, ());
470
471 let subscribe_msg = BitmexSubscription {
472 op: BitmexWsOperation::Subscribe,
473 args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
474 };
475
476 match serde_json::to_string(&subscribe_msg) {
477 Ok(subscribe_json) => {
478 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
479 topics: vec![subscribe_json],
480 }) {
481 log::error!("Failed to send subscribe command for instruments: {e}");
482 } else {
483 log::debug!("Subscribed to all instruments");
484 }
485 }
486 Err(e) => {
487 tracing::error!(error = %e, "Failed to serialize subscribe message");
488 }
489 }
490
491 Ok(())
492 }
493
494 async fn connect_inner(
500 &mut self,
501 ) -> Result<
502 (
503 WebSocketClient,
504 tokio::sync::mpsc::UnboundedReceiver<Message>,
505 ),
506 BitmexWsError,
507 > {
508 let (message_handler, rx) = channel_message_handler();
509
510 let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
513 });
515
516 let config = WebSocketConfig {
517 url: self.url.clone(),
518 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
519 heartbeat: self.heartbeat,
520 heartbeat_msg: None,
521 message_handler: Some(message_handler),
522 ping_handler: Some(ping_handler),
523 reconnect_timeout_ms: Some(5_000),
524 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, reconnect_max_attempts: None,
529 };
530
531 let keyed_quotas = vec![];
532 let client = WebSocketClient::connect(
533 config,
534 None, keyed_quotas,
536 None, )
538 .await
539 .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
540
541 Ok((client, rx))
542 }
543
544 async fn authenticate(&self) -> Result<(), BitmexWsError> {
551 let credential = match &self.credential {
552 Some(credential) => credential,
553 None => {
554 return Err(BitmexWsError::AuthenticationError(
555 "API credentials not available to authenticate".to_string(),
556 ));
557 }
558 };
559
560 let receiver = self.auth_tracker.begin();
561
562 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
563 let signature = credential.sign("GET", "/realtime", expires, "");
564
565 let auth_message = BitmexAuthentication {
566 op: BitmexWsAuthAction::AuthKeyExpires,
567 args: (credential.api_key.to_string(), expires, signature),
568 };
569
570 let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
571 let msg = format!("Failed to serialize auth message: {e}");
572 self.auth_tracker.fail(msg.clone());
573 BitmexWsError::AuthenticationError(msg)
574 })?;
575
576 self.cmd_tx
578 .read()
579 .await
580 .send(HandlerCommand::Authenticate { payload: auth_json })
581 .map_err(|e| {
582 let msg = format!("Failed to send authenticate command: {e}");
583 self.auth_tracker.fail(msg.clone());
584 BitmexWsError::AuthenticationError(msg)
585 })?;
586
587 self.auth_tracker
588 .wait_for_result::<BitmexWsError>(
589 Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
590 receiver,
591 )
592 .await
593 }
594
595 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
601 let timeout = Duration::from_secs_f64(timeout_secs);
602
603 tokio::time::timeout(timeout, async {
604 while !self.is_active() {
605 tokio::time::sleep(Duration::from_millis(10)).await;
606 }
607 })
608 .await
609 .map_err(|_| {
610 BitmexWsError::ClientError(format!(
611 "WebSocket connection timeout after {timeout_secs} seconds"
612 ))
613 })?;
614
615 Ok(())
616 }
617
618 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
626 let rx = self
627 .out_rx
628 .take()
629 .expect("Stream receiver already taken or not connected");
630 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
631 async_stream::stream! {
632 while let Some(msg) = rx.recv().await {
633 yield msg;
634 }
635 }
636 }
637
638 pub async fn close(&mut self) -> Result<(), BitmexWsError> {
648 log::debug!("Starting close process");
649
650 self.signal.store(true, Ordering::Relaxed);
651
652 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
654 log::debug!(
655 "Failed to send disconnect command (handler may already be shut down): {e}"
656 );
657 }
658
659 if let Some(task_handle) = self.task_handle.take() {
661 match Arc::try_unwrap(task_handle) {
662 Ok(handle) => {
663 log::debug!("Waiting for task handle to complete");
664 match tokio::time::timeout(Duration::from_secs(2), handle).await {
665 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
666 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
667 Err(_) => {
668 log::warn!(
669 "Timeout waiting for task handle, task may still be running"
670 );
671 }
673 }
674 }
675 Err(arc_handle) => {
676 log::debug!(
677 "Cannot take ownership of task handle - other references exist, aborting task"
678 );
679 arc_handle.abort();
680 }
681 }
682 } else {
683 log::debug!("No task handle to await");
684 }
685
686 log::debug!("Closed");
687
688 Ok(())
689 }
690
691 pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
701 log::debug!("Subscribing to topics: {topics:?}");
702
703 for topic in &topics {
704 self.subscriptions.mark_subscribe(topic.as_str());
705 self.tracked_subscriptions.insert(topic.clone(), ());
706 }
707
708 let mut payloads = Vec::with_capacity(topics.len());
710 for topic in &topics {
711 let message = BitmexSubscription {
712 op: BitmexWsOperation::Subscribe,
713 args: vec![Ustr::from(topic.as_ref())],
714 };
715 let payload = serde_json::to_string(&message).map_err(|e| {
716 BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
717 })?;
718 payloads.push(payload);
719 }
720
721 let cmd = HandlerCommand::Subscribe { topics: payloads };
723
724 self.send_cmd(cmd).await.map_err(|e| {
725 BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
726 })
727 }
728
729 async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
735 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
736
737 if self.signal.load(Ordering::Relaxed) {
738 log::debug!("Shutdown signal detected, skipping unsubscribe");
739 return Ok(());
740 }
741
742 for topic in &topics {
743 self.subscriptions.mark_unsubscribe(topic.as_str());
744 self.tracked_subscriptions.remove(topic);
745 }
746
747 let mut payloads = Vec::with_capacity(topics.len());
749 for topic in &topics {
750 let message = BitmexSubscription {
751 op: BitmexWsOperation::Unsubscribe,
752 args: vec![Ustr::from(topic.as_ref())],
753 };
754 if let Ok(payload) = serde_json::to_string(&message) {
755 payloads.push(payload);
756 }
757 }
758
759 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
761
762 if let Err(e) = self.send_cmd(cmd).await {
763 tracing::debug!(error = %e, "Failed to send unsubscribe command");
764 }
765
766 Ok(())
767 }
768
769 #[must_use]
771 pub fn subscription_count(&self) -> usize {
772 self.subscriptions.len()
773 }
774
775 pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
776 let symbol = instrument_id.symbol.inner();
777 let confirmed = self.subscriptions.confirmed();
778 let mut channels = Vec::with_capacity(confirmed.len());
779
780 for entry in confirmed.iter() {
781 let (channel, symbols) = entry.pair();
782 if symbols.contains(&symbol) {
783 channels.push(format!("{channel}:{symbol}"));
785 } else {
786 let has_channel_marker = symbols.iter().any(|s| s.is_empty());
787 if has_channel_marker
788 && (*channel == BitmexWsAuthChannel::Execution.as_ref()
789 || *channel == BitmexWsAuthChannel::Order.as_ref())
790 {
791 channels.push(channel.to_string());
793 }
794 }
795 }
796
797 channels
798 }
799
800 pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
806 log::debug!("Already subscribed to all instruments on connection, skipping");
808 Ok(())
809 }
810
811 pub async fn subscribe_instrument(
817 &self,
818 instrument_id: InstrumentId,
819 ) -> Result<(), BitmexWsError> {
820 log::debug!(
822 "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
823 );
824 Ok(())
825 }
826
827 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
833 let topic = BitmexWsTopic::OrderBookL2;
834 let symbol = instrument_id.symbol.inner();
835 self.subscribe(vec![format!("{topic}:{symbol}")]).await
836 }
837
838 pub async fn subscribe_book_25(
844 &self,
845 instrument_id: InstrumentId,
846 ) -> Result<(), BitmexWsError> {
847 let topic = BitmexWsTopic::OrderBookL2_25;
848 let symbol = instrument_id.symbol.inner();
849 self.subscribe(vec![format!("{topic}:{symbol}")]).await
850 }
851
852 pub async fn subscribe_book_depth10(
858 &self,
859 instrument_id: InstrumentId,
860 ) -> Result<(), BitmexWsError> {
861 let topic = BitmexWsTopic::OrderBook10;
862 let symbol = instrument_id.symbol.inner();
863 self.subscribe(vec![format!("{topic}:{symbol}")]).await
864 }
865
866 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
874 let symbol = instrument_id.symbol.inner();
875
876 if is_index_symbol(&instrument_id.symbol.inner()) {
878 tracing::warn!("Ignoring quote subscription for index symbol: {symbol}");
879 return Ok(());
880 }
881
882 let topic = BitmexWsTopic::Quote;
883 self.subscribe(vec![format!("{topic}:{symbol}")]).await
884 }
885
886 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
894 let symbol = instrument_id.symbol.inner();
895
896 if is_index_symbol(&symbol) {
898 tracing::warn!("Ignoring trade subscription for index symbol: {symbol}");
899 return Ok(());
900 }
901
902 let topic = BitmexWsTopic::Trade;
903 self.subscribe(vec![format!("{topic}:{symbol}")]).await
904 }
905
906 pub async fn subscribe_mark_prices(
912 &self,
913 instrument_id: InstrumentId,
914 ) -> Result<(), BitmexWsError> {
915 self.subscribe_instrument(instrument_id).await
916 }
917
918 pub async fn subscribe_index_prices(
924 &self,
925 instrument_id: InstrumentId,
926 ) -> Result<(), BitmexWsError> {
927 self.subscribe_instrument(instrument_id).await
928 }
929
930 pub async fn subscribe_funding_rates(
936 &self,
937 instrument_id: InstrumentId,
938 ) -> Result<(), BitmexWsError> {
939 let topic = BitmexWsTopic::Funding;
940 let symbol = instrument_id.symbol.inner();
941 self.subscribe(vec![format!("{topic}:{symbol}")]).await
942 }
943
944 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
950 let topic = topic_from_bar_spec(bar_type.spec());
951 let symbol = bar_type.instrument_id().symbol.inner();
952 self.subscribe(vec![format!("{topic}:{symbol}")]).await
953 }
954
955 pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
961 log::debug!(
963 "Instruments subscription maintained for proper operation, skipping unsubscribe"
964 );
965 Ok(())
966 }
967
968 pub async fn unsubscribe_instrument(
974 &self,
975 instrument_id: InstrumentId,
976 ) -> Result<(), BitmexWsError> {
977 log::debug!(
979 "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
980 );
981 Ok(())
982 }
983
984 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
990 let topic = BitmexWsTopic::OrderBookL2;
991 let symbol = instrument_id.symbol.inner();
992 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
993 }
994
995 pub async fn unsubscribe_book_25(
1001 &self,
1002 instrument_id: InstrumentId,
1003 ) -> Result<(), BitmexWsError> {
1004 let topic = BitmexWsTopic::OrderBookL2_25;
1005 let symbol = instrument_id.symbol.inner();
1006 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1007 }
1008
1009 pub async fn unsubscribe_book_depth10(
1015 &self,
1016 instrument_id: InstrumentId,
1017 ) -> Result<(), BitmexWsError> {
1018 let topic = BitmexWsTopic::OrderBook10;
1019 let symbol = instrument_id.symbol.inner();
1020 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1021 }
1022
1023 pub async fn unsubscribe_quotes(
1029 &self,
1030 instrument_id: InstrumentId,
1031 ) -> Result<(), BitmexWsError> {
1032 let symbol = instrument_id.symbol.inner();
1033
1034 if is_index_symbol(&symbol) {
1036 return Ok(());
1037 }
1038
1039 let topic = BitmexWsTopic::Quote;
1040 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1041 }
1042
1043 pub async fn unsubscribe_trades(
1049 &self,
1050 instrument_id: InstrumentId,
1051 ) -> Result<(), BitmexWsError> {
1052 let symbol = instrument_id.symbol.inner();
1053
1054 if is_index_symbol(&symbol) {
1056 return Ok(());
1057 }
1058
1059 let topic = BitmexWsTopic::Trade;
1060 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1061 }
1062
1063 pub async fn unsubscribe_mark_prices(
1069 &self,
1070 instrument_id: InstrumentId,
1071 ) -> Result<(), BitmexWsError> {
1072 log::debug!(
1074 "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1075 );
1076 Ok(())
1077 }
1078
1079 pub async fn unsubscribe_index_prices(
1085 &self,
1086 instrument_id: InstrumentId,
1087 ) -> Result<(), BitmexWsError> {
1088 log::debug!(
1090 "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1091 );
1092 Ok(())
1093 }
1094
1095 pub async fn unsubscribe_funding_rates(
1101 &self,
1102 instrument_id: InstrumentId,
1103 ) -> Result<(), BitmexWsError> {
1104 log::debug!(
1106 "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1107 );
1108 Ok(())
1109 }
1110
1111 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1117 let topic = topic_from_bar_spec(bar_type.spec());
1118 let symbol = bar_type.instrument_id().symbol.inner();
1119 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1120 }
1121
1122 pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1128 if self.credential.is_none() {
1129 return Err(BitmexWsError::MissingCredentials);
1130 }
1131 self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1132 .await
1133 }
1134
1135 pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1141 if self.credential.is_none() {
1142 return Err(BitmexWsError::MissingCredentials);
1143 }
1144 self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1145 .await
1146 }
1147
1148 pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1154 if self.credential.is_none() {
1155 return Err(BitmexWsError::MissingCredentials);
1156 }
1157 self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1158 .await
1159 }
1160
1161 pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1167 if self.credential.is_none() {
1168 return Err(BitmexWsError::MissingCredentials);
1169 }
1170 self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1171 .await
1172 }
1173
1174 pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1180 if self.credential.is_none() {
1181 return Err(BitmexWsError::MissingCredentials);
1182 }
1183 self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1184 .await
1185 }
1186
1187 pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1193 self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1194 .await
1195 }
1196
1197 pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1203 self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1204 .await
1205 }
1206
1207 pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1213 self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1214 .await
1215 }
1216
1217 pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1223 self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1224 .await
1225 }
1226
1227 pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1233 self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1234 .await
1235 }
1236
1237 async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1239 self.cmd_tx
1240 .read()
1241 .await
1242 .send(cmd)
1243 .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1244 }
1245}
1246
1247#[cfg(test)]
1248mod tests {
1249 use ahash::AHashSet;
1250 use rstest::rstest;
1251 use ustr::Ustr;
1252
1253 use super::*;
1254
1255 #[rstest]
1256 fn test_reconnect_topics_restoration_logic() {
1257 let client = BitmexWebSocketClient::new(
1259 Some("ws://test.com".to_string()),
1260 Some("test_key".to_string()),
1261 Some("test_secret".to_string()),
1262 Some(AccountId::new("BITMEX-TEST")),
1263 None,
1264 )
1265 .unwrap();
1266
1267 let subs = client.subscriptions.confirmed();
1269 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1270 let mut set = AHashSet::new();
1271 set.insert(Ustr::from("XBTUSD"));
1272 set.insert(Ustr::from("ETHUSD"));
1273 set
1274 });
1275
1276 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1277 let mut set = AHashSet::new();
1278 set.insert(Ustr::from("XBTUSD"));
1279 set
1280 });
1281
1282 subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1284 let mut set = AHashSet::new();
1285 set.insert(Ustr::from(""));
1286 set
1287 });
1288 subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1289 let mut set = AHashSet::new();
1290 set.insert(Ustr::from(""));
1291 set
1292 });
1293
1294 let mut topics_to_restore = Vec::new();
1296 for entry in subs.iter() {
1297 let (channel, symbols) = entry.pair();
1298 for symbol in symbols {
1299 if symbol.is_empty() {
1300 topics_to_restore.push(channel.to_string());
1301 } else {
1302 topics_to_restore.push(format!("{channel}:{symbol}"));
1303 }
1304 }
1305 }
1306
1307 assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1309 assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1310 assert!(
1311 topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1312 );
1313 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1314 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1315 assert_eq!(topics_to_restore.len(), 5);
1316 }
1317
1318 #[rstest]
1319 fn test_reconnect_auth_message_building() {
1320 let client_with_creds = BitmexWebSocketClient::new(
1322 Some("ws://test.com".to_string()),
1323 Some("test_key".to_string()),
1324 Some("test_secret".to_string()),
1325 Some(AccountId::new("BITMEX-TEST")),
1326 None,
1327 )
1328 .unwrap();
1329
1330 if let Some(cred) = &client_with_creds.credential {
1332 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1333 let signature = cred.sign("GET", "/realtime", expires, "");
1334
1335 let auth_message = BitmexAuthentication {
1336 op: BitmexWsAuthAction::AuthKeyExpires,
1337 args: (cred.api_key.to_string(), expires, signature),
1338 };
1339
1340 assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1342 assert_eq!(auth_message.args.0, "test_key");
1343 assert!(auth_message.args.1 > 0); assert!(!auth_message.args.2.is_empty()); } else {
1346 panic!("Client should have credentials");
1347 }
1348
1349 let client_no_creds = BitmexWebSocketClient::new(
1351 Some("ws://test.com".to_string()),
1352 None,
1353 None,
1354 Some(AccountId::new("BITMEX-TEST")),
1355 None,
1356 )
1357 .unwrap();
1358
1359 assert!(client_no_creds.credential.is_none());
1360 }
1361
1362 #[rstest]
1363 fn test_subscription_state_after_unsubscribe() {
1364 let client = BitmexWebSocketClient::new(
1365 Some("ws://test.com".to_string()),
1366 Some("test_key".to_string()),
1367 Some("test_secret".to_string()),
1368 Some(AccountId::new("BITMEX-TEST")),
1369 None,
1370 )
1371 .unwrap();
1372
1373 let subs = client.subscriptions.confirmed();
1375 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1376 let mut set = AHashSet::new();
1377 set.insert(Ustr::from("XBTUSD"));
1378 set.insert(Ustr::from("ETHUSD"));
1379 set
1380 });
1381
1382 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1383 let mut set = AHashSet::new();
1384 set.insert(Ustr::from("XBTUSD"));
1385 set
1386 });
1387
1388 let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1390 if let Some((channel, symbol)) = topic.split_once(':')
1391 && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1392 {
1393 entry.remove(&Ustr::from(symbol));
1394 if entry.is_empty() {
1395 drop(entry);
1396 subs.remove(&Ustr::from(channel));
1397 }
1398 }
1399
1400 let mut topics_to_restore = Vec::new();
1402 for entry in subs.iter() {
1403 let (channel, symbols) = entry.pair();
1404 for symbol in symbols {
1405 if symbol.is_empty() {
1406 topics_to_restore.push(channel.to_string());
1407 } else {
1408 topics_to_restore.push(format!("{channel}:{symbol}"));
1409 }
1410 }
1411 }
1412
1413 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1415 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1416 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1417
1418 assert!(topics_to_restore.contains(&trade_xbt));
1419 assert!(!topics_to_restore.contains(&trade_eth));
1420 assert!(topics_to_restore.contains(&book_xbt));
1421 assert_eq!(topics_to_restore.len(), 2);
1422 }
1423
1424 #[rstest]
1425 fn test_race_unsubscribe_failure_recovery() {
1426 let client = BitmexWebSocketClient::new(
1432 Some("ws://test.com".to_string()),
1433 None,
1434 None,
1435 Some(AccountId::new("BITMEX-TEST")),
1436 None,
1437 )
1438 .unwrap();
1439
1440 let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1441
1442 client.subscriptions.mark_subscribe(&topic);
1444 client.subscriptions.confirm_subscribe(&topic);
1445 assert_eq!(client.subscriptions.len(), 1);
1446
1447 client.subscriptions.mark_unsubscribe(&topic);
1449 assert_eq!(client.subscriptions.len(), 0);
1450 assert_eq!(
1451 client.subscriptions.pending_unsubscribe_topics(),
1452 vec![topic.clone()]
1453 );
1454
1455 client.subscriptions.confirm_unsubscribe(&topic); client.subscriptions.mark_subscribe(&topic); client.subscriptions.confirm_subscribe(&topic); assert_eq!(client.subscriptions.len(), 1);
1463 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1464 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1465
1466 let all = client.subscriptions.all_topics();
1468 assert_eq!(all.len(), 1);
1469 assert!(all.contains(&topic));
1470 }
1471
1472 #[rstest]
1473 fn test_race_resubscribe_before_unsubscribe_ack() {
1474 let client = BitmexWebSocketClient::new(
1478 Some("ws://test.com".to_string()),
1479 None,
1480 None,
1481 Some(AccountId::new("BITMEX-TEST")),
1482 None,
1483 )
1484 .unwrap();
1485
1486 let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1487
1488 client.subscriptions.mark_subscribe(&topic);
1490 client.subscriptions.confirm_subscribe(&topic);
1491 assert_eq!(client.subscriptions.len(), 1);
1492
1493 client.subscriptions.mark_unsubscribe(&topic);
1495 assert_eq!(client.subscriptions.len(), 0);
1496 assert_eq!(
1497 client.subscriptions.pending_unsubscribe_topics(),
1498 vec![topic.clone()]
1499 );
1500
1501 client.subscriptions.mark_subscribe(&topic);
1503 assert_eq!(
1504 client.subscriptions.pending_subscribe_topics(),
1505 vec![topic.clone()]
1506 );
1507
1508 client.subscriptions.confirm_unsubscribe(&topic);
1510 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1511 assert_eq!(
1512 client.subscriptions.pending_subscribe_topics(),
1513 vec![topic.clone()]
1514 );
1515
1516 client.subscriptions.confirm_subscribe(&topic);
1518 assert_eq!(client.subscriptions.len(), 1);
1519 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1520
1521 let all = client.subscriptions.all_topics();
1523 assert_eq!(all.len(), 1);
1524 assert!(all.contains(&topic));
1525 }
1526
1527 #[rstest]
1528 fn test_race_channel_level_reconnection_with_pending_states() {
1529 let client = BitmexWebSocketClient::new(
1531 Some("ws://test.com".to_string()),
1532 Some("test_key".to_string()),
1533 Some("test_secret".to_string()),
1534 Some(AccountId::new("BITMEX-TEST")),
1535 None,
1536 )
1537 .unwrap();
1538
1539 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1542 client.subscriptions.mark_subscribe(&trade_xbt);
1543 client.subscriptions.confirm_subscribe(&trade_xbt);
1544
1545 let order_channel = BitmexWsAuthChannel::Order.as_ref();
1547 client.subscriptions.mark_subscribe(order_channel);
1548 client.subscriptions.confirm_subscribe(order_channel);
1549
1550 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1552 client.subscriptions.mark_subscribe(&trade_eth);
1553
1554 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1556 client.subscriptions.mark_subscribe(&book_xbt);
1557 client.subscriptions.confirm_subscribe(&book_xbt);
1558 client.subscriptions.mark_unsubscribe(&book_xbt);
1559
1560 let topics_to_restore = client.subscriptions.all_topics();
1562
1563 assert_eq!(topics_to_restore.len(), 3);
1565 assert!(topics_to_restore.contains(&trade_xbt));
1566 assert!(topics_to_restore.contains(&order_channel.to_string()));
1567 assert!(topics_to_restore.contains(&trade_eth));
1568 assert!(!topics_to_restore.contains(&book_xbt)); for topic in &topics_to_restore {
1573 if topic == order_channel {
1574 assert!(
1575 !topic.contains(':'),
1576 "Channel-level topic should not have delimiter"
1577 );
1578 }
1579 }
1580 }
1581}