1use std::sync::{
24 Arc,
25 atomic::{AtomicBool, Ordering},
26};
27
28use ahash::{AHashMap, AHashSet};
29use dashmap::DashMap;
30use futures_util::Stream;
31use nautilus_common::runtime::get_runtime;
32use nautilus_core::{
33 consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36 data::{Data, bar::BarType},
37 identifiers::{AccountId, InstrumentId},
38 instruments::{Instrument, InstrumentAny},
39};
40use nautilus_network::{
41 RECONNECTED,
42 websocket::{WebSocketClient, WebSocketConfig, channel_message_handler},
43};
44use reqwest::header::USER_AGENT;
45use tokio::{sync::RwLock, time::Duration};
46use tokio_tungstenite::tungstenite::Message;
47use ustr::Ustr;
48
49use super::{
50 cache::QuoteCache,
51 enums::{
52 BitmexAction, BitmexWsAuthAction, BitmexWsAuthChannel, BitmexWsOperation, BitmexWsTopic,
53 },
54 error::BitmexWsError,
55 messages::{
56 BitmexAuthentication, BitmexSubscription, BitmexTableMessage, BitmexWsMessage,
57 NautilusWsMessage, OrderData,
58 },
59 parse::{
60 is_index_symbol, parse_book_msg_vec, parse_book10_msg_vec, parse_order_update_msg,
61 parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg, topic_from_bar_spec,
62 },
63};
64use crate::{
65 common::{consts::BITMEX_WS_URL, credential::Credential, enums::BitmexExecType},
66 websocket::parse::{
67 parse_execution_msg, parse_funding_msg, parse_instrument_msg, parse_order_msg,
68 parse_position_msg,
69 },
70};
71
72#[derive(Clone, Debug)]
74#[cfg_attr(
75 feature = "python",
76 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
77)]
78pub struct BitmexWebSocketClient {
79 url: String,
80 credential: Option<Credential>,
81 heartbeat: Option<u64>,
82 inner: Arc<RwLock<Option<WebSocketClient>>>,
83 rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
84 signal: Arc<AtomicBool>,
85 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
86 subscriptions: Arc<DashMap<String, AHashSet<Ustr>>>,
87 instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
88 account_id: AccountId,
89}
90
91impl BitmexWebSocketClient {
92 pub fn new(
98 url: Option<String>,
99 api_key: Option<String>,
100 api_secret: Option<String>,
101 account_id: Option<AccountId>,
102 heartbeat: Option<u64>,
103 ) -> anyhow::Result<Self> {
104 let credential = match (api_key, api_secret) {
105 (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
106 (None, None) => None,
107 _ => anyhow::bail!("Both `api_key` and `api_secret` must be provided together"),
108 };
109
110 let account_id = account_id.unwrap_or(AccountId::from("BITMEX-master"));
111
112 Ok(Self {
113 url: url.unwrap_or(BITMEX_WS_URL.to_string()),
114 credential,
115 heartbeat,
116 inner: Arc::new(RwLock::new(None)),
117 rx: None,
118 signal: Arc::new(AtomicBool::new(false)),
119 task_handle: None,
120 subscriptions: Arc::new(DashMap::new()),
121 instruments_cache: Arc::new(AHashMap::new()),
122 account_id,
123 })
124 }
125
126 pub fn from_env() -> anyhow::Result<Self> {
132 let url = get_env_var("BITMEX_WS_URL")?;
133 let api_key = get_env_var("BITMEX_API_KEY")?;
134 let api_secret = get_env_var("BITMEX_API_SECRET")?;
135
136 Self::new(Some(url), Some(api_key), Some(api_secret), None, None)
137 }
138
139 #[must_use]
141 pub const fn url(&self) -> &str {
142 self.url.as_str()
143 }
144
145 #[must_use]
147 pub fn api_key(&self) -> Option<&str> {
148 self.credential.as_ref().map(|c| c.api_key.as_str())
149 }
150
151 #[must_use]
153 pub fn is_active(&self) -> bool {
154 match self.inner.try_read() {
155 Ok(guard) => match &*guard {
156 Some(inner) => inner.is_active(),
157 None => false,
158 },
159 Err(_) => false,
160 }
161 }
162
163 #[must_use]
165 pub fn is_closed(&self) -> bool {
166 match self.inner.try_read() {
167 Ok(guard) => match &*guard {
168 Some(inner) => inner.is_closed(),
169 None => true,
170 },
171 Err(_) => true,
172 }
173 }
174
175 pub fn set_account_id(&mut self, account_id: AccountId) {
177 self.account_id = account_id;
178 }
179
180 pub fn initialize_instruments_cache(&mut self, instruments: Vec<InstrumentAny>) {
182 let mut instruments_cache: AHashMap<Ustr, InstrumentAny> = AHashMap::new();
183 for inst in instruments {
184 instruments_cache.insert(inst.symbol().inner(), inst.clone());
185 }
186
187 self.instruments_cache = Arc::new(instruments_cache);
188 }
189
190 pub async fn connect(&mut self) -> Result<(), BitmexWsError> {
200 let reader = self.connect_inner().await?;
201
202 let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
203 self.rx = Some(Arc::new(rx));
204 let signal = self.signal.clone();
205
206 let instruments_cache = self.instruments_cache.clone();
207 let account_id = self.account_id;
208 let inner_client = self.inner.clone();
209 let credential = self.credential.clone();
210 let subscriptions = self.subscriptions.clone();
211
212 let stream_handle = get_runtime().spawn(async move {
213 let mut handler =
214 BitmexWsMessageHandler::new(reader, signal, tx, instruments_cache, account_id);
215
216 loop {
218 match handler.next().await {
219 Some(NautilusWsMessage::Reconnected) => {
220 log::info!("Reconnecting WebSocket");
221
222 let inner_guard = inner_client.read().await;
223 if let Some(inner) = &*inner_guard {
224 if let Some(cred) = &credential {
226 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30))
227 .timestamp();
228 let signature = cred.sign("GET", "/realtime", expires, "");
229
230 let auth_message = BitmexAuthentication {
231 op: BitmexWsAuthAction::AuthKeyExpires,
232 args: (cred.api_key.to_string(), expires, signature),
233 };
234
235 let auth_json = match serde_json::to_string(&auth_message) {
236 Ok(json) => json,
237 Err(e) => {
238 tracing::error!(error = %e, "Failed to serialize auth message");
239 continue;
240 }
241 };
242 if let Err(e) = inner
243 .send_text(auth_json, None)
244 .await
245 {
246 log::error!(
247 "Failed to re-authenticate after reconnection: {e}"
248 );
249 } else {
250 log::info!("Re-authenticated after reconnection");
251 }
252 }
253
254 let subscribe_msg = BitmexSubscription {
256 op: BitmexWsOperation::Subscribe,
257 args: vec!["instrument".to_string()],
258 };
259
260 let subscribe_json = match serde_json::to_string(&subscribe_msg) {
261 Ok(json) => json,
262 Err(e) => {
263 tracing::error!(error = %e, "Failed to serialize subscribe message");
264 continue;
265 }
266 };
267 if let Err(e) = inner
268 .send_text(subscribe_json, None)
269 .await
270 {
271 log::error!(
272 "Failed to subscribe to instruments after reconnection: {e}"
273 );
274 }
275
276 let mut topics_to_restore = Vec::new();
278 for entry in subscriptions.iter() {
279 let (channel, symbols) = entry.pair();
280 if symbols.is_empty() {
281 topics_to_restore.push(channel.clone());
282 } else {
283 for symbol in symbols.iter() {
284 topics_to_restore.push(format!("{channel}:{symbol}"));
285 }
286 }
287 }
288
289 if !topics_to_restore.is_empty() {
290 let message = BitmexSubscription {
291 op: BitmexWsOperation::Subscribe,
292 args: topics_to_restore.clone(),
293 };
294
295 let msg_json = match serde_json::to_string(&message) {
296 Ok(json) => json,
297 Err(e) => {
298 tracing::error!(error = %e, topics = ?topics_to_restore, "Failed to serialize message");
299 continue;
300 }
301 };
302 if let Err(e) = inner
303 .send_text(msg_json, None)
304 .await
305 {
306 log::error!(
307 "Failed to restore subscriptions after reconnection: {e}"
308 );
309 } else {
310 log::info!(
311 "Restored {} subscriptions after reconnection",
312 topics_to_restore.len()
313 );
314 }
315 }
316 }
317 }
318 Some(msg) => {
319 if let Err(e) = handler.tx.send(msg) {
320 tracing::error!("Error sending message: {e}");
321 break;
322 }
323 }
324 None => {
325 if handler.handler.signal.load(Ordering::Relaxed) {
327 tracing::debug!("Stop signal received, ending message processing");
328 break;
329 }
330 tracing::warn!("WebSocket stream ended unexpectedly");
332 break;
333 }
334 }
335 }
336 });
337
338 self.task_handle = Some(Arc::new(stream_handle));
339
340 {
341 let inner_guard = self.inner.read().await;
342 if let Some(inner) = &*inner_guard {
343 let subscribe_msg = BitmexSubscription {
344 op: BitmexWsOperation::Subscribe,
345 args: vec!["instrument".to_string()],
346 };
347
348 match serde_json::to_string(&subscribe_msg) {
349 Ok(subscribe_json) => {
350 if let Err(e) = inner.send_text(subscribe_json, None).await {
351 log::error!("Failed to subscribe to instruments: {e}");
352 } else {
353 log::debug!("Subscribed to all instruments");
354 }
355 }
356 Err(e) => {
357 tracing::error!(error = %e, "Failed to serialize resubscribe message");
358 }
359 }
360 }
361 }
362
363 Ok(())
364 }
365
366 async fn connect_inner(
372 &mut self,
373 ) -> Result<tokio::sync::mpsc::UnboundedReceiver<Message>, BitmexWsError> {
374 let (message_handler, rx) = channel_message_handler();
375
376 let config = WebSocketConfig {
377 url: self.url.clone(),
378 headers: vec![(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())],
379 heartbeat: self.heartbeat,
380 heartbeat_msg: None,
381 message_handler: Some(message_handler),
382 ping_handler: None,
383 reconnect_timeout_ms: Some(5_000),
384 reconnect_delay_initial_ms: None, reconnect_delay_max_ms: None, reconnect_backoff_factor: None, reconnect_jitter_ms: None, };
389
390 let keyed_quotas = vec![];
391 let client = WebSocketClient::connect(
392 config,
393 None, keyed_quotas,
395 None, )
397 .await
398 .map_err(|e| BitmexWsError::ClientError(e.to_string()))?;
399
400 {
401 let mut inner_guard = self.inner.write().await;
402 *inner_guard = Some(client);
403 }
404
405 if self.credential.is_some() {
406 self.authenticate().await?;
407 }
408
409 Ok(rx)
410 }
411
412 async fn authenticate(&self) -> Result<(), BitmexWsError> {
419 let credential = match &self.credential {
420 Some(credential) => credential,
421 None => {
422 return Err(BitmexWsError::AuthenticationError(
423 "API credentials not available to authenticate".to_string(),
424 ));
425 }
426 };
427
428 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
429 let signature = credential.sign("GET", "/realtime", expires, "");
430
431 let auth_message = BitmexAuthentication {
432 op: BitmexWsAuthAction::AuthKeyExpires,
433 args: (credential.api_key.to_string(), expires, signature),
434 };
435
436 {
437 let inner_guard = self.inner.read().await;
438 if let Some(inner) = &*inner_guard {
439 let auth_json = serde_json::to_string(&auth_message).map_err(|e| {
440 BitmexWsError::AuthenticationError(format!(
441 "Failed to serialize auth message: {e}"
442 ))
443 })?;
444 inner
445 .send_text(auth_json, None)
446 .await
447 .map_err(|e| BitmexWsError::AuthenticationError(e.to_string()))
448 } else {
449 log::error!("Cannot authenticate: not connected");
450 Ok(())
451 }
452 }
453 }
454
455 pub async fn wait_until_active(&self, timeout_secs: f64) -> Result<(), BitmexWsError> {
461 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
462
463 tokio::time::timeout(timeout, async {
464 while !self.is_active() {
465 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
466 }
467 })
468 .await
469 .map_err(|_| {
470 BitmexWsError::ClientError(format!(
471 "WebSocket connection timeout after {timeout_secs} seconds"
472 ))
473 })?;
474
475 Ok(())
476 }
477
478 pub fn stream(&mut self) -> impl Stream<Item = NautilusWsMessage> + use<> {
486 let rx = self
487 .rx
488 .take()
489 .expect("Stream receiver already taken or not connected");
490 let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
491 async_stream::stream! {
492 while let Some(msg) = rx.recv().await {
493 yield msg;
494 }
495 }
496 }
497
498 pub async fn close(&mut self) -> Result<(), BitmexWsError> {
508 log::debug!("Starting close process");
509
510 self.signal.store(true, Ordering::Relaxed);
511
512 {
513 let inner_guard = self.inner.read().await;
514 if let Some(inner) = &*inner_guard {
515 log::debug!("Disconnecting websocket");
516
517 match tokio::time::timeout(Duration::from_secs(3), inner.disconnect()).await {
518 Ok(()) => log::debug!("Websocket disconnected successfully"),
519 Err(_) => {
520 log::warn!(
521 "Timeout waiting for websocket disconnect, continuing with cleanup"
522 );
523 }
524 }
525 } else {
526 log::debug!("No active connection to disconnect");
527 }
528 }
529
530 if let Some(task_handle) = self.task_handle.take() {
532 match Arc::try_unwrap(task_handle) {
533 Ok(handle) => {
534 log::debug!("Waiting for task handle to complete");
535 match tokio::time::timeout(Duration::from_secs(2), handle).await {
536 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
537 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
538 Err(_) => {
539 log::warn!(
540 "Timeout waiting for task handle, task may still be running"
541 );
542 }
544 }
545 }
546 Err(arc_handle) => {
547 log::debug!(
548 "Cannot take ownership of task handle - other references exist, aborting task"
549 );
550 arc_handle.abort();
551 }
552 }
553 } else {
554 log::debug!("No task handle to await");
555 }
556
557 log::debug!("Closed");
558
559 Ok(())
560 }
561
562 pub async fn subscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
572 log::debug!("Subscribing to topics: {topics:?}");
573
574 for topic in &topics {
576 if let Some((channel, symbol)) = topic.split_once(':') {
577 self.subscriptions
578 .entry(channel.to_string())
579 .or_default()
580 .insert(Ustr::from(symbol));
581 } else {
582 self.subscriptions.entry(topic.clone()).or_default();
584 }
585 }
586
587 let message = BitmexSubscription {
588 op: BitmexWsOperation::Subscribe,
589 args: topics.clone(),
590 };
591
592 {
593 let inner_guard = self.inner.read().await;
594 if let Some(inner) = &*inner_guard {
595 let msg_json = serde_json::to_string(&message).map_err(|e| {
596 BitmexWsError::SubscriptionError(format!(
597 "Failed to serialize subscribe message: {e}"
598 ))
599 })?;
600 inner
601 .send_text(msg_json, None)
602 .await
603 .map_err(|e| BitmexWsError::SubscriptionError(e.to_string()))?;
604 } else {
605 log::error!("Cannot send message: not connected");
606 }
607 }
608
609 Ok(())
610 }
611
612 async fn unsubscribe(&self, topics: Vec<String>) -> Result<(), BitmexWsError> {
618 log::debug!("Attempting to unsubscribe from topics: {topics:?}");
619
620 if self.signal.load(Ordering::Relaxed) {
621 log::debug!("Shutdown signal detected, skipping unsubscribe");
622 return Ok(());
623 }
624
625 for topic in &topics {
626 if let Some((channel, symbol)) = topic.split_once(':') {
627 if let Some(mut entry) = self.subscriptions.get_mut(channel) {
628 entry.remove(&Ustr::from(symbol));
629 if entry.is_empty() {
630 drop(entry);
631 self.subscriptions.remove(channel);
632 }
633 }
634 } else {
635 self.subscriptions.remove(topic);
636 }
637 }
638
639 let message = BitmexSubscription {
640 op: BitmexWsOperation::Unsubscribe,
641 args: topics.clone(),
642 };
643
644 {
645 let inner_guard = self.inner.read().await;
646 if let Some(inner) = &*inner_guard {
647 let msg_json = match serde_json::to_string(&message) {
648 Ok(json) => json,
649 Err(e) => {
650 tracing::error!(error = %e, "Failed to serialize unsubscribe message");
651 return Ok(());
652 }
653 };
654 if let Err(e) = inner.send_text(msg_json, None).await {
655 log::debug!("Error sending unsubscribe message: {e}");
656 }
657 } else {
658 log::debug!("Cannot send unsubscribe message: not connected");
659 }
660 }
661
662 Ok(())
663 }
664
665 #[must_use]
667 pub fn subscription_count(&self) -> usize {
668 self.subscriptions.len()
669 }
670
671 #[must_use]
673 pub fn get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
674 let symbol = instrument_id.symbol.inner();
675 let mut channels = Vec::with_capacity(self.subscriptions.len());
676
677 for entry in self.subscriptions.iter() {
678 let (channel, symbols) = entry.pair();
679 if symbols.contains(&symbol) {
680 channels.push(format!("{channel}:{symbol}"));
682 } else if symbols.is_empty() && (channel == "execution" || channel == "order") {
683 channels.push(channel.clone());
685 }
686 }
687
688 channels
689 }
690
691 pub async fn subscribe_instruments(&self) -> Result<(), BitmexWsError> {
697 log::debug!("Already subscribed to all instruments on connection, skipping");
699 Ok(())
700 }
701
702 pub async fn subscribe_instrument(
708 &self,
709 instrument_id: InstrumentId,
710 ) -> Result<(), BitmexWsError> {
711 log::debug!(
713 "Already subscribed to all instruments on connection (includes {instrument_id}), skipping"
714 );
715 Ok(())
716 }
717
718 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
724 let topic = BitmexWsTopic::OrderBookL2;
725 let symbol = instrument_id.symbol.as_str();
726 self.subscribe(vec![format!("{topic}:{symbol}")]).await
727 }
728
729 pub async fn subscribe_book_25(
735 &self,
736 instrument_id: InstrumentId,
737 ) -> Result<(), BitmexWsError> {
738 let topic = BitmexWsTopic::OrderBookL2_25;
739 let symbol = instrument_id.symbol.as_str();
740 self.subscribe(vec![format!("{topic}:{symbol}")]).await
741 }
742
743 pub async fn subscribe_book_depth10(
749 &self,
750 instrument_id: InstrumentId,
751 ) -> Result<(), BitmexWsError> {
752 let topic = BitmexWsTopic::OrderBook10;
753 let symbol = instrument_id.symbol.as_str();
754 self.subscribe(vec![format!("{topic}:{symbol}")]).await
755 }
756
757 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
765 let symbol = instrument_id.symbol.inner();
766
767 if is_index_symbol(&instrument_id.symbol.inner()) {
769 tracing::warn!("Ignoring quote subscription for index symbol: {symbol}");
770 return Ok(());
771 }
772
773 let topic = BitmexWsTopic::Quote;
774 self.subscribe(vec![format!("{topic}:{symbol}")]).await
775 }
776
777 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
785 let symbol = instrument_id.symbol.inner();
786
787 if is_index_symbol(&symbol) {
789 tracing::warn!("Ignoring trade subscription for index symbol: {symbol}");
790 return Ok(());
791 }
792
793 let topic = BitmexWsTopic::Trade;
794 self.subscribe(vec![format!("{topic}:{symbol}")]).await
795 }
796
797 pub async fn subscribe_mark_prices(
803 &self,
804 instrument_id: InstrumentId,
805 ) -> Result<(), BitmexWsError> {
806 self.subscribe_instrument(instrument_id).await
807 }
808
809 pub async fn subscribe_index_prices(
815 &self,
816 instrument_id: InstrumentId,
817 ) -> Result<(), BitmexWsError> {
818 self.subscribe_instrument(instrument_id).await
819 }
820
821 pub async fn subscribe_funding_rates(
827 &self,
828 instrument_id: InstrumentId,
829 ) -> Result<(), BitmexWsError> {
830 let topic = BitmexWsTopic::Funding;
831 let symbol = instrument_id.symbol.as_str();
832 self.subscribe(vec![format!("{topic}:{symbol}")]).await
833 }
834
835 pub async fn subscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
841 let topic = topic_from_bar_spec(bar_type.spec());
842 let symbol = bar_type.instrument_id().symbol.to_string();
843 self.subscribe(vec![format!("{topic}:{symbol}")]).await
844 }
845
846 pub async fn unsubscribe_instruments(&self) -> Result<(), BitmexWsError> {
852 log::debug!(
854 "Instruments subscription maintained for proper operation, skipping unsubscribe"
855 );
856 Ok(())
857 }
858
859 pub async fn unsubscribe_instrument(
865 &self,
866 instrument_id: InstrumentId,
867 ) -> Result<(), BitmexWsError> {
868 log::debug!(
870 "Instruments subscription maintained for proper operation (includes {instrument_id}), skipping unsubscribe"
871 );
872 Ok(())
873 }
874
875 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> Result<(), BitmexWsError> {
881 let topic = BitmexWsTopic::OrderBookL2;
882 let symbol = instrument_id.symbol.as_str();
883 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
884 }
885
886 pub async fn unsubscribe_book_25(
892 &self,
893 instrument_id: InstrumentId,
894 ) -> Result<(), BitmexWsError> {
895 let topic = BitmexWsTopic::OrderBookL2_25;
896 let symbol = instrument_id.symbol.as_str();
897 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
898 }
899
900 pub async fn unsubscribe_book_depth10(
906 &self,
907 instrument_id: InstrumentId,
908 ) -> Result<(), BitmexWsError> {
909 let topic = BitmexWsTopic::OrderBook10;
910 let symbol = instrument_id.symbol.as_str();
911 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
912 }
913
914 pub async fn unsubscribe_quotes(
920 &self,
921 instrument_id: InstrumentId,
922 ) -> Result<(), BitmexWsError> {
923 let symbol = instrument_id.symbol.inner();
924
925 if is_index_symbol(&symbol) {
927 return Ok(());
928 }
929
930 let topic = BitmexWsTopic::Quote;
931 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
932 }
933
934 pub async fn unsubscribe_trades(
940 &self,
941 instrument_id: InstrumentId,
942 ) -> Result<(), BitmexWsError> {
943 let symbol = instrument_id.symbol.inner();
944
945 if is_index_symbol(&symbol) {
947 return Ok(());
948 }
949
950 let topic = BitmexWsTopic::Trade;
951 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
952 }
953
954 pub async fn unsubscribe_mark_prices(
960 &self,
961 instrument_id: InstrumentId,
962 ) -> Result<(), BitmexWsError> {
963 log::debug!(
965 "Mark prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
966 );
967 Ok(())
968 }
969
970 pub async fn unsubscribe_index_prices(
976 &self,
977 instrument_id: InstrumentId,
978 ) -> Result<(), BitmexWsError> {
979 log::debug!(
981 "Index prices for {instrument_id} uses shared instrument channel, skipping unsubscribe"
982 );
983 Ok(())
984 }
985
986 pub async fn unsubscribe_funding_rates(
992 &self,
993 instrument_id: InstrumentId,
994 ) -> Result<(), BitmexWsError> {
995 log::debug!(
997 "Funding rates for {instrument_id}, skipping unsubscribe to avoid shutdown race"
998 );
999 Ok(())
1000 }
1001
1002 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> Result<(), BitmexWsError> {
1008 let topic = topic_from_bar_spec(bar_type.spec());
1009 let symbol = bar_type.instrument_id().symbol.to_string();
1010 self.unsubscribe(vec![format!("{topic}:{symbol}")]).await
1011 }
1012
1013 pub async fn subscribe_orders(&self) -> Result<(), BitmexWsError> {
1019 if self.credential.is_none() {
1020 return Err(BitmexWsError::MissingCredentials);
1021 }
1022 self.subscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1023 .await
1024 }
1025
1026 pub async fn subscribe_executions(&self) -> Result<(), BitmexWsError> {
1032 if self.credential.is_none() {
1033 return Err(BitmexWsError::MissingCredentials);
1034 }
1035 self.subscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1036 .await
1037 }
1038
1039 pub async fn subscribe_positions(&self) -> Result<(), BitmexWsError> {
1045 if self.credential.is_none() {
1046 return Err(BitmexWsError::MissingCredentials);
1047 }
1048 self.subscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1049 .await
1050 }
1051
1052 pub async fn subscribe_margin(&self) -> Result<(), BitmexWsError> {
1058 if self.credential.is_none() {
1059 return Err(BitmexWsError::MissingCredentials);
1060 }
1061 self.subscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1062 .await
1063 }
1064
1065 pub async fn subscribe_wallet(&self) -> Result<(), BitmexWsError> {
1071 if self.credential.is_none() {
1072 return Err(BitmexWsError::MissingCredentials);
1073 }
1074 self.subscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1075 .await
1076 }
1077
1078 pub async fn unsubscribe_orders(&self) -> Result<(), BitmexWsError> {
1084 self.unsubscribe(vec![BitmexWsAuthChannel::Order.to_string()])
1085 .await
1086 }
1087
1088 pub async fn unsubscribe_executions(&self) -> Result<(), BitmexWsError> {
1094 self.unsubscribe(vec![BitmexWsAuthChannel::Execution.to_string()])
1095 .await
1096 }
1097
1098 pub async fn unsubscribe_positions(&self) -> Result<(), BitmexWsError> {
1104 self.unsubscribe(vec![BitmexWsAuthChannel::Position.to_string()])
1105 .await
1106 }
1107
1108 pub async fn unsubscribe_margin(&self) -> Result<(), BitmexWsError> {
1114 self.unsubscribe(vec![BitmexWsAuthChannel::Margin.to_string()])
1115 .await
1116 }
1117
1118 pub async fn unsubscribe_wallet(&self) -> Result<(), BitmexWsError> {
1124 self.unsubscribe(vec![BitmexWsAuthChannel::Wallet.to_string()])
1125 .await
1126 }
1127}
1128
1129struct BitmexFeedHandler {
1130 receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1131 signal: Arc<AtomicBool>,
1132}
1133
1134impl BitmexFeedHandler {
1135 pub fn new(
1137 receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1138 signal: Arc<AtomicBool>,
1139 ) -> Self {
1140 Self { receiver, signal }
1141 }
1142
1143 async fn next(&mut self) -> Option<BitmexWsMessage> {
1145 loop {
1146 tokio::select! {
1147 msg = self.receiver.recv() => match msg {
1148 Some(msg) => match msg {
1149 Message::Text(text) => {
1150 if text == RECONNECTED {
1151 tracing::info!("Received WebSocket reconnection signal");
1152 return Some(BitmexWsMessage::Reconnected);
1153 }
1154
1155 tracing::trace!("Raw websocket message: {text}");
1156
1157 match serde_json::from_str(&text) {
1158 Ok(msg) => match &msg {
1159 BitmexWsMessage::Welcome {
1160 version,
1161 heartbeat_enabled,
1162 limit,
1163 ..
1164 } => {
1165 tracing::info!(
1166 version = version,
1167 heartbeat = heartbeat_enabled,
1168 rate_limit = limit.remaining,
1169 "Welcome to the BitMEX Realtime API:",
1170 );
1171 }
1172 BitmexWsMessage::Subscription {
1173 success: _,
1174 subscribe: _,
1175 error,
1176 } => {
1177 if let Some(error) = error {
1178 tracing::error!("Subscription error: {error}");
1179 }
1180 }
1181 BitmexWsMessage::Error { status, error, .. } => {
1182 tracing::error!(
1183 status = status,
1184 error = error,
1185 "Received error from BitMEX"
1186 );
1187 }
1188 _ => return Some(msg),
1189 },
1190 Err(e) => {
1191 tracing::error!("Failed to parse WebSocket message: {e}: {text}");
1192 }
1193 }
1194 }
1195 Message::Binary(msg) => {
1196 tracing::debug!("Raw binary: {msg:?}");
1197 }
1198 Message::Close(_) => {
1199 tracing::debug!("Received close message");
1200 return None;
1201 }
1202 msg => match msg {
1203 Message::Ping(data) => {
1204 tracing::trace!("Received ping frame with {} bytes", data.len());
1205 }
1206 Message::Pong(data) => {
1207 tracing::trace!("Received pong frame with {} bytes", data.len());
1208 }
1209 Message::Frame(frame) => {
1210 tracing::debug!("Received raw frame: {frame:?}");
1211 }
1212 _ => {
1213 tracing::warn!("Unexpected message type: {msg:?}");
1214 }
1215 },
1216 }
1217 None => {
1218 tracing::info!("WebSocket stream closed");
1219 return None;
1220 }
1221 },
1222 _ = tokio::time::sleep(Duration::from_millis(1)) => {
1223 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
1224 tracing::debug!("Stop signal received");
1225 return None;
1226 }
1227 }
1228 }
1229 }
1230 }
1231}
1232
1233struct BitmexWsMessageHandler {
1234 handler: BitmexFeedHandler,
1235 tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
1236 instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
1237 #[allow(dead_code)] account_id: AccountId,
1239}
1240
1241impl BitmexWsMessageHandler {
1242 pub fn new(
1244 receiver: tokio::sync::mpsc::UnboundedReceiver<Message>,
1245 signal: Arc<AtomicBool>,
1246 tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
1247 instruments_cache: Arc<AHashMap<Ustr, InstrumentAny>>,
1248 account_id: AccountId,
1249 ) -> Self {
1250 let handler = BitmexFeedHandler::new(receiver, signal);
1251 Self {
1252 handler,
1253 tx,
1254 instruments_cache,
1255 account_id,
1256 }
1257 }
1258
1259 #[inline]
1267 fn get_price_precision(&self, symbol: &Ustr) -> u8 {
1268 self.instruments_cache
1269 .get(symbol).map_or_else(|| panic!("Instrument '{symbol}' not found in cache; ensure all instruments are loaded before starting websocket"), Instrument::price_precision)
1270 }
1271
1272 async fn next(&mut self) -> Option<NautilusWsMessage> {
1273 let clock = get_atomic_clock_realtime();
1274 let mut quote_cache = QuoteCache::new();
1275
1276 while let Some(msg) = self.handler.next().await {
1277 match msg {
1278 BitmexWsMessage::Reconnected => {
1279 return Some(NautilusWsMessage::Reconnected);
1281 }
1282 BitmexWsMessage::Table(table_msg) => {
1283 let ts_init = clock.get_time_ns();
1284
1285 return Some(match table_msg {
1286 BitmexTableMessage::OrderBookL2 { action, data } => {
1287 if data.is_empty() {
1288 continue;
1289 }
1290 let price_precision = self.get_price_precision(&data[0].symbol);
1291 let data = parse_book_msg_vec(data, action, price_precision, ts_init);
1292
1293 NautilusWsMessage::Data(data)
1294 }
1295 BitmexTableMessage::OrderBookL2_25 { action, data } => {
1296 if data.is_empty() {
1297 continue;
1298 }
1299 let price_precision = self.get_price_precision(&data[0].symbol);
1300 let data = parse_book_msg_vec(data, action, price_precision, ts_init);
1301
1302 NautilusWsMessage::Data(data)
1303 }
1304 BitmexTableMessage::OrderBook10 { data, .. } => {
1305 if data.is_empty() {
1306 continue;
1307 }
1308 let price_precision = self.get_price_precision(&data[0].symbol);
1309 let data = parse_book10_msg_vec(data, price_precision, ts_init);
1310
1311 NautilusWsMessage::Data(data)
1312 }
1313 BitmexTableMessage::Quote { mut data, .. } => {
1314 if data.is_empty() {
1316 continue;
1317 }
1318
1319 let msg = data.remove(0);
1320 let price_precision = self.get_price_precision(&msg.symbol);
1321
1322 if let Some(quote) = quote_cache.process(&msg, price_precision, ts_init)
1323 {
1324 NautilusWsMessage::Data(vec![Data::Quote(quote)])
1325 } else {
1326 continue;
1327 }
1328 }
1329 BitmexTableMessage::Trade { data, .. } => {
1330 if data.is_empty() {
1331 continue;
1332 }
1333 let price_precision = self.get_price_precision(&data[0].symbol);
1334 let data = parse_trade_msg_vec(data, price_precision, ts_init);
1335
1336 NautilusWsMessage::Data(data)
1337 }
1338 BitmexTableMessage::TradeBin1m { action, data } => {
1339 if action == BitmexAction::Partial || data.is_empty() {
1340 continue;
1341 }
1342 let price_precision = self.get_price_precision(&data[0].symbol);
1343 let data = parse_trade_bin_msg_vec(
1344 data,
1345 BitmexWsTopic::TradeBin1m,
1346 price_precision,
1347 ts_init,
1348 );
1349
1350 NautilusWsMessage::Data(data)
1351 }
1352 BitmexTableMessage::TradeBin5m { action, data } => {
1353 if action == BitmexAction::Partial || data.is_empty() {
1354 continue;
1355 }
1356 let price_precision = self.get_price_precision(&data[0].symbol);
1357 let data = parse_trade_bin_msg_vec(
1358 data,
1359 BitmexWsTopic::TradeBin5m,
1360 price_precision,
1361 ts_init,
1362 );
1363
1364 NautilusWsMessage::Data(data)
1365 }
1366 BitmexTableMessage::TradeBin1h { action, data } => {
1367 if action == BitmexAction::Partial || data.is_empty() {
1368 continue;
1369 }
1370 let price_precision = self.get_price_precision(&data[0].symbol);
1371 let data = parse_trade_bin_msg_vec(
1372 data,
1373 BitmexWsTopic::TradeBin1h,
1374 price_precision,
1375 ts_init,
1376 );
1377
1378 NautilusWsMessage::Data(data)
1379 }
1380 BitmexTableMessage::TradeBin1d { action, data } => {
1381 if action == BitmexAction::Partial || data.is_empty() {
1382 continue;
1383 }
1384 let price_precision = self.get_price_precision(&data[0].symbol);
1385 let data = parse_trade_bin_msg_vec(
1386 data,
1387 BitmexWsTopic::TradeBin1d,
1388 price_precision,
1389 ts_init,
1390 );
1391
1392 NautilusWsMessage::Data(data)
1393 }
1394 BitmexTableMessage::Order { data, .. } => {
1398 let mut reports = Vec::with_capacity(data.len());
1400
1401 for order_data in data {
1402 match order_data {
1403 OrderData::Full(order_msg) => {
1404 let price_precision =
1405 self.get_price_precision(&order_msg.symbol);
1406 match parse_order_msg(&order_msg, price_precision) {
1407 Ok(report) => reports.push(report),
1408 Err(e) => {
1409 tracing::error!(
1410 error = %e,
1411 symbol = %order_msg.symbol,
1412 order_id = %order_msg.order_id,
1413 time_in_force = ?order_msg.time_in_force,
1414 "Failed to parse full order message - potential data loss"
1415 );
1416 continue;
1418 }
1419 }
1420 }
1421 OrderData::Update(msg) => {
1422 let price_precision = self.get_price_precision(&msg.symbol);
1423
1424 if let Some(event) = parse_order_update_msg(
1425 &msg,
1426 price_precision,
1427 self.account_id,
1428 ) {
1429 return Some(NautilusWsMessage::OrderUpdated(event));
1430 } else {
1431 tracing::warn!(
1432 order_id = %msg.order_id,
1433 price = ?msg.price,
1434 "Skipped order update message (insufficient data)"
1435 );
1436 }
1437 }
1438 }
1439 }
1440
1441 if reports.is_empty() {
1442 continue;
1443 }
1444
1445 NautilusWsMessage::OrderStatusReports(reports)
1446 }
1447 BitmexTableMessage::Execution { data, .. } => {
1448 let mut fills = Vec::with_capacity(data.len());
1449
1450 for exec_msg in data {
1451 let Some(symbol) = &exec_msg.symbol else {
1453 if exec_msg.exec_type == Some(BitmexExecType::Trade) {
1455 tracing::warn!(
1456 "Execution message missing symbol: {:?}",
1457 exec_msg.exec_id
1458 );
1459 } else {
1460 tracing::debug!(
1461 "Execution message missing symbol: {:?} (exec_type: {:?})",
1462 exec_msg.exec_id,
1463 exec_msg.exec_type
1464 );
1465 }
1466 continue;
1467 };
1468 let price_precision = self.get_price_precision(symbol);
1469
1470 if let Some(fill) = parse_execution_msg(exec_msg, price_precision) {
1471 fills.push(fill);
1472 }
1473 }
1474
1475 if fills.is_empty() {
1476 continue;
1477 }
1478 NautilusWsMessage::FillReports(fills)
1479 }
1480 BitmexTableMessage::Position { data, .. } => {
1481 if let Some(pos_msg) = data.into_iter().next() {
1482 let report = parse_position_msg(pos_msg);
1483 NautilusWsMessage::PositionStatusReport(report)
1484 } else {
1485 continue;
1486 }
1487 }
1488 BitmexTableMessage::Wallet { data, .. } => {
1489 if let Some(wallet_msg) = data.into_iter().next() {
1490 let account_state = parse_wallet_msg(wallet_msg, ts_init);
1491 NautilusWsMessage::AccountState(account_state)
1492 } else {
1493 continue;
1494 }
1495 }
1496 BitmexTableMessage::Margin { .. } => {
1497 continue;
1501 }
1502 BitmexTableMessage::Instrument { data, .. } => {
1503 let ts_init = clock.get_time_ns();
1504 let mut data_msgs = Vec::with_capacity(data.len());
1505
1506 for msg in data {
1507 let parsed =
1508 parse_instrument_msg(msg, &self.instruments_cache, ts_init);
1509 data_msgs.extend(parsed);
1510 }
1511
1512 if data_msgs.is_empty() {
1513 continue;
1514 }
1515 NautilusWsMessage::Data(data_msgs)
1516 }
1517 BitmexTableMessage::Funding { data, .. } => {
1518 let ts_init = clock.get_time_ns();
1519 let mut funding_updates = Vec::with_capacity(data.len());
1520
1521 for msg in data {
1522 if let Some(parsed) = parse_funding_msg(msg, ts_init) {
1523 funding_updates.push(parsed);
1524 }
1525 }
1526
1527 if !funding_updates.is_empty() {
1528 NautilusWsMessage::FundingRateUpdates(funding_updates)
1529 } else {
1530 continue;
1531 }
1532 }
1533 _ => {
1534 tracing::warn!("Unhandled table message type: {table_msg:?}");
1536 continue;
1537 }
1538 });
1539 }
1540 _ => {
1541 continue;
1544 }
1545 }
1546 }
1547
1548 None
1549 }
1550}
1551
1552#[cfg(test)]
1557mod tests {
1558 use ahash::AHashSet;
1559 use rstest::rstest;
1560 use ustr::Ustr;
1561
1562 use super::*;
1563
1564 #[rstest]
1565 fn test_reconnect_topics_restoration_logic() {
1566 let client = BitmexWebSocketClient::new(
1568 Some("ws://test.com".to_string()),
1569 Some("test_key".to_string()),
1570 Some("test_secret".to_string()),
1571 Some(AccountId::new("BITMEX-TEST")),
1572 None,
1573 )
1574 .unwrap();
1575
1576 client.subscriptions.insert("trade".to_string(), {
1578 let mut set = AHashSet::new();
1579 set.insert(Ustr::from("XBTUSD"));
1580 set.insert(Ustr::from("ETHUSD"));
1581 set
1582 });
1583
1584 client.subscriptions.insert("orderBookL2".to_string(), {
1585 let mut set = AHashSet::new();
1586 set.insert(Ustr::from("XBTUSD"));
1587 set
1588 });
1589
1590 client
1592 .subscriptions
1593 .insert("order".to_string(), AHashSet::new());
1594 client
1595 .subscriptions
1596 .insert("position".to_string(), AHashSet::new());
1597
1598 let mut topics_to_restore = Vec::new();
1600 for entry in client.subscriptions.iter() {
1601 let (channel, symbols) = entry.pair();
1602 if symbols.is_empty() {
1603 topics_to_restore.push(channel.clone());
1604 } else {
1605 for symbol in symbols.iter() {
1606 topics_to_restore.push(format!("{channel}:{symbol}"));
1607 }
1608 }
1609 }
1610
1611 assert!(topics_to_restore.contains(&"trade:XBTUSD".to_string()));
1613 assert!(topics_to_restore.contains(&"trade:ETHUSD".to_string()));
1614 assert!(topics_to_restore.contains(&"orderBookL2:XBTUSD".to_string()));
1615 assert!(topics_to_restore.contains(&"order".to_string()));
1616 assert!(topics_to_restore.contains(&"position".to_string()));
1617 assert_eq!(topics_to_restore.len(), 5);
1618 }
1619
1620 #[rstest]
1621 fn test_reconnect_auth_message_building() {
1622 let client_with_creds = BitmexWebSocketClient::new(
1624 Some("ws://test.com".to_string()),
1625 Some("test_key".to_string()),
1626 Some("test_secret".to_string()),
1627 Some(AccountId::new("BITMEX-TEST")),
1628 None,
1629 )
1630 .unwrap();
1631
1632 if let Some(cred) = &client_with_creds.credential {
1634 let expires = (chrono::Utc::now() + chrono::Duration::seconds(30)).timestamp();
1635 let signature = cred.sign("GET", "/realtime", expires, "");
1636
1637 let auth_message = BitmexAuthentication {
1638 op: BitmexWsAuthAction::AuthKeyExpires,
1639 args: (cred.api_key.to_string(), expires, signature),
1640 };
1641
1642 assert_eq!(auth_message.op, BitmexWsAuthAction::AuthKeyExpires);
1644 assert_eq!(auth_message.args.0, "test_key");
1645 assert!(auth_message.args.1 > 0); assert!(!auth_message.args.2.is_empty()); } else {
1648 panic!("Client should have credentials");
1649 }
1650
1651 let client_no_creds = BitmexWebSocketClient::new(
1653 Some("ws://test.com".to_string()),
1654 None,
1655 None,
1656 Some(AccountId::new("BITMEX-TEST")),
1657 None,
1658 )
1659 .unwrap();
1660
1661 assert!(client_no_creds.credential.is_none());
1662 }
1663
1664 #[rstest]
1665 fn test_subscription_state_after_unsubscribe() {
1666 let client = BitmexWebSocketClient::new(
1667 Some("ws://test.com".to_string()),
1668 Some("test_key".to_string()),
1669 Some("test_secret".to_string()),
1670 Some(AccountId::new("BITMEX-TEST")),
1671 None,
1672 )
1673 .unwrap();
1674
1675 client.subscriptions.insert("trade".to_string(), {
1677 let mut set = AHashSet::new();
1678 set.insert(Ustr::from("XBTUSD"));
1679 set.insert(Ustr::from("ETHUSD"));
1680 set
1681 });
1682
1683 client.subscriptions.insert("orderBookL2".to_string(), {
1684 let mut set = AHashSet::new();
1685 set.insert(Ustr::from("XBTUSD"));
1686 set
1687 });
1688
1689 let topic = "trade:ETHUSD";
1691 if let Some((channel, symbol)) = topic.split_once(':')
1692 && let Some(mut entry) = client.subscriptions.get_mut(channel)
1693 {
1694 entry.remove(&Ustr::from(symbol));
1695 if entry.is_empty() {
1696 drop(entry);
1697 client.subscriptions.remove(channel);
1698 }
1699 }
1700
1701 let mut topics_to_restore = Vec::new();
1703 for entry in client.subscriptions.iter() {
1704 let (channel, symbols) = entry.pair();
1705 if symbols.is_empty() {
1706 topics_to_restore.push(channel.clone());
1707 } else {
1708 for symbol in symbols.iter() {
1709 topics_to_restore.push(format!("{channel}:{symbol}"));
1710 }
1711 }
1712 }
1713
1714 assert!(topics_to_restore.contains(&"trade:XBTUSD".to_string()));
1716 assert!(!topics_to_restore.contains(&"trade:ETHUSD".to_string()));
1717 assert!(topics_to_restore.contains(&"orderBookL2:XBTUSD".to_string()));
1718 assert_eq!(topics_to_restore.len(), 2);
1719 }
1720}