1use std::{
24 collections::HashSet,
25 sync::{
26 Arc,
27 atomic::{AtomicBool, Ordering},
28 },
29};
30
31use ahash::{AHashMap, AHashSet};
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::runtime::get_runtime;
35use nautilus_core::{
36 consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
37};
38use nautilus_model::{
39 data::{Data, bar::BarType},
40 enums::{OrderStatus, OrderType},
41 identifiers::{AccountId, ClientOrderId, InstrumentId},
42 instruments::{Instrument, InstrumentAny},
43};
44use nautilus_network::{
45 RECONNECTED,
46 websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
47};
48use reqwest::header::USER_AGENT;
49use tokio::{sync::RwLock, time::Duration};
50use tokio_tungstenite::tungstenite::Message;
51use ustr::Ustr;
52
53use super::{
54 cache::QuoteCache,
55 enums::{
56 BitmexAction, BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic,
57 },
58 error::BitmexWsError,
59 messages::{
60 BitmexAuthentication, BitmexHttpRequest, BitmexSubscription, BitmexTableMessage,
61 BitmexWsMessage, NautilusWsMessage, OrderData,
62 },
63 parse::{
64 is_index_symbol, parse_book_msg_vec, parse_book10_msg_vec, parse_order_update_msg,
65 parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg, topic_from_bar_spec,
66 },
67};
68use crate::{
69 common::{consts::BITMEX_WS_URL, credential::Credential, enums::BitmexExecType},
70 websocket::{
71 auth::{AUTHENTICATION_TIMEOUT_SECS, AuthResultReceiver, AuthTracker},
72 parse::{
73 parse_execution_msg, parse_funding_msg, parse_instrument_msg, parse_order_msg,
74 parse_position_msg,
75 },
76 subscription::SubscriptionState,
77 },
78};
79
80#[derive(Clone, Debug)]
88#[cfg_attr(
89 feature = "python",
90 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
91)]
92pub struct BitmexWebSocketClient {
93 url: String,
94 credential: Option<Credential>,
95 heartbeat: Option<u64>,
96 inner: Arc<RwLock<Option<WebSocketClient>>>,
97 rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
98 signal: Arc<AtomicBool>,
99 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
100 account_id: AccountId,
101 auth_tracker: AuthTracker,
102 subscriptions: SubscriptionState,
103 instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
104 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
105 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
106}
107
108impl BitmexWebSocketClient {
109 pub fn new(
115 url: Option<String>,
116 api_key: Option<String>,
117 api_secret: Option<String>,
118 account_id: Option<AccountId>,
119 heartbeat: Option<u64>,
120 ) -> anyhow::Result<Self> {
121 let credential = match (api_key, api_secret) {
122 (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
123 (None, None) => None,
124 _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
125 };
126
127 let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
128
129 Ok(Self {
130 url: url.unwrap_or(BITMEX_WS_URL.to_string()),
131 credential,
132 heartbeat,
133 inner: Arc::new(RwLock::new(None)),
134 rx: None,
135 signal: Arc::new(AtomicBool::new(false)),
136 task_handle: None,
137 account_id,
138 auth_tracker: AuthTracker::new(),
139 subscriptions: SubscriptionState::new(),
140 instruments_cache: Arc::new(AHashMap::new()),
141 order_type_cache: Arc::new(DashMap::new()),
142 order_symbol_cache: Arc::new(DashMap::new()),
143 })
144 }
145
146 pub fn from_env() -> anyhow::Result<Self> {
152 let url = get_env_var("BITMEX_WS_URL")?;
153 let api_key = get_env_var("BITMEX_API_KEY")?;
154 let api_secret = get_env_var("BITMEX_API_SECRET")?;
155
156 Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
157 }
158
159 #[must_use]
161 pub const fn url(&self) -> &str {
162 self.url.as_str()
163 }
164
165 #[must_use]
167 pub fn api_key(&self) -> Option<&str> {
168 self.credential.as_ref().map(|c| c.api_key.as_str())
169 }
170
171 #[must_use]
173 pub fn is_active(&self) -> bool {
174 match self.inner.try_read() {
175 Ok(guard) => match &*guard {
176 Some(inner) => inner.is_active(),
177 None => false,
178 },
179 Err(_) => false,
180 }
181 }
182
183 #[must_use]
185 pub fn is_closed(&self) -> bool {
186 match self.inner.try_read() {
187 Ok(guard) => match &*guard {
188 Some(inner) => inner.is_closed(),
189 None => true,
190 },
191 Err(_) => true,
192 }
193 }
194
195 pub fn set_account_id(&mut self, account_id: AccountId) {
197 self.account_id = account_id;
198 }
199
200 pub fn initialize_instruments_cache(&mut self, instruments: Vec<InstrumentAny>) {
202 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
203 let mut count = 0;
204
205 log::info!("Initializing BitMEX instrument cache...");
206
207 for inst in instruments {
208 let symbol = inst.symbol().inner();
209 instruments_cache.insert(symbol, inst.clone());
210 log::debug!("Cached instrument: {symbol}");
211 count += 1;
212 }
213
214 self.instruments_cache = Arc::new(instruments_cache);
215
216 log::info!("BitMEX instrument cache initialized with {count} instruments");
217 }
218
219 pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
229 let reader = self.connect_inner().await?;
230
231 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
232 self.rx = Some(Arc::new(rx));
233 let signal = self.signal.clone();
234
235 let account_id = self.account_id;
236 let inner_client = self.inner.clone();
237 let credential = self.credential.clone();
238 let auth_tracker = self.auth_tracker.clone();
239 let subscriptions = self.subscriptions.clone();
240 let instruments_cache = self.instruments_cache.clone();
241 let order_type_cache = self.order_type_cache.clone();
242 let order_symbol_cache = self.order_symbol_cache.clone();
243
244 let stream_handle = get_runtime().spawn(async move {
245 let mut handler = BitmexWsMessageHandler::new(
246 reader,
247 signal,
248 tx,
249 account_id,
250 auth_tracker.clone(),
251 subscriptions.clone(),
252 instruments_cache,
253 order_type_cache,
254 order_symbol_cache,
255 );
256
257 loop {
259 match handler.next().await {
260 Some(NautilusWsMessage::Reconnected) => {
261 log::info!("Reconnecting WebSocket");
262
263 let has_client = {
264 let guard = inner_client.read().await;
265 guard.is_some()
266 };
267
268 if !has_client {
269 log::warn!("Reconnection signaled but WebSocket client unavailable");
270 continue;
271 }
272
273 let confirmed = subscriptions.confirmed();
274 let pending = subscriptions.pending();
275 let mut restore_set: HashSet<String> = HashSet::new();
276
277 let mut collect_topics = |map: &DashMap<String, AHashSet<Ustr>>| {
278 for entry in map.iter() {
279 let (channel, symbols) = entry.pair();
280
281 if channel == BitmexWsTopic::Instrument.as_ref() {
282 continue;
283 }
284
285 if symbols.is_empty() {
286 restore_set.insert(channel.clone());
287 } else {
288 for symbol in symbols.iter() {
289 restore_set.insert(format!("{channel}:{symbol}"));
290 }
291 }
292 }
293 };
294
295 collect_topics(&confirmed);
296 collect_topics(&pending);
297
298 let mut topics_to_restore: Vec<String> = restore_set.into_iter().collect();
299 topics_to_restore.sort();
300
301 let auth_rx_opt = if let Some(cred) = &credential {
302 match Self::issue_authentication_request(
303 &inner_client,
304 cred,
305 &auth_tracker,
306 )
307 .await
308 {
309 Ok(rx) => Some(rx),
310 Err(e) => {
311 log::error!(
312 "Failed to send re-authentication request after reconnection: {e}"
313 );
314 continue;
315 }
316 }
317 } else {
318 None
319 };
320
321 let inner_for_task = inner_client.clone();
322 let state_for_task = subscriptions.clone();
323 let auth_tracker_for_task = auth_tracker.clone();
324 let auth_rx_for_task = auth_rx_opt;
325 let topics_to_restore_clone = topics_to_restore.clone();
326 get_runtime().spawn(async move {
327 if let Some(rx) = auth_rx_for_task {
328 if let Err(e) = auth_tracker_for_task
329 .wait_for_result(
330 Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
331 rx,
332 )
333 .await
334 {
335 log::error!("Authentication after reconnection failed: {e}");
336 return;
337 }
338 log::info!("Re-authenticated after reconnection");
339 }
340
341 let mut all_topics =
342 Vec::with_capacity(1 + topics_to_restore_clone.len());
343 all_topics.push(BitmexWsTopic::Instrument.as_ref().to_string());
344 all_topics.extend(topics_to_restore_clone.iter().cloned());
345
346 for topic in &all_topics {
347 state_for_task.mark_subscribe(topic.as_str());
348 }
349
350 if let Err(e) = Self::send_topics(
351 &inner_for_task,
352 BitmexWsOperation::Subscribe,
353 all_topics.clone(),
354 )
355 .await
356 {
357 log::error!(
358 "Failed to restore subscriptions after reconnection: {e}"
359 );
360 } else {
362 log::info!(
363 "Restored {} subscriptions after reconnection",
364 all_topics.len()
365 );
366 }
367 });
368 }
369 Some(msg) => {
370 if let Err(e) = handler.tx.send(msg) {
371 tracing::error!("Error sending message: {e}");
372 break;
373 }
374 }
375 None => {
376 if handler.handler.signal.load(Ordering::Relaxed) {
378 tracing::debug!("Stop signal received, ending message processing");
379 break;
380 }
381 tracing::warn!("WebSocket stream ended unexpectedly");
383 break;
384 }
385 }
386 }
387 });
388
389 self.task_handle = Some(Arc::new(stream_handle));
390
391 if self.credential.is_some() {
392 self.authenticate().await?;
393 }
394
395 {
396 let inner_guard = self.inner.read().await;
397 if let Some(inner) = &*inner_guard {
398 self.subscriptions
399 .mark_subscribe(BitmexWsTopic::Instrument.as_ref());
400
401 let subscribe_msg = BitmexSubscription {
402 op: BitmexWsOperation::Subscribe,
403 args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
404 };
405
406 match serde_json::to_string(&subscribe_msg) {
407 Ok(subscribe_json) => {
408 if let Err(e) = inner.send_text(subscribe_json, None).await {
409 log::error!("Failed to subscribe to instruments: {e}");
410 } else {
411 log::debug!("Subscribed to all instruments");
412 }
413 }
414 Err(e) => {
415 tracing::error!(error = %e, "Failed to serialize resubscribe message");
416 }
417 }
418 }
419 }
420
421 Ok(())
422 }
423
424 async fn connect_inner(
430 &mut self,
431 ) -> Result<tokio::sync::mpsc::UnboundedReceiver<Message>, BitmexWsError> {
432 let (message_handler, rx) = channel_message_handler();
433
434 let inner_for_ping = self.inner.clone();
435 let ping_handler: PingHandler = Arc::new(move |payload: Vec<u8>| {
436 let inner = inner_for_ping.clone();
437
438 get_runtime().spawn(async move {
439 let len = payload.len();
440 let guard = inner.read().await;
441
442 if let Some(client) = guard.as_ref() {
443 if let Err(e) = client.send_pong(payload).await {
444 tracing::warn!(error = %e, "Failed to send pong frame");
445 } else {
446 tracing::trace!("Sent pong frame ({len} bytes)");
447 }
448 } else {
449 tracing::debug!("Ping received with no active websocket client");
450 }
451 });
452 });
453
454 let config = WebSocketConfig {
455 url: self.url.clone(),
456 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
457 heartbeat: self.heartbeat,
458 heartbeat_msg: None,
459 message_handler: Some(message_handler),
460 ping_handler: Some(ping_handler),
461 reconnect_timeout_ms: Some(5_000),
462 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, };
467
468 let keyed_quotas = vec![];
469 let client = WebSocketClient::connect(
470 config,
471 None, keyed_quotas,
473 None, )
475 .await
476 .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
477
478 {
479 let mut inner_guard = self.inner.write().await;
480 *inner_guard = Some(client);
481 }
482
483 Ok(rx)
484 }
485
486 async fn issue_authentication_request(
487 inner: &Arc<RwLock<Option<WebSocketClient>>>,
488 credential: &Credential,
489 tracker: &AuthTracker,
490 ) -> Result<AuthResultReceiver, BitmexWsError> {
491 let receiver = tracker.begin();
492
493 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
494 let signature = credential.sign("GET", "/realtime", expires, "");
495
496 let auth_message = BitmexAuthentication {
497 op: BitmexWsAuthAction::AuthKeyExpires,
498 args: (credential.api_key.to_string(), expires, signature),
499 };
500
501 let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
502 let msg = format!("Failed to serialize auth message: {e}");
503 tracker.fail(msg.clone());
504 BitmexWsError::AuthenticationError(msg)
505 })?;
506
507 {
508 let inner_guard = inner.read().await;
509 let client = inner_guard.as_ref().ok_or_else(|| {
510 tracker.fail("Cannot authenticate: not connected");
511 BitmexWsError::AuthenticationError("Cannot authenticate: not connected".to_string())
512 })?;
513
514 client.send_text(auth_json, None).await.map_err(|e| {
515 let error = e.to_string();
516 tracker.fail(error.clone());
517 BitmexWsError::AuthenticationError(error)
518 })?;
519 }
520
521 Ok(receiver)
522 }
523
524 async fn authenticate(&self) -> Result<(), BitmexWsError> {
531 let credential = match &self.credential {
532 Some(credential) => credential,
533 None => {
534 return Err(BitmexWsError::AuthenticationError(
535 "API credentials not available to authenticate".to_string(),
536 ));
537 }
538 };
539
540 let rx =
541 Self::issue_authentication_request(&self.inner, credential, &self.auth_tracker).await?;
542 self.auth_tracker
543 .wait_for_result(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
544 .await
545 }
546
547 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
553 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
554
555 tokio::time::timeout(timeout, async {
556 while !self.is_active() {
557 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
558 }
559 })
560 .await
561 .map_err(|_| {
562 BitmexWsError::ClientError(format!(
563 "WebSocket connection timeout after {timeout_secs} seconds"
564 ))
565 })?;
566
567 Ok(())
568 }
569
570 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
578 let rx = self
579 .rx
580 .take()
581 .expect("Stream receiver already taken or not connected");
582 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
583 async_stream::stream! {
584 while let Some(msg) = rx.recv().await {
585 yield msg;
586 }
587 }
588 }
589
590 pub async fn close(&mut self) -> Result<(), BitmexWsError> {
600 log::debug!("Starting close process");
601
602 self.signal.store(true, Ordering::Relaxed);
603
604 {
605 let inner_guard = self.inner.read().await;
606 if let Some(inner) = &*inner_guard {
607 log::debug!("Disconnecting websocket");
608
609 match tokio::time::timeout(Duration::from_secs(3), inner.disconnect()).await {
610 Ok(()) => log::debug!("Websocket disconnected successfully"),
611 Err(_) => {
612 log::warn!(
613 "Timeout waiting for websocket disconnect, continuing with cleanup"
614 );
615 }
616 }
617 } else {
618 log::debug!("No active connection to disconnect");
619 }
620 }
621
622 if let Some(task_handle) = self.task_handle.take() {
624 match Arc::try_unwrap(task_handle) {
625 Ok(handle) => {
626 log::debug!("Waiting for task handle to complete");
627 match tokio::time::timeout(Duration::from_secs(2), handle).await {
628 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
629 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
630 Err(_) => {
631 log::warn!(
632 "Timeout waiting for task handle, task may still be running"
633 );
634 }
636 }
637 }
638 Err(arc_handle) => {
639 log::debug!(
640 "Cannot take ownership of task handle - other references exist, aborting task"
641 );
642 arc_handle.abort();
643 }
644 }
645 } else {
646 log::debug!("No task handle to await");
647 }
648
649 log::debug!("Closed");
650
651 Ok(())
652 }
653
654 async fn send_topics(
655 inner: &Arc<RwLock<Option<WebSocketClient>>>,
656 op: BitmexWsOperation,
657 topics: Vec<String>,
658 ) -> Result<(), BitmexWsError> {
659 if topics.is_empty() {
660 return Ok(());
661 }
662
663 let message = BitmexSubscription {
664 op,
665 args: topics
666 .iter()
667 .map(|topic| Ustr::from(topic.as_str()))
668 .collect(),
669 };
670
671 let op_name = message.op.as_ref().to_string();
672 let payload = serde_json::to_string(&message).map_err(|e| {
673 BitmexWsError::SubscriptionError(format!("Failed to serialize {op_name} message: {e}"))
674 })?;
675
676 let inner_guard = inner.read().await;
677 if let Some(client) = &*inner_guard {
678 client
679 .send_text(payload, None)
680 .await
681 .map_err(|e| BitmexWsError::SubscriptionError(e.to_string()))?;
682 } else {
683 log::error!("Cannot send {op_name} message: not connected");
684 }
685
686 Ok(())
687 }
688
689 pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
699 log::debug!("Subscribing to topics: {topics:?}");
700
701 for topic in &topics {
702 self.subscriptions.mark_subscribe(topic.as_str());
703 }
704
705 Self::send_topics(&self.inner, BitmexWsOperation::Subscribe, topics).await
706 }
707
708 async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
714 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
715
716 if self.signal.load(Ordering::Relaxed) {
717 log::debug!("Shutdown signal detected, skipping unsubscribe");
718 return Ok(());
719 }
720
721 for topic in &topics {
722 self.subscriptions.mark_unsubscribe(topic.as_str());
723 }
724
725 let result = Self::send_topics(&self.inner, BitmexWsOperation::Unsubscribe, topics).await;
726 if let Err(e) = result {
727 tracing::debug!(error = %e, "Failed to send unsubscribe message");
728 }
729 Ok(())
730 }
731
732 #[must_use]
734 pub fn subscription_count(&self) -> usize {
735 self.subscriptions.len()
736 }
737
738 pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
739 let symbol = instrument_id.symbol.inner();
740 let confirmed = self.subscriptions.confirmed();
741 let mut channels = Vec::with_capacity(confirmed.len());
742
743 for entry in confirmed.iter() {
744 let (channel, symbols) = entry.pair();
745 if symbols.contains(&symbol) {
746 channels.push(format!("{channel}:{symbol}"));
748 } else if symbols.is_empty()
749 && (channel == BitmexWsAuthChannel::Execution.as_ref()
750 || channel == BitmexWsAuthChannel::Order.as_ref())
751 {
752 channels.push(channel.clone());
754 }
755 }
756
757 channels
758 }
759
760 pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
766 log::debug!("Already subscribed to all instruments on connection, skipping");
768 Ok(())
769 }
770
771 pub async fn subscribe_instrument(
777 &self,
778 instrument_id: InstrumentId,
779 ) -> Result<(), BitmexWsError> {
780 log::debug!(
782 "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
783 );
784 Ok(())
785 }
786
787 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
793 let topic = BitmexWsTopic::OrderBookL2;
794 let symbol = instrument_id.symbol.as_str();
795 self.subscribe(vec![format!("{topic}:{symbol}")]).await
796 }
797
798 pub async fn subscribe_book_25(
804 &self,
805 instrument_id: InstrumentId,
806 ) -> Result<(), BitmexWsError> {
807 let topic = BitmexWsTopic::OrderBookL2_25;
808 let symbol = instrument_id.symbol.as_str();
809 self.subscribe(vec![format!("{topic}:{symbol}")]).await
810 }
811
812 pub async fn subscribe_book_depth10(
818 &self,
819 instrument_id: InstrumentId,
820 ) -> Result<(), BitmexWsError> {
821 let topic = BitmexWsTopic::OrderBook10;
822 let symbol = instrument_id.symbol.as_str();
823 self.subscribe(vec![format!("{topic}:{symbol}")]).await
824 }
825
826 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
834 let symbol = instrument_id.symbol.inner();
835
836 if is_index_symbol(&instrument_id.symbol.inner()) {
838 tracing::warn!("Ignoring quote subscription for index symbol: {symbol}");
839 return Ok(());
840 }
841
842 let topic = BitmexWsTopic::Quote;
843 self.subscribe(vec![format!("{topic}:{symbol}")]).await
844 }
845
846 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
854 let symbol = instrument_id.symbol.inner();
855
856 if is_index_symbol(&symbol) {
858 tracing::warn!("Ignoring trade subscription for index symbol: {symbol}");
859 return Ok(());
860 }
861
862 let topic = BitmexWsTopic::Trade;
863 self.subscribe(vec![format!("{topic}:{symbol}")]).await
864 }
865
866 pub async fn subscribe_mark_prices(
872 &self,
873 instrument_id: InstrumentId,
874 ) -> Result<(), BitmexWsError> {
875 self.subscribe_instrument(instrument_id).await
876 }
877
878 pub async fn subscribe_index_prices(
884 &self,
885 instrument_id: InstrumentId,
886 ) -> Result<(), BitmexWsError> {
887 self.subscribe_instrument(instrument_id).await
888 }
889
890 pub async fn subscribe_funding_rates(
896 &self,
897 instrument_id: InstrumentId,
898 ) -> Result<(), BitmexWsError> {
899 let topic = BitmexWsTopic::Funding;
900 let symbol = instrument_id.symbol.as_str();
901 self.subscribe(vec![format!("{topic}:{symbol}")]).await
902 }
903
904 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
910 let topic = topic_from_bar_spec(bar_type.spec());
911 let symbol = bar_type.instrument_id().symbol.to_string();
912 self.subscribe(vec![format!("{topic}:{symbol}")]).await
913 }
914
915 pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
921 log::debug!(
923 "Instruments subscription maintained for proper operation, skipping unsubscribe"
924 );
925 Ok(())
926 }
927
928 pub async fn unsubscribe_instrument(
934 &self,
935 instrument_id: InstrumentId,
936 ) -> Result<(), BitmexWsError> {
937 log::debug!(
939 "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
940 );
941 Ok(())
942 }
943
944 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
950 let topic = BitmexWsTopic::OrderBookL2;
951 let symbol = instrument_id.symbol.as_str();
952 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
953 }
954
955 pub async fn unsubscribe_book_25(
961 &self,
962 instrument_id: InstrumentId,
963 ) -> Result<(), BitmexWsError> {
964 let topic = BitmexWsTopic::OrderBookL2_25;
965 let symbol = instrument_id.symbol.as_str();
966 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
967 }
968
969 pub async fn unsubscribe_book_depth10(
975 &self,
976 instrument_id: InstrumentId,
977 ) -> Result<(), BitmexWsError> {
978 let topic = BitmexWsTopic::OrderBook10;
979 let symbol = instrument_id.symbol.as_str();
980 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
981 }
982
983 pub async fn unsubscribe_quotes(
989 &self,
990 instrument_id: InstrumentId,
991 ) -> Result<(), BitmexWsError> {
992 let symbol = instrument_id.symbol.inner();
993
994 if is_index_symbol(&symbol) {
996 return Ok(());
997 }
998
999 let topic = BitmexWsTopic::Quote;
1000 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1001 }
1002
1003 pub async fn unsubscribe_trades(
1009 &self,
1010 instrument_id: InstrumentId,
1011 ) -> Result<(), BitmexWsError> {
1012 let symbol = instrument_id.symbol.inner();
1013
1014 if is_index_symbol(&symbol) {
1016 return Ok(());
1017 }
1018
1019 let topic = BitmexWsTopic::Trade;
1020 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1021 }
1022
1023 pub async fn unsubscribe_mark_prices(
1029 &self,
1030 instrument_id: InstrumentId,
1031 ) -> Result<(), BitmexWsError> {
1032 log::debug!(
1034 "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1035 );
1036 Ok(())
1037 }
1038
1039 pub async fn unsubscribe_index_prices(
1045 &self,
1046 instrument_id: InstrumentId,
1047 ) -> Result<(), BitmexWsError> {
1048 log::debug!(
1050 "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1051 );
1052 Ok(())
1053 }
1054
1055 pub async fn unsubscribe_funding_rates(
1061 &self,
1062 instrument_id: InstrumentId,
1063 ) -> Result<(), BitmexWsError> {
1064 log::debug!(
1066 "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1067 );
1068 Ok(())
1069 }
1070
1071 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1077 let topic = topic_from_bar_spec(bar_type.spec());
1078 let symbol = bar_type.instrument_id().symbol.to_string();
1079 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1080 }
1081
1082 pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1088 if self.credential.is_none() {
1089 return Err(BitmexWsError::MissingCredentials);
1090 }
1091 self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1092 .await
1093 }
1094
1095 pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1101 if self.credential.is_none() {
1102 return Err(BitmexWsError::MissingCredentials);
1103 }
1104 self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1105 .await
1106 }
1107
1108 pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1114 if self.credential.is_none() {
1115 return Err(BitmexWsError::MissingCredentials);
1116 }
1117 self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1118 .await
1119 }
1120
1121 pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1127 if self.credential.is_none() {
1128 return Err(BitmexWsError::MissingCredentials);
1129 }
1130 self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1131 .await
1132 }
1133
1134 pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1140 if self.credential.is_none() {
1141 return Err(BitmexWsError::MissingCredentials);
1142 }
1143 self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1144 .await
1145 }
1146
1147 pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1153 self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1154 .await
1155 }
1156
1157 pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1163 self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1164 .await
1165 }
1166
1167 pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1173 self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1174 .await
1175 }
1176
1177 pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1183 self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1184 .await
1185 }
1186
1187 pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1193 self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1194 .await
1195 }
1196}
1197
1198struct BitmexFeedHandler {
1199 receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1200 signal: Arc<AtomicBool>,
1201}
1202
1203impl BitmexFeedHandler {
1204 pub fn new(
1206 receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1207 signal: Arc<AtomicBool>,
1208 ) -> Self {
1209 Self { receiver, signal }
1210 }
1211
1212 async fn next(&mut self) -> Option<BitmexWsMessage> {
1214 loop {
1215 tokio::select! {
1216 msg = self.receiver.recv() => match msg {
1217 Some(msg) => match msg {
1218 Message::Text(text) => {
1219 if text == RECONNECTED {
1220 tracing::info!("Received WebSocket reconnection signal");
1221 return Some(BitmexWsMessage::Reconnected);
1222 }
1223
1224 tracing::trace!("Raw websocket message: {text}");
1225
1226 if Self::is_heartbeat_message(&text) {
1227 tracing::trace!(
1228 "Ignoring heartbeat control message: {text}"
1229 );
1230 continue;
1231 }
1232
1233 match serde_json::from_str(&text) {
1234 Ok(msg) => match &msg {
1235 BitmexWsMessage::Welcome {
1236 version,
1237 heartbeat_enabled,
1238 limit,
1239 ..
1240 } => {
1241 tracing::info!(
1242 version = version,
1243 heartbeat = heartbeat_enabled,
1244 rate_limit = ?limit.remaining,
1245 "Welcome to the BitMEX Realtime API:",
1246 );
1247 }
1248 BitmexWsMessage::Subscription { .. } => return Some(msg),
1249 BitmexWsMessage::Error { status, error, .. } => {
1250 tracing::error!(
1251 status = status,
1252 error = error,
1253 "Received error from BitMEX"
1254 );
1255 }
1256 _ => return Some(msg),
1257 },
1258 Err(e) => {
1259 tracing::error!("Failed to parse WebSocket message: {e}: {text}");
1260 }
1261 }
1262 }
1263 Message::Binary(msg) => {
1264 tracing::debug!("Raw binary: {msg:?}");
1265 }
1266 Message::Close(_) => {
1267 tracing::debug!("Received close message, waiting for reconnection");
1268 continue;
1269 }
1270 msg => match msg {
1271 Message::Ping(data) => {
1272 tracing::trace!("Received ping frame with {} bytes", data.len());
1273 }
1274 Message::Pong(data) => {
1275 tracing::trace!("Received pong frame with {} bytes", data.len());
1276 }
1277 Message::Frame(frame) => {
1278 tracing::debug!("Received raw frame: {frame:?}");
1279 }
1280 _ => {
1281 tracing::warn!("Unexpected message type: {msg:?}");
1282 }
1283 },
1284 }
1285 None => {
1286 tracing::info!("WebSocket stream closed");
1287 return None;
1288 }
1289 },
1290 _ = tokio::time::sleep(Duration::from_millis(1)) => {
1291 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
1292 tracing::debug!("Stop signal received");
1293 return None;
1294 }
1295 }
1296 }
1297 }
1298 }
1299
1300 fn is_heartbeat_message(text: &str) -> bool {
1301 let trimmed = text.trim();
1302
1303 if !trimmed.starts_with('{') || trimmed.len() > 64 {
1304 return false;
1305 }
1306
1307 trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
1308 }
1309}
1310
1311struct BitmexWsMessageHandler {
1312 handler: BitmexFeedHandler,
1313 tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
1314 #[allow(
1315 dead_code,
1316 reason = "May be needed for future account-specific processing"
1317 )]
1318 account_id: AccountId,
1319 auth_tracker: AuthTracker,
1320 subscriptions: SubscriptionState,
1321 instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
1322 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
1323 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
1324 quote_cache: QuoteCache,
1325}
1326
1327impl BitmexWsMessageHandler {
1328 #[allow(clippy::too_many_arguments)]
1330 pub fn new(
1331 receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1332 signal: Arc<AtomicBool>,
1333 tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
1334 account_id: AccountId,
1335 auth_tracker: AuthTracker,
1336 subscriptions: SubscriptionState,
1337 instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
1338 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
1339 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
1340 ) -> Self {
1341 let handler = BitmexFeedHandler::new(receiver, signal);
1342 Self {
1343 handler,
1344 tx,
1345 account_id,
1346 auth_tracker,
1347 subscriptions,
1348 instruments_cache,
1349 order_type_cache,
1350 order_symbol_cache,
1351 quote_cache: QuoteCache::new(),
1352 }
1353 }
1354
1355 #[inline]
1358 fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1359 self.instruments_cache.get(symbol).cloned()
1360 }
1361
1362 async fn next(&mut self) -> Option<NautilusWsMessage> {
1363 let clock = get_atomic_clock_realtime();
1364
1365 while let Some(msg) = self.handler.next().await {
1366 match msg {
1367 BitmexWsMessage::Reconnected => {
1368 self.quote_cache.clear();
1370 return Some(NautilusWsMessage::Reconnected);
1371 }
1372 BitmexWsMessage::Subscription {
1373 success,
1374 subscribe,
1375 request,
1376 error,
1377 } => {
1378 self.handle_subscription_message(
1379 success,
1380 subscribe.as_ref(),
1381 request.as_ref(),
1382 error.as_deref(),
1383 );
1384 continue;
1385 }
1386 BitmexWsMessage::Table(table_msg) => {
1387 let ts_init = clock.get_time_ns();
1388
1389 return Some(match table_msg {
1390 BitmexTableMessage::OrderBookL2 { action, data } => {
1391 if data.is_empty() {
1392 continue;
1393 }
1394 let data = parse_book_msg_vec(
1395 data,
1396 action,
1397 self.instruments_cache.as_ref(),
1398 ts_init,
1399 );
1400
1401 NautilusWsMessage::Data(data)
1402 }
1403 BitmexTableMessage::OrderBookL2_25 { action, data } => {
1404 if data.is_empty() {
1405 continue;
1406 }
1407 let data = parse_book_msg_vec(
1408 data,
1409 action,
1410 self.instruments_cache.as_ref(),
1411 ts_init,
1412 );
1413
1414 NautilusWsMessage::Data(data)
1415 }
1416 BitmexTableMessage::OrderBook10 { data, .. } => {
1417 if data.is_empty() {
1418 continue;
1419 }
1420 let data = parse_book10_msg_vec(
1421 data,
1422 self.instruments_cache.as_ref(),
1423 ts_init,
1424 );
1425
1426 NautilusWsMessage::Data(data)
1427 }
1428 BitmexTableMessage::Quote { mut data, .. } => {
1429 if data.is_empty() {
1431 continue;
1432 }
1433
1434 let msg = data.remove(0);
1435 let Some(instrument) = self.get_instrument(&msg.symbol) else {
1436 tracing::error!(
1437 "Instrument cache miss: quote message dropped for symbol={}",
1438 msg.symbol
1439 );
1440 continue;
1441 };
1442
1443 if let Some(quote) =
1444 self.quote_cache.process(&msg, &instrument, ts_init)
1445 {
1446 NautilusWsMessage::Data(vec![Data::Quote(quote)])
1447 } else {
1448 continue;
1449 }
1450 }
1451 BitmexTableMessage::Trade { data, .. } => {
1452 if data.is_empty() {
1453 continue;
1454 }
1455 let data =
1456 parse_trade_msg_vec(data, self.instruments_cache.as_ref(), ts_init);
1457
1458 NautilusWsMessage::Data(data)
1459 }
1460 BitmexTableMessage::TradeBin1m { action, data } => {
1461 if action == BitmexAction::Partial || data.is_empty() {
1462 continue;
1463 }
1464 let data = parse_trade_bin_msg_vec(
1465 data,
1466 BitmexWsTopic::TradeBin1m,
1467 self.instruments_cache.as_ref(),
1468 ts_init,
1469 );
1470
1471 NautilusWsMessage::Data(data)
1472 }
1473 BitmexTableMessage::TradeBin5m { action, data } => {
1474 if action == BitmexAction::Partial || data.is_empty() {
1475 continue;
1476 }
1477 let data = parse_trade_bin_msg_vec(
1478 data,
1479 BitmexWsTopic::TradeBin5m,
1480 self.instruments_cache.as_ref(),
1481 ts_init,
1482 );
1483
1484 NautilusWsMessage::Data(data)
1485 }
1486 BitmexTableMessage::TradeBin1h { action, data } => {
1487 if action == BitmexAction::Partial || data.is_empty() {
1488 continue;
1489 }
1490 let data = parse_trade_bin_msg_vec(
1491 data,
1492 BitmexWsTopic::TradeBin1h,
1493 self.instruments_cache.as_ref(),
1494 ts_init,
1495 );
1496
1497 NautilusWsMessage::Data(data)
1498 }
1499 BitmexTableMessage::TradeBin1d { action, data } => {
1500 if action == BitmexAction::Partial || data.is_empty() {
1501 continue;
1502 }
1503 let data = parse_trade_bin_msg_vec(
1504 data,
1505 BitmexWsTopic::TradeBin1d,
1506 self.instruments_cache.as_ref(),
1507 ts_init,
1508 );
1509
1510 NautilusWsMessage::Data(data)
1511 }
1512 BitmexTableMessage::Order { data, .. } => {
1516 let mut reports = Vec::with_capacity(data.len());
1518
1519 for order_data in data {
1520 match order_data {
1521 OrderData::Full(order_msg) => {
1522 let Some(instrument) =
1523 self.get_instrument(&order_msg.symbol)
1524 else {
1525 tracing::error!(
1526 "Instrument cache miss: order message dropped for symbol={}, order_id={}",
1527 order_msg.symbol,
1528 order_msg.order_id
1529 );
1530 continue;
1531 };
1532
1533 match parse_order_msg(
1534 &order_msg,
1535 &instrument,
1536 &self.order_type_cache,
1537 ) {
1538 Ok(report) => {
1539 if let Some(client_order_id) = &order_msg.cl_ord_id
1541 {
1542 let client_order_id =
1543 ClientOrderId::new(client_order_id);
1544
1545 if let Some(ord_type) = &order_msg.ord_type {
1546 let order_type: OrderType =
1547 (*ord_type).into();
1548 self.order_type_cache
1549 .insert(client_order_id, order_type);
1550 }
1551
1552 self.order_symbol_cache
1554 .insert(client_order_id, order_msg.symbol);
1555 }
1556
1557 if is_terminal_order_status(report.order_status)
1558 && let Some(client_id) = report.client_order_id
1559 {
1560 self.order_type_cache.remove(&client_id);
1561 self.order_symbol_cache.remove(&client_id);
1562 }
1563
1564 reports.push(report);
1565 }
1566 Err(e) => {
1567 tracing::error!(
1568 error = %e,
1569 symbol = %order_msg.symbol,
1570 order_id = %order_msg.order_id,
1571 time_in_force = ?order_msg.time_in_force,
1572 "Failed to parse full order message - potential data loss"
1573 );
1574 continue;
1576 }
1577 }
1578 }
1579 OrderData::Update(msg) => {
1580 let Some(instrument) = self.get_instrument(&msg.symbol)
1581 else {
1582 tracing::error!(
1583 "Instrument cache miss: order update dropped for symbol={}, order_id={}",
1584 msg.symbol,
1585 msg.order_id
1586 );
1587 continue;
1588 };
1589
1590 if let Some(cl_ord_id) = &msg.cl_ord_id {
1592 let client_order_id = ClientOrderId::new(cl_ord_id);
1593 self.order_symbol_cache
1594 .insert(client_order_id, msg.symbol);
1595 }
1596
1597 if let Some(event) = parse_order_update_msg(
1598 &msg,
1599 &instrument,
1600 self.account_id,
1601 ) {
1602 return Some(NautilusWsMessage::OrderUpdated(event));
1603 } else {
1604 tracing::warn!(
1605 order_id = %msg.order_id,
1606 price = ?msg.price,
1607 "Skipped order update message (insufficient data)"
1608 );
1609 }
1610 }
1611 }
1612 }
1613
1614 if reports.is_empty() {
1615 continue;
1616 }
1617
1618 NautilusWsMessage::OrderStatusReports(reports)
1619 }
1620 BitmexTableMessage::Execution { data, .. } => {
1621 let mut fills = Vec::with_capacity(data.len());
1622
1623 for exec_msg in data {
1624 let symbol_opt = if let Some(sym) = &exec_msg.symbol {
1626 Some(*sym)
1627 } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
1628 let client_order_id = ClientOrderId::new(cl_ord_id);
1630 self.order_symbol_cache
1631 .get(&client_order_id)
1632 .map(|r| *r.value())
1633 } else {
1634 None
1635 };
1636
1637 let Some(symbol) = symbol_opt else {
1638 if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
1640 if exec_msg.exec_type == Some(BitmexExecType::Trade) {
1641 tracing::warn!(
1642 cl_ord_id = %cl_ord_id,
1643 exec_id = ?exec_msg.exec_id,
1644 ord_rej_reason = ?exec_msg.ord_rej_reason,
1645 text = ?exec_msg.text,
1646 "Execution message missing symbol and not found in cache"
1647 );
1648 } else {
1649 tracing::debug!(
1650 cl_ord_id = %cl_ord_id,
1651 exec_id = ?exec_msg.exec_id,
1652 exec_type = ?exec_msg.exec_type,
1653 ord_rej_reason = ?exec_msg.ord_rej_reason,
1654 text = ?exec_msg.text,
1655 "Execution message missing symbol and not found in cache"
1656 );
1657 }
1658 } else {
1659 if exec_msg.exec_type == Some(BitmexExecType::CancelReject)
1663 {
1664 tracing::debug!(
1665 exec_id = ?exec_msg.exec_id,
1666 order_id = ?exec_msg.order_id,
1667 "CancelReject message missing symbol/clOrdID (expected with redundant cancels)"
1668 );
1669 } else {
1670 tracing::warn!(
1671 exec_id = ?exec_msg.exec_id,
1672 order_id = ?exec_msg.order_id,
1673 exec_type = ?exec_msg.exec_type,
1674 ord_rej_reason = ?exec_msg.ord_rej_reason,
1675 text = ?exec_msg.text,
1676 "Execution message missing both symbol and clOrdID, cannot process"
1677 );
1678 }
1679 }
1680 continue;
1681 };
1682
1683 let Some(instrument) = self.get_instrument(&symbol) else {
1684 tracing::error!(
1685 "Instrument cache miss: execution message dropped for symbol={}, exec_id={:?}, exec_type={:?}, Liquidation/ADL fills may be lost",
1686 symbol,
1687 exec_msg.exec_id,
1688 exec_msg.exec_type
1689 );
1690 continue;
1691 };
1692
1693 if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
1694 fills.push(fill);
1695 }
1696 }
1697
1698 if fills.is_empty() {
1699 continue;
1700 }
1701 NautilusWsMessage::FillReports(fills)
1702 }
1703 BitmexTableMessage::Position { data, .. } => {
1704 if let Some(pos_msg) = data.into_iter().next() {
1705 let Some(instrument) = self.get_instrument(&pos_msg.symbol) else {
1706 tracing::error!(
1707 "Instrument cache miss: position message dropped for symbol={}, account={}",
1708 pos_msg.symbol,
1709 pos_msg.account
1710 );
1711 continue;
1712 };
1713 let report = parse_position_msg(pos_msg, &instrument);
1714 NautilusWsMessage::PositionStatusReport(report)
1715 } else {
1716 continue;
1717 }
1718 }
1719 BitmexTableMessage::Wallet { data, .. } => {
1720 if let Some(wallet_msg) = data.into_iter().next() {
1721 let account_state = parse_wallet_msg(wallet_msg, ts_init);
1722 NautilusWsMessage::AccountState(account_state)
1723 } else {
1724 continue;
1725 }
1726 }
1727 BitmexTableMessage::Margin { .. } => {
1728 continue;
1731 }
1732 BitmexTableMessage::Instrument { data, .. } => {
1733 let ts_init = clock.get_time_ns();
1734 let mut data_msgs = Vec::with_capacity(data.len());
1735
1736 for msg in data {
1737 let parsed =
1738 parse_instrument_msg(msg, &self.instruments_cache, ts_init);
1739 data_msgs.extend(parsed);
1740 }
1741
1742 if data_msgs.is_empty() {
1743 continue;
1744 }
1745 NautilusWsMessage::Data(data_msgs)
1746 }
1747 BitmexTableMessage::Funding { data, .. } => {
1748 let ts_init = clock.get_time_ns();
1749 let mut funding_updates = Vec::with_capacity(data.len());
1750
1751 for msg in data {
1752 if let Some(parsed) = parse_funding_msg(msg, ts_init) {
1753 funding_updates.push(parsed);
1754 }
1755 }
1756
1757 if !funding_updates.is_empty() {
1758 NautilusWsMessage::FundingRateUpdates(funding_updates)
1759 } else {
1760 continue;
1761 }
1762 }
1763 _ => {
1764 tracing::warn!("Unhandled table message type: {table_msg:?}");
1766 continue;
1767 }
1768 });
1769 }
1770 BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
1771 }
1772 }
1773
1774 None
1775 }
1776
1777 fn handle_subscription_message(
1778 &self,
1779 success: bool,
1780 subscribe: Option<&String>,
1781 request: Option<&BitmexHttpRequest>,
1782 error: Option<&str>,
1783 ) {
1784 if let Some(req) = request {
1785 if req
1786 .op
1787 .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
1788 {
1789 if success {
1790 tracing::info!("Authenticated BitMEX WebSocket session");
1791 self.auth_tracker.succeed();
1792 } else {
1793 let reason = error.unwrap_or("Authentication rejected").to_string();
1794 tracing::error!(error = %reason, "Authentication failed");
1795 self.auth_tracker.fail(reason);
1796 }
1797 return;
1798 }
1799
1800 if req
1801 .op
1802 .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
1803 {
1804 self.handle_subscription_ack(success, request, subscribe, error);
1805 return;
1806 }
1807
1808 if req
1809 .op
1810 .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
1811 {
1812 self.handle_unsubscribe_ack(success, request, subscribe, error);
1813 return;
1814 }
1815 }
1816
1817 if subscribe.is_some() {
1818 self.handle_subscription_ack(success, request, subscribe, error);
1819 return;
1820 }
1821
1822 if let Some(error) = error {
1823 tracing::warn!(
1824 success = success,
1825 error = error,
1826 "Unhandled subscription control message"
1827 );
1828 }
1829 }
1830
1831 fn handle_subscription_ack(
1832 &self,
1833 success: bool,
1834 request: Option<&BitmexHttpRequest>,
1835 subscribe: Option<&String>,
1836 error: Option<&str>,
1837 ) {
1838 let topics = Self::topics_from_request(request, subscribe);
1839
1840 if topics.is_empty() {
1841 tracing::debug!("Subscription acknowledgement without topics");
1842 return;
1843 }
1844
1845 for topic in topics {
1846 if success {
1847 self.subscriptions.confirm(topic);
1848 tracing::debug!(topic = topic, "Subscription confirmed");
1849 } else {
1850 self.subscriptions.mark_failure(topic);
1851 let reason = error.unwrap_or("Subscription rejected");
1852 tracing::error!(topic = topic, error = reason, "Subscription failed");
1853 }
1854 }
1855 }
1856
1857 fn handle_unsubscribe_ack(
1858 &self,
1859 success: bool,
1860 request: Option<&BitmexHttpRequest>,
1861 subscribe: Option<&String>,
1862 error: Option<&str>,
1863 ) {
1864 let topics = Self::topics_from_request(request, subscribe);
1865
1866 if topics.is_empty() {
1867 tracing::debug!("Unsubscription acknowledgement without topics");
1868 return;
1869 }
1870
1871 for topic in topics {
1872 if success {
1873 tracing::debug!(topic = topic, "Unsubscription confirmed");
1874 self.subscriptions.clear_pending(topic);
1875 } else {
1876 let reason = error.unwrap_or("Unsubscription rejected");
1877 tracing::error!(topic = topic, error = reason, "Unsubscription failed");
1878 self.subscriptions.confirm(topic);
1879 }
1880 }
1881 }
1882
1883 fn topics_from_request<'a>(
1884 request: Option<&'a BitmexHttpRequest>,
1885 fallback: Option<&'a String>,
1886 ) -> Vec<&'a str> {
1887 if let Some(req) = request
1888 && !req.args.is_empty()
1889 {
1890 return req.args.iter().filter_map(|arg| arg.as_str()).collect();
1891 }
1892
1893 fallback.into_iter().map(|topic| topic.as_str()).collect()
1894 }
1895}
1896
1897fn is_terminal_order_status(status: OrderStatus) -> bool {
1898 matches!(
1899 status,
1900 OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
1901 )
1902}
1903
1904#[cfg(test)]
1909mod tests {
1910 use ahash::AHashSet;
1911 use rstest::rstest;
1912 use ustr::Ustr;
1913
1914 use super::*;
1915
1916 #[test]
1917 fn test_is_heartbeat_message_detection() {
1918 assert!(BitmexFeedHandler::is_heartbeat_message("{\"op\":\"ping\"}"));
1919 assert!(BitmexFeedHandler::is_heartbeat_message("{\"op\":\"pong\"}"));
1920 assert!(!BitmexFeedHandler::is_heartbeat_message(
1921 "{\"op\":\"subscribe\",\"args\":[\"trade:XBTUSD\"]}"
1922 ));
1923 }
1924
1925 #[rstest]
1926 fn test_reconnect_topics_restoration_logic() {
1927 let client = BitmexWebSocketClient::new(
1929 Some("ws://test.com".to_string()),
1930 Some("test_key".to_string()),
1931 Some("test_secret".to_string()),
1932 Some(AccountId::new("BITMEX-TEST")),
1933 None,
1934 )
1935 .unwrap();
1936
1937 let subs = client.subscriptions.confirmed();
1939 subs.insert(BitmexWsTopic::Trade.as_ref().to_string(), {
1940 let mut set = AHashSet::new();
1941 set.insert(Ustr::from("XBTUSD"));
1942 set.insert(Ustr::from("ETHUSD"));
1943 set
1944 });
1945
1946 subs.insert(BitmexWsTopic::OrderBookL2.as_ref().to_string(), {
1947 let mut set = AHashSet::new();
1948 set.insert(Ustr::from("XBTUSD"));
1949 set
1950 });
1951
1952 subs.insert(
1954 BitmexWsAuthChannel::Order.as_ref().to_string(),
1955 AHashSet::new(),
1956 );
1957 subs.insert(
1958 BitmexWsAuthChannel::Position.as_ref().to_string(),
1959 AHashSet::new(),
1960 );
1961
1962 let mut topics_to_restore = Vec::new();
1964 for entry in subs.iter() {
1965 let (channel, symbols) = entry.pair();
1966 if symbols.is_empty() {
1967 topics_to_restore.push(channel.clone());
1968 } else {
1969 for symbol in symbols.iter() {
1970 topics_to_restore.push(format!("{channel}:{symbol}"));
1971 }
1972 }
1973 }
1974
1975 assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1977 assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1978 assert!(
1979 topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1980 );
1981 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1982 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1983 assert_eq!(topics_to_restore.len(), 5);
1984 }
1985
1986 #[rstest]
1987 fn test_reconnect_auth_message_building() {
1988 let client_with_creds = BitmexWebSocketClient::new(
1990 Some("ws://test.com".to_string()),
1991 Some("test_key".to_string()),
1992 Some("test_secret".to_string()),
1993 Some(AccountId::new("BITMEX-TEST")),
1994 None,
1995 )
1996 .unwrap();
1997
1998 if let Some(cred) = &client_with_creds.credential {
2000 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
2001 let signature = cred.sign("GET", "/realtime", expires, "");
2002
2003 let auth_message = BitmexAuthentication {
2004 op: BitmexWsAuthAction::AuthKeyExpires,
2005 args: (cred.api_key.to_string(), expires, signature),
2006 };
2007
2008 assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
2010 assert_eq!(auth_message.args.0, "test_key");
2011 assert!(auth_message.args.1 > 0); assert!(!auth_message.args.2.is_empty()); } else {
2014 panic!("Client should have credentials");
2015 }
2016
2017 let client_no_creds = BitmexWebSocketClient::new(
2019 Some("ws://test.com".to_string()),
2020 None,
2021 None,
2022 Some(AccountId::new("BITMEX-TEST")),
2023 None,
2024 )
2025 .unwrap();
2026
2027 assert!(client_no_creds.credential.is_none());
2028 }
2029
2030 #[rstest]
2031 fn test_subscription_state_after_unsubscribe() {
2032 let client = BitmexWebSocketClient::new(
2033 Some("ws://test.com".to_string()),
2034 Some("test_key".to_string()),
2035 Some("test_secret".to_string()),
2036 Some(AccountId::new("BITMEX-TEST")),
2037 None,
2038 )
2039 .unwrap();
2040
2041 let subs = client.subscriptions.confirmed();
2043 subs.insert(BitmexWsTopic::Trade.as_ref().to_string(), {
2044 let mut set = AHashSet::new();
2045 set.insert(Ustr::from("XBTUSD"));
2046 set.insert(Ustr::from("ETHUSD"));
2047 set
2048 });
2049
2050 subs.insert(BitmexWsTopic::OrderBookL2.as_ref().to_string(), {
2051 let mut set = AHashSet::new();
2052 set.insert(Ustr::from("XBTUSD"));
2053 set
2054 });
2055
2056 let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
2058 if let Some((channel, symbol)) = topic.split_once(':')
2059 && let Some(mut entry) = subs.get_mut(channel)
2060 {
2061 entry.remove(&Ustr::from(symbol));
2062 if entry.is_empty() {
2063 drop(entry);
2064 subs.remove(channel);
2065 }
2066 }
2067
2068 let mut topics_to_restore = Vec::new();
2070 for entry in subs.iter() {
2071 let (channel, symbols) = entry.pair();
2072 if symbols.is_empty() {
2073 topics_to_restore.push(channel.clone());
2074 } else {
2075 for symbol in symbols.iter() {
2076 topics_to_restore.push(format!("{channel}:{symbol}"));
2077 }
2078 }
2079 }
2080
2081 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
2083 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
2084 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
2085
2086 assert!(topics_to_restore.contains(&trade_xbt));
2087 assert!(!topics_to_restore.contains(&trade_eth));
2088 assert!(topics_to_restore.contains(&book_xbt));
2089 assert_eq!(topics_to_restore.len(), 2);
2090 }
2091}