1use std::sync::{
24 Arc,
25 atomic::{AtomicBool, AtomicU8, Ordering},
26};
27
28use arc_swap::ArcSwap;
29use dashmap::DashMap;
30use futures_util::Stream;
31use nautilus_common::runtime::get_runtime;
32use nautilus_core::{consts::NAUTILUS_USER_AGENT, env::get_env_var};
33use nautilus_model::{
34 data::bar::BarType,
35 enums::OrderType,
36 identifiers::{AccountId, ClientOrderId, InstrumentId},
37 instruments::{Instrument, InstrumentAny},
38};
39use nautilus_network::{
40 mode::ConnectionMode,
41 websocket::{
42 AUTHENTICATION_TIMEOUT_SECS, AuthTracker, PingHandler, SubscriptionState, WebSocketClient,
43 WebSocketConfig, channel_message_handler,
44 },
45};
46use reqwest::header::USER_AGENT;
47use tokio::time::Duration;
48use tokio_tungstenite::tungstenite::Message;
49use ustr::Ustr;
50
51use super::{
52 enums::{BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic},
53 error::BitmexWsError,
54 handler::{FeedHandler, HandlerCommand},
55 messages::{BitmexAuthentication, BitmexSubscription, NautilusWsMessage},
56 parse::{is_index_symbol, topic_from_bar_spec},
57};
58use crate::common::{
59 consts::{BITMEX_WS_TOPIC_DELIMITER, BITMEX_WS_URL},
60 credential::Credential,
61};
62
63#[derive(Clone, Debug)]
71#[cfg_attr(
72 feature = "python",
73 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
74)]
75pub struct BitmexWebSocketClient {
76 url: String,
77 credential: Option<Credential>,
78 heartbeat: Option<u64>,
79 account_id: AccountId,
80 auth_tracker: AuthTracker,
81 signal: Arc<AtomicBool>,
82 connection_mode: Arc<ArcSwap<AtomicU8>>,
83 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
84 out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
85 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
86 subscriptions: SubscriptionState,
87 tracked_subscriptions: Arc<DashMap<String, ()>>,
88 instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
89 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
90 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
91}
92
93impl BitmexWebSocketClient {
94 pub fn new(
100 url: Option<String>,
101 api_key: Option<String>,
102 api_secret: Option<String>,
103 account_id: Option<AccountId>,
104 heartbeat: Option<u64>,
105 ) -> anyhow::Result<Self> {
106 let credential = match (api_key, api_secret) {
107 (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
108 (None, None) => None,
109 _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
110 };
111
112 let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
113
114 let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
115 let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
116
117 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
121
122 Ok(Self {
123 url: url.unwrap_or(BITMEX_WS_URL.to_string()),
124 credential,
125 heartbeat,
126 account_id,
127 auth_tracker: AuthTracker::new(),
128 signal: Arc::new(AtomicBool::new(false)),
129 connection_mode,
130 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
131 out_rx: None,
132 task_handle: None,
133 subscriptions: SubscriptionState::new(BITMEX_WS_TOPIC_DELIMITER),
134 tracked_subscriptions: Arc::new(DashMap::new()),
135 instruments_cache: Arc::new(DashMap::new()),
136 order_type_cache: Arc::new(DashMap::new()),
137 order_symbol_cache: Arc::new(DashMap::new()),
138 })
139 }
140
141 pub fn from_env() -> anyhow::Result<Self> {
147 let url = get_env_var("BITMEX_WS_URL")?;
148 let api_key = get_env_var("BITMEX_API_KEY")?;
149 let api_secret = get_env_var("BITMEX_API_SECRET")?;
150
151 Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
152 }
153
154 #[must_use]
156 pub const fn url(&self) -> &str {
157 self.url.as_str()
158 }
159
160 #[must_use]
162 pub fn api_key(&self) -> Option<&str> {
163 self.credential.as_ref().map(|c| c.api_key.as_str())
164 }
165
166 #[must_use]
168 pub fn api_key_masked(&self) -> Option<String> {
169 self.credential.as_ref().map(|c| c.api_key_masked())
170 }
171
172 #[must_use]
174 pub fn is_active(&self) -> bool {
175 let connection_mode_arc = self.connection_mode.load();
176 ConnectionMode::from_atomic(&connection_mode_arc).is_active()
177 && !self.signal.load(Ordering::Relaxed)
178 }
179
180 #[must_use]
182 pub fn is_closed(&self) -> bool {
183 let connection_mode_arc = self.connection_mode.load();
184 ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
185 || self.signal.load(Ordering::Relaxed)
186 }
187
188 pub fn set_account_id(&mut self, account_id: AccountId) {
190 self.account_id = account_id;
191 }
192
193 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
197 self.instruments_cache.clear();
198 let mut count = 0;
199
200 log::debug!("Initializing BitMEX instrument cache");
201
202 for inst in instruments {
203 let symbol = inst.symbol().inner();
204 self.instruments_cache.insert(symbol, inst.clone());
205 log::debug!("Cached instrument: {symbol}");
206 count += 1;
207 }
208
209 log::info!("BitMEX instrument cache initialized with {count} instruments");
210 }
211
212 pub fn cache_instrument(&self, instrument: InstrumentAny) {
216 self.instruments_cache
217 .insert(instrument.symbol().inner(), instrument.clone());
218
219 if let Ok(cmd_tx) = self.cmd_tx.try_read()
222 && let Err(e) = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument))
223 {
224 log::debug!("Failed to send instrument update to handler: {e}");
225 }
226 }
227
228 pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
238 let (client, raw_rx) = self.connect_inner().await?;
239
240 self.connection_mode.store(client.connection_mode_atomic());
242
243 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
244 self.out_rx = Some(Arc::new(out_rx));
245
246 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
247 *self.cmd_tx.write().await = cmd_tx.clone();
248
249 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
251 return Err(BitmexWsError::ClientError(format!(
252 "Failed to send WebSocketClient to handler: {e}"
253 )));
254 }
255
256 if !self.instruments_cache.is_empty() {
258 let cached_instruments: Vec<InstrumentAny> = self
259 .instruments_cache
260 .iter()
261 .map(|entry| entry.value().clone())
262 .collect();
263 if let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(cached_instruments)) {
264 tracing::error!("Failed to replay instruments to handler: {e}");
265 }
266 }
267
268 let signal = self.signal.clone();
269 let account_id = self.account_id;
270 let credential = self.credential.clone();
271 let auth_tracker = self.auth_tracker.clone();
272 let subscriptions = self.subscriptions.clone();
273 let order_type_cache = self.order_type_cache.clone();
274 let order_symbol_cache = self.order_symbol_cache.clone();
275 let cmd_tx_for_reconnect = cmd_tx.clone();
276
277 let stream_handle = get_runtime().spawn(async move {
278 let mut handler = FeedHandler::new(
279 signal.clone(),
280 cmd_rx,
281 raw_rx,
282 out_tx,
283 account_id,
284 auth_tracker.clone(),
285 subscriptions.clone(),
286 order_type_cache,
287 order_symbol_cache,
288 );
289
290 let resubscribe_all = || {
292 let topics = subscriptions.all_topics();
294
295 if topics.is_empty() {
296 return;
297 }
298
299 tracing::debug!(count = topics.len(), "Resubscribing to confirmed subscriptions");
300
301 for topic in &topics {
302 subscriptions.mark_subscribe(topic.as_str());
303 }
304
305 let mut payloads = Vec::with_capacity(topics.len());
307 for topic in &topics {
308 let message = BitmexSubscription {
309 op: BitmexWsOperation::Subscribe,
310 args: vec![Ustr::from(topic.as_ref())],
311 };
312 if let Ok(payload) = serde_json::to_string(&message) {
313 payloads.push(payload);
314 }
315 }
316
317 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe { topics: payloads }) {
318 tracing::error!(error = %e, "Failed to send resubscribe command");
319 }
320 };
321
322 loop {
324 match handler.next().await {
325 Some(NautilusWsMessage::Reconnected) => {
326 if signal.load(Ordering::Relaxed) {
327 continue;
328 }
329
330 log::info!("WebSocket reconnected");
331
332 let confirmed_topics: Vec<String> = {
334 let confirmed = subscriptions.confirmed();
335 let mut topics = Vec::new();
336
337 for entry in confirmed.iter() {
338 let (channel, symbols) = entry.pair();
339
340 if *channel == BitmexWsTopic::Instrument.as_ref() {
341 continue;
342 }
343
344 for symbol in symbols.iter() {
345 if symbol.is_empty() {
346 topics.push(channel.to_string());
347 } else {
348 topics.push(format!("{channel}:{symbol}"));
349 }
350 }
351 }
352
353 topics
354 };
355
356 if !confirmed_topics.is_empty() {
357 tracing::debug!(count = confirmed_topics.len(), "Marking confirmed subscriptions as pending for replay");
358 for topic in confirmed_topics {
359 subscriptions.mark_failure(&topic);
360 }
361 }
362
363 if let Some(cred) = &credential {
364 tracing::debug!("Re-authenticating after reconnection");
365
366 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
367 let signature = cred.sign("GET", "/realtime", expires, "");
368
369 let auth_message = BitmexAuthentication {
370 op: BitmexWsAuthAction::AuthKeyExpires,
371 args: (cred.api_key.to_string(), expires, signature),
372 };
373
374 if let Ok(payload) = serde_json::to_string(&auth_message) {
375 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Authenticate { payload }) {
376 tracing::error!(error = %e, "Failed to send reconnection auth command");
377 }
378 } else {
379 tracing::error!("Failed to serialize reconnection auth message");
380 }
381 }
382
383 if credential.is_none() {
386 tracing::debug!("No authentication required, resubscribing immediately");
387 resubscribe_all();
388 }
389
390 continue;
395 }
396 Some(NautilusWsMessage::Authenticated) => {
397 tracing::debug!("Authenticated after reconnection, resubscribing");
398 resubscribe_all();
399 continue;
400 }
401 Some(msg) => {
402 if handler.send(msg).is_err() {
403 tracing::error!("Failed to send message (receiver dropped)");
404 break;
405 }
406 }
407 None => {
408 if handler.is_stopped() {
410 tracing::debug!("Stop signal received, ending message processing");
411 break;
412 }
413 tracing::warn!("WebSocket stream ended unexpectedly");
415 break;
416 }
417 }
418 }
419
420 tracing::debug!("Handler task exiting");
421 });
422
423 self.task_handle = Some(Arc::new(stream_handle));
424
425 if self.credential.is_some()
426 && let Err(e) = self.authenticate().await
427 {
428 return Err(e);
429 }
430
431 let instrument_topic = BitmexWsTopic::Instrument.as_ref().to_string();
433 self.subscriptions.mark_subscribe(&instrument_topic);
434 self.tracked_subscriptions.insert(instrument_topic, ());
435
436 let subscribe_msg = BitmexSubscription {
437 op: BitmexWsOperation::Subscribe,
438 args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
439 };
440
441 match serde_json::to_string(&subscribe_msg) {
442 Ok(subscribe_json) => {
443 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Subscribe {
444 topics: vec![subscribe_json],
445 }) {
446 log::error!("Failed to send subscribe command for instruments: {e}");
447 } else {
448 log::debug!("Subscribed to all instruments");
449 }
450 }
451 Err(e) => {
452 tracing::error!(error = %e, "Failed to serialize subscribe message");
453 }
454 }
455
456 Ok(())
457 }
458
459 async fn connect_inner(
465 &mut self,
466 ) -> Result<
467 (
468 WebSocketClient,
469 tokio::sync::mpsc::UnboundedReceiver<Message>,
470 ),
471 BitmexWsError,
472 > {
473 let (message_handler, rx) = channel_message_handler();
474
475 let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
478 });
480
481 let config = WebSocketConfig {
482 url: self.url.clone(),
483 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
484 heartbeat: self.heartbeat,
485 heartbeat_msg: None,
486 message_handler: Some(message_handler),
487 ping_handler: Some(ping_handler),
488 reconnect_timeout_ms: Some(5_000),
489 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, reconnect_max_attempts: None,
494 };
495
496 let keyed_quotas = vec![];
497 let client = WebSocketClient::connect(
498 config,
499 None, keyed_quotas,
501 None, )
503 .await
504 .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
505
506 Ok((client, rx))
507 }
508
509 async fn authenticate(&self) -> Result<(), BitmexWsError> {
516 let credential = match &self.credential {
517 Some(credential) => credential,
518 None => {
519 return Err(BitmexWsError::AuthenticationError(
520 "API credentials not available to authenticate".to_string(),
521 ));
522 }
523 };
524
525 let receiver = self.auth_tracker.begin();
526
527 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
528 let signature = credential.sign("GET", "/realtime", expires, "");
529
530 let auth_message = BitmexAuthentication {
531 op: BitmexWsAuthAction::AuthKeyExpires,
532 args: (credential.api_key.to_string(), expires, signature),
533 };
534
535 let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
536 let msg = format!("Failed to serialize auth message: {e}");
537 self.auth_tracker.fail(msg.clone());
538 BitmexWsError::AuthenticationError(msg)
539 })?;
540
541 self.cmd_tx
543 .read()
544 .await
545 .send(HandlerCommand::Authenticate { payload: auth_json })
546 .map_err(|e| {
547 let msg = format!("Failed to send authenticate command: {e}");
548 self.auth_tracker.fail(msg.clone());
549 BitmexWsError::AuthenticationError(msg)
550 })?;
551
552 self.auth_tracker
553 .wait_for_result::<BitmexWsError>(
554 Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
555 receiver,
556 )
557 .await
558 }
559
560 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
566 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
567
568 tokio::time::timeout(timeout, async {
569 while !self.is_active() {
570 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
571 }
572 })
573 .await
574 .map_err(|_| {
575 BitmexWsError::ClientError(format!(
576 "WebSocket connection timeout after {timeout_secs} seconds"
577 ))
578 })?;
579
580 Ok(())
581 }
582
583 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
591 let rx = self
592 .out_rx
593 .take()
594 .expect("Stream receiver already taken or not connected");
595 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
596 async_stream::stream! {
597 while let Some(msg) = rx.recv().await {
598 yield msg;
599 }
600 }
601 }
602
603 pub async fn close(&mut self) -> Result<(), BitmexWsError> {
613 log::debug!("Starting close process");
614
615 self.signal.store(true, Ordering::Relaxed);
616
617 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
619 log::debug!(
620 "Failed to send disconnect command (handler may already be shut down): {e}"
621 );
622 }
623
624 if let Some(task_handle) = self.task_handle.take() {
626 match Arc::try_unwrap(task_handle) {
627 Ok(handle) => {
628 log::debug!("Waiting for task handle to complete");
629 match tokio::time::timeout(Duration::from_secs(2), handle).await {
630 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
631 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
632 Err(_) => {
633 log::warn!(
634 "Timeout waiting for task handle, task may still be running"
635 );
636 }
638 }
639 }
640 Err(arc_handle) => {
641 log::debug!(
642 "Cannot take ownership of task handle - other references exist, aborting task"
643 );
644 arc_handle.abort();
645 }
646 }
647 } else {
648 log::debug!("No task handle to await");
649 }
650
651 log::debug!("Closed");
652
653 Ok(())
654 }
655
656 pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
666 log::debug!("Subscribing to topics: {topics:?}");
667
668 for topic in &topics {
669 self.subscriptions.mark_subscribe(topic.as_str());
670 self.tracked_subscriptions.insert(topic.clone(), ());
671 }
672
673 let mut payloads = Vec::with_capacity(topics.len());
675 for topic in &topics {
676 let message = BitmexSubscription {
677 op: BitmexWsOperation::Subscribe,
678 args: vec![Ustr::from(topic.as_ref())],
679 };
680 let payload = serde_json::to_string(&message).map_err(|e| {
681 BitmexWsError::SubscriptionError(format!("Failed to serialize subscription: {e}"))
682 })?;
683 payloads.push(payload);
684 }
685
686 let cmd = HandlerCommand::Subscribe { topics: payloads };
688
689 self.send_cmd(cmd).await.map_err(|e| {
690 BitmexWsError::SubscriptionError(format!("Failed to send subscribe command: {e}"))
691 })
692 }
693
694 async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
700 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
701
702 if self.signal.load(Ordering::Relaxed) {
703 log::debug!("Shutdown signal detected, skipping unsubscribe");
704 return Ok(());
705 }
706
707 for topic in &topics {
708 self.subscriptions.mark_unsubscribe(topic.as_str());
709 self.tracked_subscriptions.remove(topic);
710 }
711
712 let mut payloads = Vec::with_capacity(topics.len());
714 for topic in &topics {
715 let message = BitmexSubscription {
716 op: BitmexWsOperation::Unsubscribe,
717 args: vec![Ustr::from(topic.as_ref())],
718 };
719 if let Ok(payload) = serde_json::to_string(&message) {
720 payloads.push(payload);
721 }
722 }
723
724 let cmd = HandlerCommand::Unsubscribe { topics: payloads };
726
727 if let Err(e) = self.send_cmd(cmd).await {
728 tracing::debug!(error = %e, "Failed to send unsubscribe command");
729 }
730
731 Ok(())
732 }
733
734 #[must_use]
736 pub fn subscription_count(&self) -> usize {
737 self.subscriptions.len()
738 }
739
740 pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
741 let symbol = instrument_id.symbol.inner();
742 let confirmed = self.subscriptions.confirmed();
743 let mut channels = Vec::with_capacity(confirmed.len());
744
745 for entry in confirmed.iter() {
746 let (channel, symbols) = entry.pair();
747 if symbols.contains(&symbol) {
748 channels.push(format!("{channel}:{symbol}"));
750 } else {
751 let has_channel_marker = symbols.iter().any(|s| s.is_empty());
752 if has_channel_marker
753 && (*channel == BitmexWsAuthChannel::Execution.as_ref()
754 || *channel == BitmexWsAuthChannel::Order.as_ref())
755 {
756 channels.push(channel.to_string());
758 }
759 }
760 }
761
762 channels
763 }
764
765 pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
771 log::debug!("Already subscribed to all instruments on connection, skipping");
773 Ok(())
774 }
775
776 pub async fn subscribe_instrument(
782 &self,
783 instrument_id: InstrumentId,
784 ) -> Result<(), BitmexWsError> {
785 log::debug!(
787 "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
788 );
789 Ok(())
790 }
791
792 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
798 let topic = BitmexWsTopic::OrderBookL2;
799 let symbol = instrument_id.symbol.inner();
800 self.subscribe(vec![format!("{topic}:{symbol}")]).await
801 }
802
803 pub async fn subscribe_book_25(
809 &self,
810 instrument_id: InstrumentId,
811 ) -> Result<(), BitmexWsError> {
812 let topic = BitmexWsTopic::OrderBookL2_25;
813 let symbol = instrument_id.symbol.inner();
814 self.subscribe(vec![format!("{topic}:{symbol}")]).await
815 }
816
817 pub async fn subscribe_book_depth10(
823 &self,
824 instrument_id: InstrumentId,
825 ) -> Result<(), BitmexWsError> {
826 let topic = BitmexWsTopic::OrderBook10;
827 let symbol = instrument_id.symbol.inner();
828 self.subscribe(vec![format!("{topic}:{symbol}")]).await
829 }
830
831 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
839 let symbol = instrument_id.symbol.inner();
840
841 if is_index_symbol(&instrument_id.symbol.inner()) {
843 tracing::warn!("Ignoring quote subscription for index symbol: {symbol}");
844 return Ok(());
845 }
846
847 let topic = BitmexWsTopic::Quote;
848 self.subscribe(vec![format!("{topic}:{symbol}")]).await
849 }
850
851 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
859 let symbol = instrument_id.symbol.inner();
860
861 if is_index_symbol(&symbol) {
863 tracing::warn!("Ignoring trade subscription for index symbol: {symbol}");
864 return Ok(());
865 }
866
867 let topic = BitmexWsTopic::Trade;
868 self.subscribe(vec![format!("{topic}:{symbol}")]).await
869 }
870
871 pub async fn subscribe_mark_prices(
877 &self,
878 instrument_id: InstrumentId,
879 ) -> Result<(), BitmexWsError> {
880 self.subscribe_instrument(instrument_id).await
881 }
882
883 pub async fn subscribe_index_prices(
889 &self,
890 instrument_id: InstrumentId,
891 ) -> Result<(), BitmexWsError> {
892 self.subscribe_instrument(instrument_id).await
893 }
894
895 pub async fn subscribe_funding_rates(
901 &self,
902 instrument_id: InstrumentId,
903 ) -> Result<(), BitmexWsError> {
904 let topic = BitmexWsTopic::Funding;
905 let symbol = instrument_id.symbol.inner();
906 self.subscribe(vec![format!("{topic}:{symbol}")]).await
907 }
908
909 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
915 let topic = topic_from_bar_spec(bar_type.spec());
916 let symbol = bar_type.instrument_id().symbol.to_string();
917 self.subscribe(vec![format!("{topic}:{symbol}")]).await
918 }
919
920 pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
926 log::debug!(
928 "Instruments subscription maintained for proper operation, skipping unsubscribe"
929 );
930 Ok(())
931 }
932
933 pub async fn unsubscribe_instrument(
939 &self,
940 instrument_id: InstrumentId,
941 ) -> Result<(), BitmexWsError> {
942 log::debug!(
944 "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
945 );
946 Ok(())
947 }
948
949 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
955 let topic = BitmexWsTopic::OrderBookL2;
956 let symbol = instrument_id.symbol.inner();
957 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
958 }
959
960 pub async fn unsubscribe_book_25(
966 &self,
967 instrument_id: InstrumentId,
968 ) -> Result<(), BitmexWsError> {
969 let topic = BitmexWsTopic::OrderBookL2_25;
970 let symbol = instrument_id.symbol.inner();
971 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
972 }
973
974 pub async fn unsubscribe_book_depth10(
980 &self,
981 instrument_id: InstrumentId,
982 ) -> Result<(), BitmexWsError> {
983 let topic = BitmexWsTopic::OrderBook10;
984 let symbol = instrument_id.symbol.inner();
985 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
986 }
987
988 pub async fn unsubscribe_quotes(
994 &self,
995 instrument_id: InstrumentId,
996 ) -> Result<(), BitmexWsError> {
997 let symbol = instrument_id.symbol.inner();
998
999 if is_index_symbol(&symbol) {
1001 return Ok(());
1002 }
1003
1004 let topic = BitmexWsTopic::Quote;
1005 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1006 }
1007
1008 pub async fn unsubscribe_trades(
1014 &self,
1015 instrument_id: InstrumentId,
1016 ) -> Result<(), BitmexWsError> {
1017 let symbol = instrument_id.symbol.inner();
1018
1019 if is_index_symbol(&symbol) {
1021 return Ok(());
1022 }
1023
1024 let topic = BitmexWsTopic::Trade;
1025 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1026 }
1027
1028 pub async fn unsubscribe_mark_prices(
1034 &self,
1035 instrument_id: InstrumentId,
1036 ) -> Result<(), BitmexWsError> {
1037 log::debug!(
1039 "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1040 );
1041 Ok(())
1042 }
1043
1044 pub async fn unsubscribe_index_prices(
1050 &self,
1051 instrument_id: InstrumentId,
1052 ) -> Result<(), BitmexWsError> {
1053 log::debug!(
1055 "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1056 );
1057 Ok(())
1058 }
1059
1060 pub async fn unsubscribe_funding_rates(
1066 &self,
1067 instrument_id: InstrumentId,
1068 ) -> Result<(), BitmexWsError> {
1069 log::debug!(
1071 "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1072 );
1073 Ok(())
1074 }
1075
1076 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1082 let topic = topic_from_bar_spec(bar_type.spec());
1083 let symbol = bar_type.instrument_id().symbol.to_string();
1084 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1085 }
1086
1087 pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1093 if self.credential.is_none() {
1094 return Err(BitmexWsError::MissingCredentials);
1095 }
1096 self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1097 .await
1098 }
1099
1100 pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1106 if self.credential.is_none() {
1107 return Err(BitmexWsError::MissingCredentials);
1108 }
1109 self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1110 .await
1111 }
1112
1113 pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1119 if self.credential.is_none() {
1120 return Err(BitmexWsError::MissingCredentials);
1121 }
1122 self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1123 .await
1124 }
1125
1126 pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1132 if self.credential.is_none() {
1133 return Err(BitmexWsError::MissingCredentials);
1134 }
1135 self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1136 .await
1137 }
1138
1139 pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1145 if self.credential.is_none() {
1146 return Err(BitmexWsError::MissingCredentials);
1147 }
1148 self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1149 .await
1150 }
1151
1152 pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1158 self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1159 .await
1160 }
1161
1162 pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1168 self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1169 .await
1170 }
1171
1172 pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1178 self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1179 .await
1180 }
1181
1182 pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1188 self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1189 .await
1190 }
1191
1192 pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1198 self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1199 .await
1200 }
1201
1202 async fn send_cmd(&self, cmd: HandlerCommand) -> Result<(), BitmexWsError> {
1204 self.cmd_tx
1205 .read()
1206 .await
1207 .send(cmd)
1208 .map_err(|e| BitmexWsError::ClientError(format!("Handler not available: {e}")))
1209 }
1210}
1211
1212#[cfg(test)]
1217mod tests {
1218 use ahash::AHashSet;
1219 use rstest::rstest;
1220 use ustr::Ustr;
1221
1222 use super::*;
1223
1224 #[rstest]
1225 fn test_reconnect_topics_restoration_logic() {
1226 let client = BitmexWebSocketClient::new(
1228 Some("ws://test.com".to_string()),
1229 Some("test_key".to_string()),
1230 Some("test_secret".to_string()),
1231 Some(AccountId::new("BITMEX-TEST")),
1232 None,
1233 )
1234 .unwrap();
1235
1236 let subs = client.subscriptions.confirmed();
1238 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1239 let mut set = AHashSet::new();
1240 set.insert(Ustr::from("XBTUSD"));
1241 set.insert(Ustr::from("ETHUSD"));
1242 set
1243 });
1244
1245 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1246 let mut set = AHashSet::new();
1247 set.insert(Ustr::from("XBTUSD"));
1248 set
1249 });
1250
1251 subs.insert(Ustr::from(BitmexWsAuthChannel::Order.as_ref()), {
1253 let mut set = AHashSet::new();
1254 set.insert(Ustr::from(""));
1255 set
1256 });
1257 subs.insert(Ustr::from(BitmexWsAuthChannel::Position.as_ref()), {
1258 let mut set = AHashSet::new();
1259 set.insert(Ustr::from(""));
1260 set
1261 });
1262
1263 let mut topics_to_restore = Vec::new();
1265 for entry in subs.iter() {
1266 let (channel, symbols) = entry.pair();
1267 for symbol in symbols.iter() {
1268 if symbol.is_empty() {
1269 topics_to_restore.push(channel.to_string());
1270 } else {
1271 topics_to_restore.push(format!("{channel}:{symbol}"));
1272 }
1273 }
1274 }
1275
1276 assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1278 assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1279 assert!(
1280 topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1281 );
1282 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1283 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1284 assert_eq!(topics_to_restore.len(), 5);
1285 }
1286
1287 #[rstest]
1288 fn test_reconnect_auth_message_building() {
1289 let client_with_creds = BitmexWebSocketClient::new(
1291 Some("ws://test.com".to_string()),
1292 Some("test_key".to_string()),
1293 Some("test_secret".to_string()),
1294 Some(AccountId::new("BITMEX-TEST")),
1295 None,
1296 )
1297 .unwrap();
1298
1299 if let Some(cred) = &client_with_creds.credential {
1301 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1302 let signature = cred.sign("GET", "/realtime", expires, "");
1303
1304 let auth_message = BitmexAuthentication {
1305 op: BitmexWsAuthAction::AuthKeyExpires,
1306 args: (cred.api_key.to_string(), expires, signature),
1307 };
1308
1309 assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1311 assert_eq!(auth_message.args.0, "test_key");
1312 assert!(auth_message.args.1 > 0); assert!(!auth_message.args.2.is_empty()); } else {
1315 panic!("Client should have credentials");
1316 }
1317
1318 let client_no_creds = BitmexWebSocketClient::new(
1320 Some("ws://test.com".to_string()),
1321 None,
1322 None,
1323 Some(AccountId::new("BITMEX-TEST")),
1324 None,
1325 )
1326 .unwrap();
1327
1328 assert!(client_no_creds.credential.is_none());
1329 }
1330
1331 #[rstest]
1332 fn test_subscription_state_after_unsubscribe() {
1333 let client = BitmexWebSocketClient::new(
1334 Some("ws://test.com".to_string()),
1335 Some("test_key".to_string()),
1336 Some("test_secret".to_string()),
1337 Some(AccountId::new("BITMEX-TEST")),
1338 None,
1339 )
1340 .unwrap();
1341
1342 let subs = client.subscriptions.confirmed();
1344 subs.insert(Ustr::from(BitmexWsTopic::Trade.as_ref()), {
1345 let mut set = AHashSet::new();
1346 set.insert(Ustr::from("XBTUSD"));
1347 set.insert(Ustr::from("ETHUSD"));
1348 set
1349 });
1350
1351 subs.insert(Ustr::from(BitmexWsTopic::OrderBookL2.as_ref()), {
1352 let mut set = AHashSet::new();
1353 set.insert(Ustr::from("XBTUSD"));
1354 set
1355 });
1356
1357 let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1359 if let Some((channel, symbol)) = topic.split_once(':')
1360 && let Some(mut entry) = subs.get_mut(&Ustr::from(channel))
1361 {
1362 entry.remove(&Ustr::from(symbol));
1363 if entry.is_empty() {
1364 drop(entry);
1365 subs.remove(&Ustr::from(channel));
1366 }
1367 }
1368
1369 let mut topics_to_restore = Vec::new();
1371 for entry in subs.iter() {
1372 let (channel, symbols) = entry.pair();
1373 for symbol in symbols.iter() {
1374 if symbol.is_empty() {
1375 topics_to_restore.push(channel.to_string());
1376 } else {
1377 topics_to_restore.push(format!("{channel}:{symbol}"));
1378 }
1379 }
1380 }
1381
1382 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1384 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1385 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1386
1387 assert!(topics_to_restore.contains(&trade_xbt));
1388 assert!(!topics_to_restore.contains(&trade_eth));
1389 assert!(topics_to_restore.contains(&book_xbt));
1390 assert_eq!(topics_to_restore.len(), 2);
1391 }
1392
1393 #[rstest]
1394 fn test_race_unsubscribe_failure_recovery() {
1395 let client = BitmexWebSocketClient::new(
1401 Some("ws://test.com".to_string()),
1402 None,
1403 None,
1404 Some(AccountId::new("BITMEX-TEST")),
1405 None,
1406 )
1407 .unwrap();
1408
1409 let topic = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1410
1411 client.subscriptions.mark_subscribe(&topic);
1413 client.subscriptions.confirm_subscribe(&topic);
1414 assert_eq!(client.subscriptions.len(), 1);
1415
1416 client.subscriptions.mark_unsubscribe(&topic);
1418 assert_eq!(client.subscriptions.len(), 0);
1419 assert_eq!(
1420 client.subscriptions.pending_unsubscribe_topics(),
1421 vec![topic.clone()]
1422 );
1423
1424 client.subscriptions.confirm_unsubscribe(&topic); client.subscriptions.mark_subscribe(&topic); client.subscriptions.confirm_subscribe(&topic); assert_eq!(client.subscriptions.len(), 1);
1432 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1433 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1434
1435 let all = client.subscriptions.all_topics();
1437 assert_eq!(all.len(), 1);
1438 assert!(all.contains(&topic));
1439 }
1440
1441 #[rstest]
1442 fn test_race_resubscribe_before_unsubscribe_ack() {
1443 let client = BitmexWebSocketClient::new(
1447 Some("ws://test.com".to_string()),
1448 None,
1449 None,
1450 Some(AccountId::new("BITMEX-TEST")),
1451 None,
1452 )
1453 .unwrap();
1454
1455 let topic = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1456
1457 client.subscriptions.mark_subscribe(&topic);
1459 client.subscriptions.confirm_subscribe(&topic);
1460 assert_eq!(client.subscriptions.len(), 1);
1461
1462 client.subscriptions.mark_unsubscribe(&topic);
1464 assert_eq!(client.subscriptions.len(), 0);
1465 assert_eq!(
1466 client.subscriptions.pending_unsubscribe_topics(),
1467 vec![topic.clone()]
1468 );
1469
1470 client.subscriptions.mark_subscribe(&topic);
1472 assert_eq!(
1473 client.subscriptions.pending_subscribe_topics(),
1474 vec![topic.clone()]
1475 );
1476
1477 client.subscriptions.confirm_unsubscribe(&topic);
1479 assert!(client.subscriptions.pending_unsubscribe_topics().is_empty());
1480 assert_eq!(
1481 client.subscriptions.pending_subscribe_topics(),
1482 vec![topic.clone()]
1483 );
1484
1485 client.subscriptions.confirm_subscribe(&topic);
1487 assert_eq!(client.subscriptions.len(), 1);
1488 assert!(client.subscriptions.pending_subscribe_topics().is_empty());
1489
1490 let all = client.subscriptions.all_topics();
1492 assert_eq!(all.len(), 1);
1493 assert!(all.contains(&topic));
1494 }
1495
1496 #[rstest]
1497 fn test_race_channel_level_reconnection_with_pending_states() {
1498 let client = BitmexWebSocketClient::new(
1500 Some("ws://test.com".to_string()),
1501 Some("test_key".to_string()),
1502 Some("test_secret".to_string()),
1503 Some(AccountId::new("BITMEX-TEST")),
1504 None,
1505 )
1506 .unwrap();
1507
1508 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
1511 client.subscriptions.mark_subscribe(&trade_xbt);
1512 client.subscriptions.confirm_subscribe(&trade_xbt);
1513
1514 let order_channel = BitmexWsAuthChannel::Order.as_ref();
1516 client.subscriptions.mark_subscribe(order_channel);
1517 client.subscriptions.confirm_subscribe(order_channel);
1518
1519 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
1521 client.subscriptions.mark_subscribe(&trade_eth);
1522
1523 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
1525 client.subscriptions.mark_subscribe(&book_xbt);
1526 client.subscriptions.confirm_subscribe(&book_xbt);
1527 client.subscriptions.mark_unsubscribe(&book_xbt);
1528
1529 let topics_to_restore = client.subscriptions.all_topics();
1531
1532 assert_eq!(topics_to_restore.len(), 3);
1534 assert!(topics_to_restore.contains(&trade_xbt));
1535 assert!(topics_to_restore.contains(&order_channel.to_string()));
1536 assert!(topics_to_restore.contains(&trade_eth));
1537 assert!(!topics_to_restore.contains(&book_xbt)); for topic in &topics_to_restore {
1542 if topic == order_channel {
1543 assert!(
1544 !topic.contains(':'),
1545 "Channel-level topic should not have delimiter"
1546 );
1547 }
1548 }
1549 }
1550}