1use std::{
24 collections::HashSet,
25 sync::{
26 Arc,
27 atomic::{AtomicBool, Ordering},
28 },
29};
30
31use ahash::{AHashMap, AHashSet};
32use dashmap::DashMap;
33use futures_util::Stream;
34use nautilus_common::runtime::get_runtime;
35use nautilus_core::{
36 consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
37};
38use nautilus_model::{
39 data::{Data, bar::BarType},
40 enums::{OrderStatus, OrderType},
41 identifiers::{AccountId, ClientOrderId, InstrumentId},
42 instruments::{Instrument, InstrumentAny},
43};
44use nautilus_network::{
45 RECONNECTED,
46 websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
47};
48use reqwest::header::USER_AGENT;
49use tokio::{sync::RwLock, time::Duration};
50use tokio_tungstenite::tungstenite::Message;
51use ustr::Ustr;
52
53use super::{
54 cache::QuoteCache,
55 enums::{
56 BitmexAction, BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic,
57 },
58 error::BitmexWsError,
59 messages::{
60 BitmexAuthentication, BitmexHttpRequest, BitmexSubscription, BitmexTableMessage,
61 BitmexWsMessage, NautilusWsMessage, OrderData,
62 },
63 parse::{
64 is_index_symbol, parse_book_msg_vec, parse_book10_msg_vec, parse_order_update_msg,
65 parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg, topic_from_bar_spec,
66 },
67};
68use crate::{
69 common::{consts::BITMEX_WS_URL, credential::Credential, enums::BitmexExecType},
70 websocket::{
71 auth::{AUTHENTICATION_TIMEOUT_SECS, AuthResultReceiver, AuthTracker},
72 parse::{
73 parse_execution_msg, parse_funding_msg, parse_instrument_msg, parse_order_msg,
74 parse_position_msg,
75 },
76 subscription::SubscriptionState,
77 },
78};
79
/// WebSocket client for the BitMEX realtime API.
///
/// The connection (`inner`), the stop `signal` and the order caches are held behind
/// `Arc`s, so clones of this struct refer to the same underlying objects.
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct BitmexWebSocketClient {
    /// WebSocket endpoint URL used when connecting.
    url: String,
    /// API credential; `None` means the client never authenticates.
    credential: Option<Credential>,
    /// Heartbeat setting forwarded to the underlying WebSocket configuration
    /// (NOTE(review): unit is not visible here, presumably seconds; confirm).
    heartbeat: Option<u64>,
    /// Underlying WebSocket client; `None` until a connection has been established.
    inner: Arc<RwLock<Option<WebSocketClient>>>,
    /// Receiving end of the parsed-message channel; handed out once by `stream`.
    rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Stop flag set by `close` and polled by the message handler.
    signal: Arc<AtomicBool>,
    /// Handle of the background message-processing task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Account identifier handed to the message handler.
    account_id: AccountId,
    /// Tracks the outcome of in-flight authentication requests.
    auth_tracker: AuthTracker,
    /// Bookkeeping of pending and confirmed topic subscriptions.
    subscriptions: SubscriptionState,
    /// Instruments keyed by symbol, used when parsing incoming messages.
    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
    /// Order types keyed by client order ID, shared with the message handler.
    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Symbols keyed by client order ID, shared with the message handler.
    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
}
107
108impl BitmexWebSocketClient {
109 pub fn new(
115 url: Option<String>,
116 api_key: Option<String>,
117 api_secret: Option<String>,
118 account_id: Option<AccountId>,
119 heartbeat: Option<u64>,
120 ) -> anyhow::Result<Self> {
121 let credential = match (api_key, api_secret) {
122 (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
123 (None, None) => None,
124 _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
125 };
126
127 let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
128
129 Ok(Self {
130 url: url.unwrap_or(BITMEX_WS_URL.to_string()),
131 credential,
132 heartbeat,
133 inner: Arc::new(RwLock::new(None)),
134 rx: None,
135 signal: Arc::new(AtomicBool::new(false)),
136 task_handle: None,
137 account_id,
138 auth_tracker: AuthTracker::new(),
139 subscriptions: SubscriptionState::new(),
140 instruments_cache: Arc::new(AHashMap::new()),
141 order_type_cache: Arc::new(DashMap::new()),
142 order_symbol_cache: Arc::new(DashMap::new()),
143 })
144 }
145
146 pub fn from_env() -> anyhow::Result<Self> {
152 let url = get_env_var("BITMEX_WS_URL")?;
153 let api_key = get_env_var("BITMEX_API_KEY")?;
154 let api_secret = get_env_var("BITMEX_API_SECRET")?;
155
156 Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
157 }
158
159 #[must_use]
161 pub const fn url(&self) -> &str {
162 self.url.as_str()
163 }
164
165 #[must_use]
167 pub fn api_key(&self) -> Option<&str> {
168 self.credential.as_ref().map(|c| c.api_key.as_str())
169 }
170
171 #[must_use]
173 pub fn is_active(&self) -> bool {
174 match self.inner.try_read() {
175 Ok(guard) => match &*guard {
176 Some(inner) => inner.is_active(),
177 None => false,
178 },
179 Err(_) => false,
180 }
181 }
182
183 #[must_use]
185 pub fn is_closed(&self) -> bool {
186 match self.inner.try_read() {
187 Ok(guard) => match &*guard {
188 Some(inner) => inner.is_closed(),
189 None => true,
190 },
191 Err(_) => true,
192 }
193 }
194
195 pub fn set_account_id(&mut self, account_id: AccountId) {
197 self.account_id = account_id;
198 }
199
200 pub fn initialize_instruments_cache(&mut self, instruments: Vec<InstrumentAny>) {
202 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
203 for inst in instruments {
204 instruments_cache.insert(inst.symbol().inner(), inst.clone());
205 }
206
207 self.instruments_cache = Arc::new(instruments_cache);
208 }
209
210 pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
220 let reader = self.connect_inner().await?;
221
222 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
223 self.rx = Some(Arc::new(rx));
224 let signal = self.signal.clone();
225
226 let account_id = self.account_id;
227 let inner_client = self.inner.clone();
228 let credential = self.credential.clone();
229 let auth_tracker = self.auth_tracker.clone();
230 let subscriptions = self.subscriptions.clone();
231 let instruments_cache = self.instruments_cache.clone();
232 let order_type_cache = self.order_type_cache.clone();
233 let order_symbol_cache = self.order_symbol_cache.clone();
234
235 let stream_handle = get_runtime().spawn(async move {
236 let mut handler = BitmexWsMessageHandler::new(
237 reader,
238 signal,
239 tx,
240 account_id,
241 auth_tracker.clone(),
242 subscriptions.clone(),
243 instruments_cache,
244 order_type_cache,
245 order_symbol_cache,
246 );
247
248 loop {
250 match handler.next().await {
251 Some(NautilusWsMessage::Reconnected) => {
252 log::info!("Reconnecting WebSocket");
253
254 let has_client = {
255 let guard = inner_client.read().await;
256 guard.is_some()
257 };
258
259 if !has_client {
260 log::warn!("Reconnection signaled but WebSocket client unavailable");
261 continue;
262 }
263
264 let confirmed = subscriptions.confirmed();
265 let pending = subscriptions.pending();
266 let mut restore_set: HashSet<String> = HashSet::new();
267
268 let mut collect_topics = |map: &DashMap<String, AHashSet<Ustr>>| {
269 for entry in map.iter() {
270 let (channel, symbols) = entry.pair();
271
272 if channel == BitmexWsTopic::Instrument.as_ref() {
273 continue;
274 }
275
276 if symbols.is_empty() {
277 restore_set.insert(channel.clone());
278 } else {
279 for symbol in symbols.iter() {
280 restore_set.insert(format!("{channel}:{symbol}"));
281 }
282 }
283 }
284 };
285
286 collect_topics(&confirmed);
287 collect_topics(&pending);
288
289 let mut topics_to_restore: Vec<String> = restore_set.into_iter().collect();
290 topics_to_restore.sort();
291
292 let auth_rx_opt = if let Some(cred) = &credential {
293 match BitmexWebSocketClient::issue_authentication_request(
294 &inner_client,
295 cred,
296 &auth_tracker,
297 )
298 .await
299 {
300 Ok(rx) => Some(rx),
301 Err(e) => {
302 log::error!(
303 "Failed to send re-authentication request after reconnection: {e}"
304 );
305 continue;
306 }
307 }
308 } else {
309 None
310 };
311
312 let inner_for_task = inner_client.clone();
313 let state_for_task = subscriptions.clone();
314 let auth_tracker_for_task = auth_tracker.clone();
315 let auth_rx_for_task = auth_rx_opt;
316 let topics_to_restore_clone = topics_to_restore.clone();
317 get_runtime().spawn(async move {
318 if let Some(rx) = auth_rx_for_task {
319 if let Err(e) = auth_tracker_for_task
320 .wait_for_result(
321 Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS),
322 rx,
323 )
324 .await
325 {
326 log::error!("Authentication after reconnection failed: {e}");
327 return;
328 }
329 log::info!("Re-authenticated after reconnection");
330 }
331
332 let mut all_topics =
333 Vec::with_capacity(1 + topics_to_restore_clone.len());
334 all_topics.push(BitmexWsTopic::Instrument.as_ref().to_string());
335 all_topics.extend(topics_to_restore_clone.iter().cloned());
336
337 for topic in &all_topics {
338 state_for_task.mark_subscribe(topic.as_str());
339 }
340
341 if let Err(e) = BitmexWebSocketClient::send_topics(
342 &inner_for_task,
343 BitmexWsOperation::Subscribe,
344 all_topics.clone(),
345 )
346 .await
347 {
348 log::error!(
349 "Failed to restore subscriptions after reconnection: {e}"
350 );
351 } else {
353 log::info!(
354 "Restored {} subscriptions after reconnection",
355 all_topics.len()
356 );
357 }
358 });
359 }
360 Some(msg) => {
361 if let Err(e) = handler.tx.send(msg) {
362 tracing::error!("Error sending message: {e}");
363 break;
364 }
365 }
366 None => {
367 if handler.handler.signal.load(Ordering::Relaxed) {
369 tracing::debug!("Stop signal received, ending message processing");
370 break;
371 }
372 tracing::warn!("WebSocket stream ended unexpectedly");
374 break;
375 }
376 }
377 }
378 });
379
380 self.task_handle = Some(Arc::new(stream_handle));
381
382 if self.credential.is_some() {
383 self.authenticate().await?;
384 }
385
386 {
387 let inner_guard = self.inner.read().await;
388 if let Some(inner) = &*inner_guard {
389 self.subscriptions
390 .mark_subscribe(BitmexWsTopic::Instrument.as_ref());
391
392 let subscribe_msg = BitmexSubscription {
393 op: BitmexWsOperation::Subscribe,
394 args: vec![Ustr::from(BitmexWsTopic::Instrument.as_ref())],
395 };
396
397 match serde_json::to_string(&subscribe_msg) {
398 Ok(subscribe_json) => {
399 if let Err(e) = inner.send_text(subscribe_json, None).await {
400 log::error!("Failed to subscribe to instruments: {e}");
401 } else {
402 log::debug!("Subscribed to all instruments");
403 }
404 }
405 Err(e) => {
406 tracing::error!(error = %e, "Failed to serialize resubscribe message");
407 }
408 }
409 }
410 }
411
412 Ok(())
413 }
414
415 async fn connect_inner(
421 &mut self,
422 ) -> Result<tokio::sync::mpsc::UnboundedReceiver<Message>, BitmexWsError> {
423 let (message_handler, rx) = channel_message_handler();
424
425 let inner_for_ping = self.inner.clone();
426 let ping_handler: PingHandler = Arc::new(move |payload: Vec<u8>| {
427 let inner = inner_for_ping.clone();
428
429 get_runtime().spawn(async move {
430 let len = payload.len();
431 let guard = inner.read().await;
432
433 if let Some(client) = guard.as_ref() {
434 if let Err(err) = client.send_pong(payload).await {
435 tracing::warn!(error = %err, "Failed to send pong frame");
436 } else {
437 tracing::trace!("Sent pong frame ({len} bytes)");
438 }
439 } else {
440 tracing::debug!("Ping received with no active websocket client");
441 }
442 });
443 });
444
445 let config = WebSocketConfig {
446 url: self.url.clone(),
447 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
448 heartbeat: self.heartbeat,
449 heartbeat_msg: None,
450 message_handler: Some(message_handler),
451 ping_handler: Some(ping_handler),
452 reconnect_timeout_ms: Some(5_000),
453 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, };
458
459 let keyed_quotas = vec![];
460 let client = WebSocketClient::connect(
461 config,
462 None, keyed_quotas,
464 None, )
466 .await
467 .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
468
469 {
470 let mut inner_guard = self.inner.write().await;
471 *inner_guard = Some(client);
472 }
473
474 Ok(rx)
475 }
476
477 async fn issue_authentication_request(
478 inner: &Arc<RwLock<Option<WebSocketClient>>>,
479 credential: &Credential,
480 tracker: &AuthTracker,
481 ) -> Result<AuthResultReceiver, BitmexWsError> {
482 let receiver = tracker.begin();
483
484 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
485 let signature = credential.sign("GET", "/realtime", expires, "");
486
487 let auth_message = BitmexAuthentication {
488 op: BitmexWsAuthAction::AuthKeyExpires,
489 args: (credential.api_key.to_string(), expires, signature),
490 };
491
492 let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
493 let msg = format!("Failed to serialize auth message: {e}");
494 tracker.fail(msg.clone());
495 BitmexWsError::AuthenticationError(msg)
496 })?;
497
498 {
499 let inner_guard = inner.read().await;
500 let client = inner_guard.as_ref().ok_or_else(|| {
501 tracker.fail("Cannot authenticate: not connected");
502 BitmexWsError::AuthenticationError("Cannot authenticate: not connected".to_string())
503 })?;
504
505 client.send_text(auth_json, None).await.map_err(|e| {
506 let error = e.to_string();
507 tracker.fail(error.clone());
508 BitmexWsError::AuthenticationError(error)
509 })?;
510 }
511
512 Ok(receiver)
513 }
514
515 async fn authenticate(&self) -> Result<(), BitmexWsError> {
522 let credential = match &self.credential {
523 Some(credential) => credential,
524 None => {
525 return Err(BitmexWsError::AuthenticationError(
526 "API credentials not available to authenticate".to_string(),
527 ));
528 }
529 };
530
531 let rx =
532 Self::issue_authentication_request(&self.inner, credential, &self.auth_tracker).await?;
533 self.auth_tracker
534 .wait_for_result(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), rx)
535 .await
536 }
537
538 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
544 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
545
546 tokio::time::timeout(timeout, async {
547 while !self.is_active() {
548 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
549 }
550 })
551 .await
552 .map_err(|_| {
553 BitmexWsError::ClientError(format!(
554 "WebSocket connection timeout after {timeout_secs} seconds"
555 ))
556 })?;
557
558 Ok(())
559 }
560
561 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
569 let rx = self
570 .rx
571 .take()
572 .expect("Stream receiver already taken or not connected");
573 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
574 async_stream::stream! {
575 while let Some(msg) = rx.recv().await {
576 yield msg;
577 }
578 }
579 }
580
581 pub async fn close(&mut self) -> Result<(), BitmexWsError> {
591 log::debug!("Starting close process");
592
593 self.signal.store(true, Ordering::Relaxed);
594
595 {
596 let inner_guard = self.inner.read().await;
597 if let Some(inner) = &*inner_guard {
598 log::debug!("Disconnecting websocket");
599
600 match tokio::time::timeout(Duration::from_secs(3), inner.disconnect()).await {
601 Ok(()) => log::debug!("Websocket disconnected successfully"),
602 Err(_) => {
603 log::warn!(
604 "Timeout waiting for websocket disconnect, continuing with cleanup"
605 );
606 }
607 }
608 } else {
609 log::debug!("No active connection to disconnect");
610 }
611 }
612
613 if let Some(task_handle) = self.task_handle.take() {
615 match Arc::try_unwrap(task_handle) {
616 Ok(handle) => {
617 log::debug!("Waiting for task handle to complete");
618 match tokio::time::timeout(Duration::from_secs(2), handle).await {
619 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
620 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
621 Err(_) => {
622 log::warn!(
623 "Timeout waiting for task handle, task may still be running"
624 );
625 }
627 }
628 }
629 Err(arc_handle) => {
630 log::debug!(
631 "Cannot take ownership of task handle - other references exist, aborting task"
632 );
633 arc_handle.abort();
634 }
635 }
636 } else {
637 log::debug!("No task handle to await");
638 }
639
640 log::debug!("Closed");
641
642 Ok(())
643 }
644
645 async fn send_topics(
646 inner: &Arc<RwLock<Option<WebSocketClient>>>,
647 op: BitmexWsOperation,
648 topics: Vec<String>,
649 ) -> Result<(), BitmexWsError> {
650 if topics.is_empty() {
651 return Ok(());
652 }
653
654 let message = BitmexSubscription {
655 op,
656 args: topics
657 .iter()
658 .map(|topic| Ustr::from(topic.as_str()))
659 .collect(),
660 };
661
662 let op_name = message.op.as_ref().to_string();
663 let payload = serde_json::to_string(&message).map_err(|e| {
664 BitmexWsError::SubscriptionError(format!("Failed to serialize {op_name} message: {e}"))
665 })?;
666
667 let inner_guard = inner.read().await;
668 if let Some(client) = &*inner_guard {
669 client
670 .send_text(payload, None)
671 .await
672 .map_err(|e| BitmexWsError::SubscriptionError(e.to_string()))?;
673 } else {
674 log::error!("Cannot send {op_name} message: not connected");
675 }
676
677 Ok(())
678 }
679
680 pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
690 log::debug!("Subscribing to topics: {topics:?}");
691
692 for topic in &topics {
693 self.subscriptions.mark_subscribe(topic.as_str());
694 }
695
696 Self::send_topics(&self.inner, BitmexWsOperation::Subscribe, topics).await
697 }
698
699 async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
705 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
706
707 if self.signal.load(Ordering::Relaxed) {
708 log::debug!("Shutdown signal detected, skipping unsubscribe");
709 return Ok(());
710 }
711
712 for topic in &topics {
713 self.subscriptions.mark_unsubscribe(topic.as_str());
714 }
715
716 let result = Self::send_topics(&self.inner, BitmexWsOperation::Unsubscribe, topics).await;
717 if let Err(e) = result {
718 tracing::debug!(error = %e, "Failed to send unsubscribe message");
719 }
720 Ok(())
721 }
722
723 #[must_use]
725 pub fn subscription_count(&self) -> usize {
726 self.subscriptions.len()
727 }
728
729 pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
730 let symbol = instrument_id.symbol.inner();
731 let confirmed = self.subscriptions.confirmed();
732 let mut channels = Vec::with_capacity(confirmed.len());
733
734 for entry in confirmed.iter() {
735 let (channel, symbols) = entry.pair();
736 if symbols.contains(&symbol) {
737 channels.push(format!("{channel}:{symbol}"));
739 } else if symbols.is_empty()
740 && (channel == BitmexWsAuthChannel::Execution.as_ref()
741 || channel == BitmexWsAuthChannel::Order.as_ref())
742 {
743 channels.push(channel.clone());
745 }
746 }
747
748 channels
749 }
750
751 pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
757 log::debug!("Already subscribed to all instruments on connection, skipping");
759 Ok(())
760 }
761
762 pub async fn subscribe_instrument(
768 &self,
769 instrument_id: InstrumentId,
770 ) -> Result<(), BitmexWsError> {
771 log::debug!(
773 "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
774 );
775 Ok(())
776 }
777
778 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
784 let topic = BitmexWsTopic::OrderBookL2;
785 let symbol = instrument_id.symbol.as_str();
786 self.subscribe(vec![format!("{topic}:{symbol}")]).await
787 }
788
789 pub async fn subscribe_book_25(
795 &self,
796 instrument_id: InstrumentId,
797 ) -> Result<(), BitmexWsError> {
798 let topic = BitmexWsTopic::OrderBookL2_25;
799 let symbol = instrument_id.symbol.as_str();
800 self.subscribe(vec![format!("{topic}:{symbol}")]).await
801 }
802
803 pub async fn subscribe_book_depth10(
809 &self,
810 instrument_id: InstrumentId,
811 ) -> Result<(), BitmexWsError> {
812 let topic = BitmexWsTopic::OrderBook10;
813 let symbol = instrument_id.symbol.as_str();
814 self.subscribe(vec![format!("{topic}:{symbol}")]).await
815 }
816
817 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
825 let symbol = instrument_id.symbol.inner();
826
827 if is_index_symbol(&instrument_id.symbol.inner()) {
829 tracing::warn!("Ignoring quote subscription for index symbol: {symbol}");
830 return Ok(());
831 }
832
833 let topic = BitmexWsTopic::Quote;
834 self.subscribe(vec![format!("{topic}:{symbol}")]).await
835 }
836
837 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
845 let symbol = instrument_id.symbol.inner();
846
847 if is_index_symbol(&symbol) {
849 tracing::warn!("Ignoring trade subscription for index symbol: {symbol}");
850 return Ok(());
851 }
852
853 let topic = BitmexWsTopic::Trade;
854 self.subscribe(vec![format!("{topic}:{symbol}")]).await
855 }
856
857 pub async fn subscribe_mark_prices(
863 &self,
864 instrument_id: InstrumentId,
865 ) -> Result<(), BitmexWsError> {
866 self.subscribe_instrument(instrument_id).await
867 }
868
869 pub async fn subscribe_index_prices(
875 &self,
876 instrument_id: InstrumentId,
877 ) -> Result<(), BitmexWsError> {
878 self.subscribe_instrument(instrument_id).await
879 }
880
881 pub async fn subscribe_funding_rates(
887 &self,
888 instrument_id: InstrumentId,
889 ) -> Result<(), BitmexWsError> {
890 let topic = BitmexWsTopic::Funding;
891 let symbol = instrument_id.symbol.as_str();
892 self.subscribe(vec![format!("{topic}:{symbol}")]).await
893 }
894
895 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
901 let topic = topic_from_bar_spec(bar_type.spec());
902 let symbol = bar_type.instrument_id().symbol.to_string();
903 self.subscribe(vec![format!("{topic}:{symbol}")]).await
904 }
905
906 pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
912 log::debug!(
914 "Instruments subscription maintained for proper operation, skipping unsubscribe"
915 );
916 Ok(())
917 }
918
919 pub async fn unsubscribe_instrument(
925 &self,
926 instrument_id: InstrumentId,
927 ) -> Result<(), BitmexWsError> {
928 log::debug!(
930 "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
931 );
932 Ok(())
933 }
934
935 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
941 let topic = BitmexWsTopic::OrderBookL2;
942 let symbol = instrument_id.symbol.as_str();
943 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
944 }
945
946 pub async fn unsubscribe_book_25(
952 &self,
953 instrument_id: InstrumentId,
954 ) -> Result<(), BitmexWsError> {
955 let topic = BitmexWsTopic::OrderBookL2_25;
956 let symbol = instrument_id.symbol.as_str();
957 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
958 }
959
960 pub async fn unsubscribe_book_depth10(
966 &self,
967 instrument_id: InstrumentId,
968 ) -> Result<(), BitmexWsError> {
969 let topic = BitmexWsTopic::OrderBook10;
970 let symbol = instrument_id.symbol.as_str();
971 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
972 }
973
974 pub async fn unsubscribe_quotes(
980 &self,
981 instrument_id: InstrumentId,
982 ) -> Result<(), BitmexWsError> {
983 let symbol = instrument_id.symbol.inner();
984
985 if is_index_symbol(&symbol) {
987 return Ok(());
988 }
989
990 let topic = BitmexWsTopic::Quote;
991 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
992 }
993
994 pub async fn unsubscribe_trades(
1000 &self,
1001 instrument_id: InstrumentId,
1002 ) -> Result<(), BitmexWsError> {
1003 let symbol = instrument_id.symbol.inner();
1004
1005 if is_index_symbol(&symbol) {
1007 return Ok(());
1008 }
1009
1010 let topic = BitmexWsTopic::Trade;
1011 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1012 }
1013
1014 pub async fn unsubscribe_mark_prices(
1020 &self,
1021 instrument_id: InstrumentId,
1022 ) -> Result<(), BitmexWsError> {
1023 log::debug!(
1025 "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1026 );
1027 Ok(())
1028 }
1029
1030 pub async fn unsubscribe_index_prices(
1036 &self,
1037 instrument_id: InstrumentId,
1038 ) -> Result<(), BitmexWsError> {
1039 log::debug!(
1041 "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
1042 );
1043 Ok(())
1044 }
1045
1046 pub async fn unsubscribe_funding_rates(
1052 &self,
1053 instrument_id: InstrumentId,
1054 ) -> Result<(), BitmexWsError> {
1055 log::debug!(
1057 "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
1058 );
1059 Ok(())
1060 }
1061
1062 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1068 let topic = topic_from_bar_spec(bar_type.spec());
1069 let symbol = bar_type.instrument_id().symbol.to_string();
1070 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1071 }
1072
1073 pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1079 if self.credential.is_none() {
1080 return Err(BitmexWsError::MissingCredentials);
1081 }
1082 self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1083 .await
1084 }
1085
1086 pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1092 if self.credential.is_none() {
1093 return Err(BitmexWsError::MissingCredentials);
1094 }
1095 self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1096 .await
1097 }
1098
1099 pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1105 if self.credential.is_none() {
1106 return Err(BitmexWsError::MissingCredentials);
1107 }
1108 self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1109 .await
1110 }
1111
1112 pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1118 if self.credential.is_none() {
1119 return Err(BitmexWsError::MissingCredentials);
1120 }
1121 self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1122 .await
1123 }
1124
1125 pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1131 if self.credential.is_none() {
1132 return Err(BitmexWsError::MissingCredentials);
1133 }
1134 self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1135 .await
1136 }
1137
1138 pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1144 self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1145 .await
1146 }
1147
1148 pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1154 self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1155 .await
1156 }
1157
1158 pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1164 self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1165 .await
1166 }
1167
1168 pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1174 self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1175 .await
1176 }
1177
1178 pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1184 self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1185 .await
1186 }
1187}
1188
/// Low-level feed handler that turns raw WebSocket frames into [`BitmexWsMessage`]
/// values.
struct BitmexFeedHandler {
    /// Raw frames delivered by the underlying WebSocket client.
    receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Stop flag; once set, `next` returns `None`.
    signal: Arc<AtomicBool>,
}
1193
1194impl BitmexFeedHandler {
1195 pub fn new(
1197 receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1198 signal: Arc<AtomicBool>,
1199 ) -> Self {
1200 Self { receiver, signal }
1201 }
1202
1203 async fn next(&mut self) -> Option<BitmexWsMessage> {
1205 loop {
1206 tokio::select! {
1207 msg = self.receiver.recv() => match msg {
1208 Some(msg) => match msg {
1209 Message::Text(text) => {
1210 if text == RECONNECTED {
1211 tracing::info!("Received WebSocket reconnection signal");
1212 return Some(BitmexWsMessage::Reconnected);
1213 }
1214
1215 tracing::trace!("Raw websocket message: {text}");
1216
1217 if Self::is_heartbeat_message(&text) {
1218 tracing::trace!(
1219 "Ignoring heartbeat control message: {text}"
1220 );
1221 continue;
1222 }
1223
1224 match serde_json::from_str(&text) {
1225 Ok(msg) => match &msg {
1226 BitmexWsMessage::Welcome {
1227 version,
1228 heartbeat_enabled,
1229 limit,
1230 ..
1231 } => {
1232 tracing::info!(
1233 version = version,
1234 heartbeat = heartbeat_enabled,
1235 rate_limit = ?limit.remaining,
1236 "Welcome to the BitMEX Realtime API:",
1237 );
1238 }
1239 BitmexWsMessage::Subscription { .. } => return Some(msg),
1240 BitmexWsMessage::Error { status, error, .. } => {
1241 tracing::error!(
1242 status = status,
1243 error = error,
1244 "Received error from BitMEX"
1245 );
1246 }
1247 _ => return Some(msg),
1248 },
1249 Err(e) => {
1250 tracing::error!("Failed to parse WebSocket message: {e}: {text}");
1251 }
1252 }
1253 }
1254 Message::Binary(msg) => {
1255 tracing::debug!("Raw binary: {msg:?}");
1256 }
1257 Message::Close(_) => {
1258 tracing::debug!("Received close message, waiting for reconnection");
1259 continue;
1260 }
1261 msg => match msg {
1262 Message::Ping(data) => {
1263 tracing::trace!("Received ping frame with {} bytes", data.len());
1264 }
1265 Message::Pong(data) => {
1266 tracing::trace!("Received pong frame with {} bytes", data.len());
1267 }
1268 Message::Frame(frame) => {
1269 tracing::debug!("Received raw frame: {frame:?}");
1270 }
1271 _ => {
1272 tracing::warn!("Unexpected message type: {msg:?}");
1273 }
1274 },
1275 }
1276 None => {
1277 tracing::info!("WebSocket stream closed");
1278 return None;
1279 }
1280 },
1281 _ = tokio::time::sleep(Duration::from_millis(1)) => {
1282 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
1283 tracing::debug!("Stop signal received");
1284 return None;
1285 }
1286 }
1287 }
1288 }
1289 }
1290
/// Returns `true` when `text` looks like a JSON heartbeat control message.
///
/// After trimming surrounding whitespace, the text must start with `{`, be at most
/// 64 bytes long, and contain the exact fragment `"op":"ping"` or `"op":"pong"`
/// (no whitespace around the colon).
fn is_heartbeat_message(text: &str) -> bool {
    const MAX_HEARTBEAT_LEN: usize = 64;
    const MARKERS: [&str; 2] = ["\"op\":\"ping\"", "\"op\":\"pong\""];

    let candidate = text.trim();

    candidate.starts_with('{')
        && candidate.len() <= MAX_HEARTBEAT_LEN
        && MARKERS.iter().any(|marker| candidate.contains(marker))
}
1300}
1301
/// Converts [`BitmexWsMessage`] values from the feed handler into
/// [`NautilusWsMessage`] values.
struct BitmexWsMessageHandler {
    /// Source of raw BitMEX messages.
    handler: BitmexFeedHandler,
    /// Channel on which the owning task forwards parsed messages.
    tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    #[allow(
        dead_code,
        reason = "May be needed for future account-specific processing"
    )]
    /// Account identifier supplied by the client.
    account_id: AccountId,
    /// Tracker for in-flight authentication requests.
    auth_tracker: AuthTracker,
    /// Subscription bookkeeping shared with the client.
    subscriptions: SubscriptionState,
    /// Instruments keyed by symbol, used when parsing market data.
    instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
    /// Order types keyed by client order ID.
    order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
    /// Symbols keyed by client order ID.
    order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
    /// Quote state carried between quote messages; cleared on reconnect.
    quote_cache: QuoteCache,
}
1317
1318impl BitmexWsMessageHandler {
1319 #[allow(clippy::too_many_arguments)]
1321 pub fn new(
1322 receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1323 signal: Arc<AtomicBool>,
1324 tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
1325 account_id: AccountId,
1326 auth_tracker: AuthTracker,
1327 subscriptions: SubscriptionState,
1328 instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
1329 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
1330 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
1331 ) -> Self {
1332 let handler = BitmexFeedHandler::new(receiver, signal);
1333 Self {
1334 handler,
1335 tx,
1336 account_id,
1337 auth_tracker,
1338 subscriptions,
1339 instruments_cache,
1340 order_type_cache,
1341 order_symbol_cache,
1342 quote_cache: QuoteCache::new(),
1343 }
1344 }
1345
1346 #[inline]
1349 fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1350 self.instruments_cache.get(symbol).cloned()
1351 }
1352
1353 async fn next(&mut self) -> Option<NautilusWsMessage> {
1354 let clock = get_atomic_clock_realtime();
1355
1356 while let Some(msg) = self.handler.next().await {
1357 match msg {
1358 BitmexWsMessage::Reconnected => {
1359 self.quote_cache.clear();
1361 return Some(NautilusWsMessage::Reconnected);
1362 }
1363 BitmexWsMessage::Subscription {
1364 success,
1365 subscribe,
1366 request,
1367 error,
1368 } => {
1369 self.handle_subscription_message(
1370 success,
1371 subscribe.as_ref(),
1372 request.as_ref(),
1373 error.as_deref(),
1374 );
1375 continue;
1376 }
1377 BitmexWsMessage::Table(table_msg) => {
1378 let ts_init = clock.get_time_ns();
1379
1380 return Some(match table_msg {
1381 BitmexTableMessage::OrderBookL2 { action, data } => {
1382 if data.is_empty() {
1383 continue;
1384 }
1385 let data = parse_book_msg_vec(
1386 data,
1387 action,
1388 self.instruments_cache.as_ref(),
1389 ts_init,
1390 );
1391
1392 NautilusWsMessage::Data(data)
1393 }
1394 BitmexTableMessage::OrderBookL2_25 { action, data } => {
1395 if data.is_empty() {
1396 continue;
1397 }
1398 let data = parse_book_msg_vec(
1399 data,
1400 action,
1401 self.instruments_cache.as_ref(),
1402 ts_init,
1403 );
1404
1405 NautilusWsMessage::Data(data)
1406 }
1407 BitmexTableMessage::OrderBook10 { data, .. } => {
1408 if data.is_empty() {
1409 continue;
1410 }
1411 let data = parse_book10_msg_vec(
1412 data,
1413 self.instruments_cache.as_ref(),
1414 ts_init,
1415 );
1416
1417 NautilusWsMessage::Data(data)
1418 }
1419 BitmexTableMessage::Quote { mut data, .. } => {
1420 if data.is_empty() {
1422 continue;
1423 }
1424
1425 let msg = data.remove(0);
1426 let Some(instrument) = self.get_instrument(&msg.symbol) else {
1427 tracing::error!(
1428 symbol = %msg.symbol,
1429 "Instrument not found for quote message"
1430 );
1431 continue;
1432 };
1433
1434 if let Some(quote) =
1435 self.quote_cache.process(&msg, &instrument, ts_init)
1436 {
1437 NautilusWsMessage::Data(vec![Data::Quote(quote)])
1438 } else {
1439 continue;
1440 }
1441 }
1442 BitmexTableMessage::Trade { data, .. } => {
1443 if data.is_empty() {
1444 continue;
1445 }
1446 let data =
1447 parse_trade_msg_vec(data, self.instruments_cache.as_ref(), ts_init);
1448
1449 NautilusWsMessage::Data(data)
1450 }
1451 BitmexTableMessage::TradeBin1m { action, data } => {
1452 if action == BitmexAction::Partial || data.is_empty() {
1453 continue;
1454 }
1455 let data = parse_trade_bin_msg_vec(
1456 data,
1457 BitmexWsTopic::TradeBin1m,
1458 self.instruments_cache.as_ref(),
1459 ts_init,
1460 );
1461
1462 NautilusWsMessage::Data(data)
1463 }
1464 BitmexTableMessage::TradeBin5m { action, data } => {
1465 if action == BitmexAction::Partial || data.is_empty() {
1466 continue;
1467 }
1468 let data = parse_trade_bin_msg_vec(
1469 data,
1470 BitmexWsTopic::TradeBin5m,
1471 self.instruments_cache.as_ref(),
1472 ts_init,
1473 );
1474
1475 NautilusWsMessage::Data(data)
1476 }
1477 BitmexTableMessage::TradeBin1h { action, data } => {
1478 if action == BitmexAction::Partial || data.is_empty() {
1479 continue;
1480 }
1481 let data = parse_trade_bin_msg_vec(
1482 data,
1483 BitmexWsTopic::TradeBin1h,
1484 self.instruments_cache.as_ref(),
1485 ts_init,
1486 );
1487
1488 NautilusWsMessage::Data(data)
1489 }
1490 BitmexTableMessage::TradeBin1d { action, data } => {
1491 if action == BitmexAction::Partial || data.is_empty() {
1492 continue;
1493 }
1494 let data = parse_trade_bin_msg_vec(
1495 data,
1496 BitmexWsTopic::TradeBin1d,
1497 self.instruments_cache.as_ref(),
1498 ts_init,
1499 );
1500
1501 NautilusWsMessage::Data(data)
1502 }
1503 BitmexTableMessage::Order { data, .. } => {
1507 let mut reports = Vec::with_capacity(data.len());
1509
1510 for order_data in data {
1511 match order_data {
1512 OrderData::Full(order_msg) => {
1513 let Some(instrument) =
1514 self.get_instrument(&order_msg.symbol)
1515 else {
1516 tracing::error!(
1517 symbol = %order_msg.symbol,
1518 "Instrument not found for order message"
1519 );
1520 continue;
1521 };
1522
1523 match parse_order_msg(
1524 &order_msg,
1525 &instrument,
1526 &self.order_type_cache,
1527 ) {
1528 Ok(report) => {
1529 if let Some(client_order_id) = &order_msg.cl_ord_id
1531 {
1532 let client_order_id =
1533 ClientOrderId::new(client_order_id);
1534
1535 if let Some(ord_type) = &order_msg.ord_type {
1536 let order_type: OrderType =
1537 (*ord_type).into();
1538 self.order_type_cache
1539 .insert(client_order_id, order_type);
1540 }
1541
1542 self.order_symbol_cache
1544 .insert(client_order_id, order_msg.symbol);
1545 }
1546
1547 if is_terminal_order_status(report.order_status)
1548 && let Some(client_id) = report.client_order_id
1549 {
1550 self.order_type_cache.remove(&client_id);
1551 self.order_symbol_cache.remove(&client_id);
1552 }
1553
1554 reports.push(report);
1555 }
1556 Err(e) => {
1557 tracing::error!(
1558 error = %e,
1559 symbol = %order_msg.symbol,
1560 order_id = %order_msg.order_id,
1561 time_in_force = ?order_msg.time_in_force,
1562 "Failed to parse full order message - potential data loss"
1563 );
1564 continue;
1566 }
1567 }
1568 }
1569 OrderData::Update(msg) => {
1570 let Some(instrument) = self.get_instrument(&msg.symbol)
1571 else {
1572 tracing::error!(
1573 symbol = %msg.symbol,
1574 "Instrument not found for order update"
1575 );
1576 continue;
1577 };
1578
1579 if let Some(cl_ord_id) = &msg.cl_ord_id {
1581 let client_order_id = ClientOrderId::new(cl_ord_id);
1582 self.order_symbol_cache
1583 .insert(client_order_id, msg.symbol);
1584 }
1585
1586 if let Some(event) = parse_order_update_msg(
1587 &msg,
1588 &instrument,
1589 self.account_id,
1590 ) {
1591 return Some(NautilusWsMessage::OrderUpdated(event));
1592 } else {
1593 tracing::warn!(
1594 order_id = %msg.order_id,
1595 price = ?msg.price,
1596 "Skipped order update message (insufficient data)"
1597 );
1598 }
1599 }
1600 }
1601 }
1602
1603 if reports.is_empty() {
1604 continue;
1605 }
1606
1607 NautilusWsMessage::OrderStatusReports(reports)
1608 }
1609 BitmexTableMessage::Execution { data, .. } => {
1610 let mut fills = Vec::with_capacity(data.len());
1611
1612 for exec_msg in data {
1613 let symbol_opt = if let Some(sym) = &exec_msg.symbol {
1615 Some(*sym)
1616 } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
1617 let client_order_id = ClientOrderId::new(cl_ord_id);
1619 self.order_symbol_cache
1620 .get(&client_order_id)
1621 .map(|r| *r.value())
1622 } else {
1623 None
1624 };
1625
1626 let Some(symbol) = symbol_opt else {
1627 if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
1629 if exec_msg.exec_type == Some(BitmexExecType::Trade) {
1630 tracing::warn!(
1631 cl_ord_id = %cl_ord_id,
1632 exec_id = ?exec_msg.exec_id,
1633 ord_rej_reason = ?exec_msg.ord_rej_reason,
1634 text = ?exec_msg.text,
1635 "Execution message missing symbol and not found in cache"
1636 );
1637 } else {
1638 tracing::debug!(
1639 cl_ord_id = %cl_ord_id,
1640 exec_id = ?exec_msg.exec_id,
1641 exec_type = ?exec_msg.exec_type,
1642 ord_rej_reason = ?exec_msg.ord_rej_reason,
1643 text = ?exec_msg.text,
1644 "Execution message missing symbol and not found in cache"
1645 );
1646 }
1647 } else {
1648 if exec_msg.exec_type == Some(BitmexExecType::CancelReject)
1652 {
1653 tracing::debug!(
1654 exec_id = ?exec_msg.exec_id,
1655 order_id = ?exec_msg.order_id,
1656 "CancelReject message missing symbol/clOrdID (expected with redundant cancels)"
1657 );
1658 } else {
1659 tracing::warn!(
1660 exec_id = ?exec_msg.exec_id,
1661 order_id = ?exec_msg.order_id,
1662 exec_type = ?exec_msg.exec_type,
1663 ord_rej_reason = ?exec_msg.ord_rej_reason,
1664 text = ?exec_msg.text,
1665 "Execution message missing both symbol and clOrdID, cannot process"
1666 );
1667 }
1668 }
1669 continue;
1670 };
1671
1672 let Some(instrument) = self.get_instrument(&symbol) else {
1673 tracing::error!(
1674 symbol = %symbol,
1675 exec_id = ?exec_msg.exec_id,
1676 "Instrument not found for execution message"
1677 );
1678 continue;
1679 };
1680
1681 if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
1682 fills.push(fill);
1683 }
1684 }
1685
1686 if fills.is_empty() {
1687 continue;
1688 }
1689 NautilusWsMessage::FillReports(fills)
1690 }
1691 BitmexTableMessage::Position { data, .. } => {
1692 if let Some(pos_msg) = data.into_iter().next() {
1693 let Some(instrument) = self.get_instrument(&pos_msg.symbol) else {
1694 tracing::error!(
1695 symbol = %pos_msg.symbol,
1696 "Instrument not found for position message"
1697 );
1698 continue;
1699 };
1700 let report = parse_position_msg(pos_msg, &instrument);
1701 NautilusWsMessage::PositionStatusReport(report)
1702 } else {
1703 continue;
1704 }
1705 }
1706 BitmexTableMessage::Wallet { data, .. } => {
1707 if let Some(wallet_msg) = data.into_iter().next() {
1708 let account_state = parse_wallet_msg(wallet_msg, ts_init);
1709 NautilusWsMessage::AccountState(account_state)
1710 } else {
1711 continue;
1712 }
1713 }
1714 BitmexTableMessage::Margin { .. } => {
1715 continue;
1718 }
1719 BitmexTableMessage::Instrument { data, .. } => {
1720 let ts_init = clock.get_time_ns();
1721 let mut data_msgs = Vec::with_capacity(data.len());
1722
1723 for msg in data {
1724 let parsed =
1725 parse_instrument_msg(msg, &self.instruments_cache, ts_init);
1726 data_msgs.extend(parsed);
1727 }
1728
1729 if data_msgs.is_empty() {
1730 continue;
1731 }
1732 NautilusWsMessage::Data(data_msgs)
1733 }
1734 BitmexTableMessage::Funding { data, .. } => {
1735 let ts_init = clock.get_time_ns();
1736 let mut funding_updates = Vec::with_capacity(data.len());
1737
1738 for msg in data {
1739 if let Some(parsed) = parse_funding_msg(msg, ts_init) {
1740 funding_updates.push(parsed);
1741 }
1742 }
1743
1744 if !funding_updates.is_empty() {
1745 NautilusWsMessage::FundingRateUpdates(funding_updates)
1746 } else {
1747 continue;
1748 }
1749 }
1750 _ => {
1751 tracing::warn!("Unhandled table message type: {table_msg:?}");
1753 continue;
1754 }
1755 });
1756 }
1757 BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
1758 }
1759 }
1760
1761 None
1762 }
1763
1764 fn handle_subscription_message(
1765 &self,
1766 success: bool,
1767 subscribe: Option<&String>,
1768 request: Option<&BitmexHttpRequest>,
1769 error: Option<&str>,
1770 ) {
1771 if let Some(req) = request {
1772 if req
1773 .op
1774 .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
1775 {
1776 if success {
1777 tracing::info!("Authenticated BitMEX WebSocket session");
1778 self.auth_tracker.succeed();
1779 } else {
1780 let reason = error.unwrap_or("Authentication rejected").to_string();
1781 tracing::error!(error = %reason, "Authentication failed");
1782 self.auth_tracker.fail(reason);
1783 }
1784 return;
1785 }
1786
1787 if req
1788 .op
1789 .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
1790 {
1791 self.handle_subscription_ack(success, request, subscribe, error);
1792 return;
1793 }
1794
1795 if req
1796 .op
1797 .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
1798 {
1799 self.handle_unsubscribe_ack(success, request, subscribe, error);
1800 return;
1801 }
1802 }
1803
1804 if subscribe.is_some() {
1805 self.handle_subscription_ack(success, request, subscribe, error);
1806 return;
1807 }
1808
1809 if let Some(error) = error {
1810 tracing::warn!(
1811 success = success,
1812 error = error,
1813 "Unhandled subscription control message"
1814 );
1815 }
1816 }
1817
1818 fn handle_subscription_ack(
1819 &self,
1820 success: bool,
1821 request: Option<&BitmexHttpRequest>,
1822 subscribe: Option<&String>,
1823 error: Option<&str>,
1824 ) {
1825 let topics = Self::topics_from_request(request, subscribe);
1826
1827 if topics.is_empty() {
1828 tracing::debug!("Subscription acknowledgement without topics");
1829 return;
1830 }
1831
1832 for topic in topics {
1833 if success {
1834 self.subscriptions.confirm(topic);
1835 tracing::debug!(topic = topic, "Subscription confirmed");
1836 } else {
1837 self.subscriptions.mark_failure(topic);
1838 let reason = error.unwrap_or("Subscription rejected");
1839 tracing::error!(topic = topic, error = reason, "Subscription failed");
1840 }
1841 }
1842 }
1843
1844 fn handle_unsubscribe_ack(
1845 &self,
1846 success: bool,
1847 request: Option<&BitmexHttpRequest>,
1848 subscribe: Option<&String>,
1849 error: Option<&str>,
1850 ) {
1851 let topics = Self::topics_from_request(request, subscribe);
1852
1853 if topics.is_empty() {
1854 tracing::debug!("Unsubscription acknowledgement without topics");
1855 return;
1856 }
1857
1858 for topic in topics {
1859 if success {
1860 tracing::debug!(topic = topic, "Unsubscription confirmed");
1861 self.subscriptions.clear_pending(topic);
1862 } else {
1863 let reason = error.unwrap_or("Unsubscription rejected");
1864 tracing::error!(topic = topic, error = reason, "Unsubscription failed");
1865 self.subscriptions.confirm(topic);
1866 }
1867 }
1868 }
1869
1870 fn topics_from_request<'a>(
1871 request: Option<&'a BitmexHttpRequest>,
1872 fallback: Option<&'a String>,
1873 ) -> Vec<&'a str> {
1874 if let Some(req) = request
1875 && !req.args.is_empty()
1876 {
1877 return req.args.iter().filter_map(|arg| arg.as_str()).collect();
1878 }
1879
1880 fallback.into_iter().map(|topic| topic.as_str()).collect()
1881 }
1882}
1883
1884fn is_terminal_order_status(status: OrderStatus) -> bool {
1885 matches!(
1886 status,
1887 OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
1888 )
1889}
1890
1891#[cfg(test)]
1896mod tests {
1897 use ahash::AHashSet;
1898 use rstest::rstest;
1899 use ustr::Ustr;
1900
1901 use super::*;
1902
1903 #[test]
1904 fn test_is_heartbeat_message_detection() {
1905 assert!(BitmexFeedHandler::is_heartbeat_message("{\"op\":\"ping\"}"));
1906 assert!(BitmexFeedHandler::is_heartbeat_message("{\"op\":\"pong\"}"));
1907 assert!(!BitmexFeedHandler::is_heartbeat_message(
1908 "{\"op\":\"subscribe\",\"args\":[\"trade:XBTUSD\"]}"
1909 ));
1910 }
1911
1912 #[rstest]
1913 fn test_reconnect_topics_restoration_logic() {
1914 let client = BitmexWebSocketClient::new(
1916 Some("ws://test.com".to_string()),
1917 Some("test_key".to_string()),
1918 Some("test_secret".to_string()),
1919 Some(AccountId::new("BITMEX-TEST")),
1920 None,
1921 )
1922 .unwrap();
1923
1924 let subs = client.subscriptions.confirmed();
1926 subs.insert(BitmexWsTopic::Trade.as_ref().to_string(), {
1927 let mut set = AHashSet::new();
1928 set.insert(Ustr::from("XBTUSD"));
1929 set.insert(Ustr::from("ETHUSD"));
1930 set
1931 });
1932
1933 subs.insert(BitmexWsTopic::OrderBookL2.as_ref().to_string(), {
1934 let mut set = AHashSet::new();
1935 set.insert(Ustr::from("XBTUSD"));
1936 set
1937 });
1938
1939 subs.insert(
1941 BitmexWsAuthChannel::Order.as_ref().to_string(),
1942 AHashSet::new(),
1943 );
1944 subs.insert(
1945 BitmexWsAuthChannel::Position.as_ref().to_string(),
1946 AHashSet::new(),
1947 );
1948
1949 let mut topics_to_restore = Vec::new();
1951 for entry in subs.iter() {
1952 let (channel, symbols) = entry.pair();
1953 if symbols.is_empty() {
1954 topics_to_restore.push(channel.clone());
1955 } else {
1956 for symbol in symbols.iter() {
1957 topics_to_restore.push(format!("{channel}:{symbol}"));
1958 }
1959 }
1960 }
1961
1962 assert!(topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref())));
1964 assert!(topics_to_restore.contains(&format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref())));
1965 assert!(
1966 topics_to_restore.contains(&format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref()))
1967 );
1968 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Order.as_ref().to_string()));
1969 assert!(topics_to_restore.contains(&BitmexWsAuthChannel::Position.as_ref().to_string()));
1970 assert_eq!(topics_to_restore.len(), 5);
1971 }
1972
1973 #[rstest]
1974 fn test_reconnect_auth_message_building() {
1975 let client_with_creds = BitmexWebSocketClient::new(
1977 Some("ws://test.com".to_string()),
1978 Some("test_key".to_string()),
1979 Some("test_secret".to_string()),
1980 Some(AccountId::new("BITMEX-TEST")),
1981 None,
1982 )
1983 .unwrap();
1984
1985 if let Some(cred) = &client_with_creds.credential {
1987 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1988 let signature = cred.sign("GET", "/realtime", expires, "");
1989
1990 let auth_message = BitmexAuthentication {
1991 op: BitmexWsAuthAction::AuthKeyExpires,
1992 args: (cred.api_key.to_string(), expires, signature),
1993 };
1994
1995 assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1997 assert_eq!(auth_message.args.0, "test_key");
1998 assert!(auth_message.args.1 > 0); assert!(!auth_message.args.2.is_empty()); } else {
2001 panic!("Client should have credentials");
2002 }
2003
2004 let client_no_creds = BitmexWebSocketClient::new(
2006 Some("ws://test.com".to_string()),
2007 None,
2008 None,
2009 Some(AccountId::new("BITMEX-TEST")),
2010 None,
2011 )
2012 .unwrap();
2013
2014 assert!(client_no_creds.credential.is_none());
2015 }
2016
2017 #[rstest]
2018 fn test_subscription_state_after_unsubscribe() {
2019 let client = BitmexWebSocketClient::new(
2020 Some("ws://test.com".to_string()),
2021 Some("test_key".to_string()),
2022 Some("test_secret".to_string()),
2023 Some(AccountId::new("BITMEX-TEST")),
2024 None,
2025 )
2026 .unwrap();
2027
2028 let subs = client.subscriptions.confirmed();
2030 subs.insert(BitmexWsTopic::Trade.as_ref().to_string(), {
2031 let mut set = AHashSet::new();
2032 set.insert(Ustr::from("XBTUSD"));
2033 set.insert(Ustr::from("ETHUSD"));
2034 set
2035 });
2036
2037 subs.insert(BitmexWsTopic::OrderBookL2.as_ref().to_string(), {
2038 let mut set = AHashSet::new();
2039 set.insert(Ustr::from("XBTUSD"));
2040 set
2041 });
2042
2043 let topic = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
2045 if let Some((channel, symbol)) = topic.split_once(':')
2046 && let Some(mut entry) = subs.get_mut(channel)
2047 {
2048 entry.remove(&Ustr::from(symbol));
2049 if entry.is_empty() {
2050 drop(entry);
2051 subs.remove(channel);
2052 }
2053 }
2054
2055 let mut topics_to_restore = Vec::new();
2057 for entry in subs.iter() {
2058 let (channel, symbols) = entry.pair();
2059 if symbols.is_empty() {
2060 topics_to_restore.push(channel.clone());
2061 } else {
2062 for symbol in symbols.iter() {
2063 topics_to_restore.push(format!("{channel}:{symbol}"));
2064 }
2065 }
2066 }
2067
2068 let trade_xbt = format!("{}:XBTUSD", BitmexWsTopic::Trade.as_ref());
2070 let trade_eth = format!("{}:ETHUSD", BitmexWsTopic::Trade.as_ref());
2071 let book_xbt = format!("{}:XBTUSD", BitmexWsTopic::OrderBookL2.as_ref());
2072
2073 assert!(topics_to_restore.contains(&trade_xbt));
2074 assert!(!topics_to_restore.contains(&trade_eth));
2075 assert!(topics_to_restore.contains(&book_xbt));
2076 assert_eq!(topics_to_restore.len(), 2);
2077 }
2078}