1use std::{
21 fmt,
22 sync::{
23 Arc,
24 atomic::{AtomicBool, Ordering},
25 },
26 time::Duration,
27};
28
29use dashmap::DashMap;
30use nautilus_common::runtime::get_runtime;
31use nautilus_core::{consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
32use nautilus_model::{
33 enums::{OrderSide, OrderType, TimeInForce},
34 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
35 instruments::{Instrument, InstrumentAny},
36 types::{Price, Quantity},
37};
38use nautilus_network::{
39 RECONNECTED,
40 retry::{RetryManager, create_websocket_retry_manager},
41 websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
42};
43use serde_json::{Value, json};
44use tokio::sync::RwLock;
45use tokio_tungstenite::tungstenite::Message;
46use tokio_util::sync::CancellationToken;
47use ustr::Ustr;
48
49use crate::{
50 common::{
51 consts::{BYBIT_NAUTILUS_BROKER_ID, BYBIT_PONG},
52 credential::Credential,
53 enums::{
54 BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
55 BybitTriggerType, BybitWsOrderRequestOp,
56 },
57 parse::extract_raw_symbol,
58 symbol::BybitSymbol,
59 urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
60 },
61 websocket::{
62 auth::{AUTHENTICATION_TIMEOUT_SECS, AuthTracker},
63 cache,
64 enums::BybitWsOperation,
65 error::{BybitWsError, BybitWsResult},
66 messages::{
67 BybitAuthRequest, BybitSubscription, BybitWebSocketError, BybitWebSocketMessage,
68 BybitWsAccountExecutionMsg, BybitWsAccountOrderMsg, BybitWsAccountPositionMsg,
69 BybitWsAccountWalletMsg, BybitWsAmendOrderParams, BybitWsAuthResponse,
70 BybitWsCancelOrderParams, BybitWsHeader, BybitWsKlineMsg, BybitWsOrderbookDepthMsg,
71 BybitWsPlaceOrderParams, BybitWsRequest, BybitWsResponse, BybitWsSubscriptionMsg,
72 BybitWsTickerLinearMsg, BybitWsTickerOptionMsg, BybitWsTradeMsg,
73 },
74 subscription::SubscriptionState,
75 },
76};
77
/// Maximum number of topics placed in the `args` of one subscribe/unsubscribe request;
/// longer topic lists are split into several requests.
const MAX_ARGS_PER_SUBSCRIPTION_REQUEST: usize = 10;
/// Heartbeat interval (seconds, per the name) applied when the caller passes no heartbeat.
const DEFAULT_HEARTBEAT_SECS: u64 = 20;
/// Authentication window in milliseconds (per the name).
// NOTE(review): not referenced in the visible part of this file; presumably used when
// building the authentication request — confirm against `authenticate_inner`.
const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
81
82fn should_retry_bybit_error(error: &BybitWsError) -> bool {
84 match error {
85 BybitWsError::Transport(_) => true, BybitWsError::Send(_) => true, BybitWsError::ClientError(msg) => {
88 let msg_lower = msg.to_lowercase();
90 msg_lower.contains("timeout")
91 || msg_lower.contains("timed out")
92 || msg_lower.contains("connection")
93 || msg_lower.contains("network")
94 }
95 BybitWsError::NotConnected => true, BybitWsError::Authentication(_) | BybitWsError::Json(_) => {
97 false
99 }
100 }
101}
102
103fn create_bybit_timeout_error(msg: String) -> BybitWsError {
105 BybitWsError::ClientError(msg)
106}
107
/// WebSocket client for Bybit, usable for public, private and trade endpoints
/// (see the `new_public*`, `new_private` and `new_trade` constructors).
///
/// Clones share the connection, flags and caches held behind `Arc`, but do not
/// receive the event receiver or the task handle.
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct BybitWebSocketClient {
    /// Endpoint URL used when connecting.
    url: String,
    /// Bybit environment this client was created for.
    environment: BybitEnvironment,
    /// Product type; set by the public constructors, `None` for private/trade clients.
    product_type: Option<BybitProductType>,
    /// API credential; `None` for public clients.
    credential: Option<Credential>,
    /// Whether the client authenticates after connecting and after reconnecting.
    requires_auth: bool,
    /// Authentication tracker handed to the message-handling and authentication routines.
    auth_tracker: AuthTracker,
    /// Heartbeat interval passed to the underlying WebSocket configuration.
    heartbeat: Option<u64>,
    /// Underlying network client; `None` until `connect` has succeeded.
    inner: Arc<RwLock<Option<WebSocketClient>>>,
    /// Receiver of processed events; created by `connect` and taken by `stream`.
    rx: Option<tokio::sync::mpsc::UnboundedReceiver<BybitWebSocketMessage>>,
    /// Stop flag checked by the message-processing task; set by `close`.
    signal: Arc<AtomicBool>,
    /// Handle of the message-processing task spawned by `connect`.
    task_handle: Option<tokio::task::JoinHandle<()>>,
    /// Subscription bookkeeping (reference counts and topic state).
    subscriptions: SubscriptionState,
    /// Flag consulted before order operations; cleared by `close`.
    is_authenticated: Arc<AtomicBool>,
    /// Instruments registered through `add_instrument`, keyed by instrument ID.
    instruments_cache: Arc<DashMap<InstrumentId, InstrumentAny>>,
    /// Account identifier set through `set_account_id`.
    account_id: Option<AccountId>,
    /// Quote cache; cleared after a reconnect before subscriptions are restored.
    quote_cache: Arc<RwLock<cache::QuoteCache>>,
    /// Retry manager wrapping the single-order place/amend/cancel requests.
    retry_manager: Arc<RetryManager<BybitWsError>>,
    /// Cancellation token passed to the retry manager for retried requests.
    cancellation_token: CancellationToken,
}
130
131impl fmt::Debug for BybitWebSocketClient {
132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 f.debug_struct("BybitWebSocketClient")
134 .field("url", &self.url)
135 .field("environment", &self.environment)
136 .field("product_type", &self.product_type)
137 .field("requires_auth", &self.requires_auth)
138 .field("heartbeat", &self.heartbeat)
139 .field("confirmed_subscriptions", &self.subscriptions.len())
140 .finish()
141 }
142}
143
144impl Clone for BybitWebSocketClient {
145 fn clone(&self) -> Self {
146 Self {
147 url: self.url.clone(),
148 environment: self.environment,
149 product_type: self.product_type,
150 credential: self.credential.clone(),
151 requires_auth: self.requires_auth,
152 auth_tracker: self.auth_tracker.clone(),
153 heartbeat: self.heartbeat,
154 inner: Arc::clone(&self.inner),
155 rx: None, signal: Arc::clone(&self.signal),
157 task_handle: None, subscriptions: self.subscriptions.clone(),
159 is_authenticated: Arc::clone(&self.is_authenticated),
160 instruments_cache: Arc::clone(&self.instruments_cache),
161 account_id: self.account_id,
162 quote_cache: Arc::clone(&self.quote_cache),
163 retry_manager: Arc::clone(&self.retry_manager),
164 cancellation_token: self.cancellation_token.clone(),
165 }
166 }
167}
168
169impl BybitWebSocketClient {
170 #[must_use]
172 pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
173 Self::new_public_with(
174 BybitProductType::Linear,
175 BybitEnvironment::Mainnet,
176 url,
177 heartbeat,
178 )
179 }
180
181 #[must_use]
187 pub fn new_public_with(
188 product_type: BybitProductType,
189 environment: BybitEnvironment,
190 url: Option<String>,
191 heartbeat: Option<u64>,
192 ) -> Self {
193 Self {
194 url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
195 environment,
196 product_type: Some(product_type),
197 credential: None,
198 requires_auth: false,
199 auth_tracker: AuthTracker::new(),
200 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
201 inner: Arc::new(RwLock::new(None)),
202 rx: None,
203 signal: Arc::new(AtomicBool::new(false)),
204 task_handle: None,
205 subscriptions: SubscriptionState::new(),
206 is_authenticated: Arc::new(AtomicBool::new(false)),
207 instruments_cache: Arc::new(DashMap::new()),
208 account_id: None,
209 quote_cache: Arc::new(RwLock::new(cache::QuoteCache::new())),
210 retry_manager: Arc::new(
211 create_websocket_retry_manager().expect("Failed to create retry manager"),
212 ),
213 cancellation_token: CancellationToken::new(),
214 }
215 }
216
217 #[must_use]
223 pub fn new_private(
224 environment: BybitEnvironment,
225 credential: Credential,
226 url: Option<String>,
227 heartbeat: Option<u64>,
228 ) -> Self {
229 Self {
230 url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
231 environment,
232 product_type: None,
233 credential: Some(credential),
234 requires_auth: true,
235 auth_tracker: AuthTracker::new(),
236 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
237 inner: Arc::new(RwLock::new(None)),
238 rx: None,
239 signal: Arc::new(AtomicBool::new(false)),
240 task_handle: None,
241 subscriptions: SubscriptionState::new(),
242 is_authenticated: Arc::new(AtomicBool::new(false)),
243 instruments_cache: Arc::new(DashMap::new()),
244 account_id: None,
245 quote_cache: Arc::new(RwLock::new(cache::QuoteCache::new())),
246 retry_manager: Arc::new(
247 create_websocket_retry_manager().expect("Failed to create retry manager"),
248 ),
249 cancellation_token: CancellationToken::new(),
250 }
251 }
252
253 #[must_use]
259 pub fn new_trade(
260 environment: BybitEnvironment,
261 credential: Credential,
262 url: Option<String>,
263 heartbeat: Option<u64>,
264 ) -> Self {
265 Self {
266 url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
267 environment,
268 product_type: None,
269 credential: Some(credential),
270 requires_auth: true,
271 auth_tracker: AuthTracker::new(),
272 heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
273 inner: Arc::new(RwLock::new(None)),
274 rx: None,
275 signal: Arc::new(AtomicBool::new(false)),
276 task_handle: None,
277 subscriptions: SubscriptionState::new(),
278 is_authenticated: Arc::new(AtomicBool::new(false)),
279 instruments_cache: Arc::new(DashMap::new()),
280 account_id: None,
281 quote_cache: Arc::new(RwLock::new(cache::QuoteCache::new())),
282 retry_manager: Arc::new(
283 create_websocket_retry_manager().expect("Failed to create retry manager"),
284 ),
285 cancellation_token: CancellationToken::new(),
286 }
287 }
288
289 pub async fn connect(&mut self) -> BybitWsResult<()> {
299 let (message_handler, mut message_rx) = channel_message_handler();
300
301 let inner_for_ping = Arc::clone(&self.inner);
302 let ping_handler: PingHandler = Arc::new(move |payload: Vec<u8>| {
303 let inner = Arc::clone(&inner_for_ping);
304 get_runtime().spawn(async move {
305 let len = payload.len();
306 let guard = inner.read().await;
307 if let Some(client) = guard.as_ref() {
308 if let Err(err) = client.send_pong(payload).await {
309 tracing::warn!(error = %err, "Failed to send pong frame");
310 } else {
311 tracing::trace!("Sent pong frame ({len} bytes)");
312 }
313 }
314 });
315 });
316
317 let ping_msg = serde_json::to_string(&BybitSubscription {
318 op: BybitWsOperation::Ping,
319 args: vec![],
320 })
321 .expect("Failed to serialize ping message");
322
323 let config = WebSocketConfig {
324 url: self.url.clone(),
325 headers: Self::default_headers(),
326 message_handler: Some(message_handler),
327 heartbeat: self.heartbeat,
328 heartbeat_msg: Some(ping_msg),
329 ping_handler: Some(ping_handler),
330 reconnect_timeout_ms: Some(5_000),
331 reconnect_delay_initial_ms: Some(500),
332 reconnect_delay_max_ms: Some(5_000),
333 reconnect_backoff_factor: Some(1.5),
334 reconnect_jitter_ms: Some(250),
335 };
336
337 let client = WebSocketClient::connect(config, None, vec![], None)
338 .await
339 .map_err(BybitWsError::from)?;
340
341 {
342 let mut guard = self.inner.write().await;
343 *guard = Some(client);
344 }
345
346 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<BybitWebSocketMessage>();
347 self.rx = Some(event_rx);
348 self.signal.store(false, Ordering::Relaxed);
349
350 let inner = Arc::clone(&self.inner);
351 let signal = Arc::clone(&self.signal);
352 let subscriptions = self.subscriptions.clone();
353 let auth_tracker = self.auth_tracker.clone();
354 let credential = self.credential.clone();
355 let requires_auth = self.requires_auth;
356 let is_authenticated = Arc::clone(&self.is_authenticated);
357 let quote_cache = Arc::clone(&self.quote_cache);
358
359 let task_handle = get_runtime().spawn(async move {
360 while let Some(message) = message_rx.recv().await {
361 if signal.load(Ordering::Relaxed) {
362 break;
363 }
364
365 match BybitWebSocketClient::handle_message(
366 &inner,
367 &subscriptions,
368 &auth_tracker,
369 requires_auth,
370 &is_authenticated,
371 message,
372 )
373 .await
374 {
375 Ok(Some(BybitWebSocketMessage::Reconnected)) => {
376 tracing::info!("Handling WebSocket reconnection");
377
378 let inner_for_task = inner.clone();
379 let subscriptions_for_task = subscriptions.clone();
380 let auth_tracker_for_task = auth_tracker.clone();
381 let is_authenticated_for_task = is_authenticated.clone();
382 let credential_for_task = credential.clone();
383 let quote_cache_for_task = quote_cache.clone();
384 let event_tx_for_task = event_tx.clone();
385
386 get_runtime().spawn(async move {
387 let auth_succeeded = if requires_auth {
389 match BybitWebSocketClient::authenticate_inner(
390 &inner_for_task,
391 requires_auth,
392 credential_for_task,
393 &auth_tracker_for_task,
394 &is_authenticated_for_task,
395 )
396 .await
397 {
398 Ok(()) => {
399 tracing::info!(
400 "Authentication successful after reconnect, proceeding with resubscription"
401 );
402 true
403 }
404 Err(e) => {
405 tracing::error!("Authentication after reconnect failed: {e}");
406 let error = BybitWebSocketError::from_message(e.to_string());
407 let _ = event_tx_for_task.send(BybitWebSocketMessage::Error(error));
408 false
409 }
410 }
411 } else {
412 true
413 };
414
415 if !auth_succeeded {
416 return;
417 }
418
419 quote_cache_for_task.write().await.clear();
421
422 if let Err(err) = BybitWebSocketClient::resubscribe_all_inner(
424 &inner_for_task,
425 &subscriptions_for_task,
426 )
427 .await
428 {
429 tracing::error!("Failed to restore subscriptions after reconnection: {err}");
430 let error = BybitWebSocketError::from_message(err.to_string());
431 let _ = event_tx_for_task.send(BybitWebSocketMessage::Error(error));
432 } else {
433 tracing::info!("Restored subscriptions after reconnection");
434 let _ = event_tx_for_task.send(BybitWebSocketMessage::Reconnected);
435 }
436 });
437 }
438 Ok(Some(event)) => {
439 if event_tx.send(event).is_err() {
440 break;
441 }
442 }
443 Ok(None) => {}
444 Err(err) => {
445 let error = BybitWebSocketError::from_message(err.to_string());
446 if event_tx.send(BybitWebSocketMessage::Error(error)).is_err() {
447 break;
448 }
449 }
450 }
451 }
452 });
453
454 self.task_handle = Some(task_handle);
455
456 self.authenticate_if_required().await?;
457
458 if !self.subscriptions.is_empty() {
460 Self::resubscribe_all_inner(&self.inner, &self.subscriptions).await?;
461 }
462
463 Ok(())
464 }
465
466 pub async fn close(&mut self) -> BybitWsResult<()> {
468 self.signal.store(true, Ordering::Relaxed);
469
470 {
471 let inner_guard = self.inner.read().await;
472 if let Some(inner) = inner_guard.as_ref() {
473 inner.disconnect().await;
474 }
475 }
476
477 if let Some(handle) = self.task_handle.take()
478 && let Err(err) = handle.await
479 {
480 tracing::error!(error = %err, "Bybit websocket task terminated with error");
481 }
482
483 self.rx = None;
484 self.is_authenticated.store(false, Ordering::Relaxed);
485
486 Ok(())
487 }
488
489 #[must_use]
491 pub async fn is_active(&self) -> bool {
492 let guard = self.inner.read().await;
493 guard.as_ref().is_some_and(WebSocketClient::is_active)
494 }
495
496 pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
502 let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
503
504 tokio::time::timeout(timeout, async {
505 while !self.is_active().await {
506 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
507 }
508 })
509 .await
510 .map_err(|_| {
511 BybitWsError::ClientError(format!(
512 "WebSocket connection timeout after {timeout_secs} seconds"
513 ))
514 })?;
515
516 Ok(())
517 }
518
519 pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
521 if topics.is_empty() {
522 return Ok(());
523 }
524
525 let mut topics_to_send = Vec::new();
527
528 for topic in topics {
529 if self.subscriptions.add_reference(&topic) {
531 self.subscriptions.mark_subscribe(&topic);
532 topics_to_send.push(topic.clone());
533 } else {
534 tracing::debug!("Already subscribed to {topic}, skipping duplicate subscription");
535 }
536 }
537
538 if topics_to_send.is_empty() {
539 return Ok(());
540 }
541
542 Self::send_topics_inner(&self.inner, BybitWsOperation::Subscribe, topics_to_send).await
543 }
544
545 pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
547 if topics.is_empty() {
548 return Ok(());
549 }
550
551 let mut topics_to_send = Vec::new();
553
554 for topic in topics {
555 if self.subscriptions.remove_reference(&topic) {
557 self.subscriptions.mark_unsubscribe(&topic);
558 topics_to_send.push(topic.clone());
559 } else {
560 tracing::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
561 }
562 }
563
564 if topics_to_send.is_empty() {
565 return Ok(());
566 }
567
568 Self::send_topics_inner(&self.inner, BybitWsOperation::Unsubscribe, topics_to_send).await
569 }
570
571 pub fn stream(
577 &mut self,
578 ) -> impl futures_util::Stream<Item = BybitWebSocketMessage> + Send + 'static {
579 let rx = self
580 .rx
581 .take()
582 .expect("Stream receiver already taken or client not connected");
583
584 async_stream::stream! {
585 let mut rx = rx;
586 while let Some(event) = rx.recv().await {
587 yield event;
588 }
589 }
590 }
591
592 #[must_use]
594 pub fn subscription_count(&self) -> usize {
595 self.subscriptions.len()
596 }
597
598 pub fn add_instrument(&self, instrument: InstrumentAny) {
600 let instrument_id = instrument.id();
601 self.instruments_cache.insert(instrument_id, instrument);
602 tracing::debug!("Added instrument {instrument_id} to WebSocket client cache");
603 }
604
605 #[must_use]
607 pub fn instruments(&self) -> &Arc<DashMap<InstrumentId, InstrumentAny>> {
608 &self.instruments_cache
609 }
610
611 pub fn set_account_id(&mut self, account_id: AccountId) {
613 self.account_id = Some(account_id);
614 }
615
616 #[must_use]
618 pub fn account_id(&self) -> Option<AccountId> {
619 self.account_id
620 }
621
622 #[must_use]
624 pub fn product_type(&self) -> Option<BybitProductType> {
625 self.product_type
626 }
627
628 #[must_use]
630 pub fn quote_cache(&self) -> &Arc<RwLock<cache::QuoteCache>> {
631 &self.quote_cache
632 }
633
634 pub async fn subscribe_orderbook(
644 &self,
645 instrument_id: InstrumentId,
646 depth: u32,
647 ) -> BybitWsResult<()> {
648 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
649 let topic = format!("orderbook.{}.{}", depth, raw_symbol);
650 self.subscribe(vec![topic]).await
651 }
652
653 pub async fn unsubscribe_orderbook(
655 &self,
656 instrument_id: InstrumentId,
657 depth: u32,
658 ) -> BybitWsResult<()> {
659 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
660 let topic = format!("orderbook.{}.{}", depth, raw_symbol);
661 self.unsubscribe(vec![topic]).await
662 }
663
664 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
674 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
675 let topic = format!("publicTrade.{}", raw_symbol);
676 self.subscribe(vec![topic]).await
677 }
678
679 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
681 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
682 let topic = format!("publicTrade.{}", raw_symbol);
683 self.unsubscribe(vec![topic]).await
684 }
685
686 pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
696 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
697 let topic = format!("tickers.{}", raw_symbol);
698 self.subscribe(vec![topic]).await
699 }
700
701 pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
703 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
704 let topic = format!("tickers.{}", raw_symbol);
705 self.unsubscribe(vec![topic]).await
706 }
707
708 pub async fn subscribe_klines(
718 &self,
719 instrument_id: InstrumentId,
720 interval: impl Into<String>,
721 ) -> BybitWsResult<()> {
722 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
723 let topic = format!("kline.{}.{}", interval.into(), raw_symbol);
724 self.subscribe(vec![topic]).await
725 }
726
727 pub async fn unsubscribe_klines(
729 &self,
730 instrument_id: InstrumentId,
731 interval: impl Into<String>,
732 ) -> BybitWsResult<()> {
733 let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
734 let topic = format!("kline.{}.{}", interval.into(), raw_symbol);
735 self.unsubscribe(vec![topic]).await
736 }
737
738 pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
748 if !self.requires_auth {
749 return Err(BybitWsError::Authentication(
750 "Order subscription requires authentication".to_string(),
751 ));
752 }
753 self.subscribe(vec!["order".to_string()]).await
754 }
755
756 pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
758 self.unsubscribe(vec!["order".to_string()]).await
759 }
760
761 pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
771 if !self.requires_auth {
772 return Err(BybitWsError::Authentication(
773 "Execution subscription requires authentication".to_string(),
774 ));
775 }
776 self.subscribe(vec!["execution".to_string()]).await
777 }
778
779 pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
781 self.unsubscribe(vec!["execution".to_string()]).await
782 }
783
784 pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
794 if !self.requires_auth {
795 return Err(BybitWsError::Authentication(
796 "Position subscription requires authentication".to_string(),
797 ));
798 }
799 self.subscribe(vec!["position".to_string()]).await
800 }
801
802 pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
804 self.unsubscribe(vec!["position".to_string()]).await
805 }
806
807 pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
817 if !self.requires_auth {
818 return Err(BybitWsError::Authentication(
819 "Wallet subscription requires authentication".to_string(),
820 ));
821 }
822 self.subscribe(vec!["wallet".to_string()]).await
823 }
824
825 pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
827 self.unsubscribe(vec!["wallet".to_string()]).await
828 }
829
830 pub async fn place_order(&self, params: BybitWsPlaceOrderParams) -> BybitWsResult<()> {
840 if !self.is_authenticated.load(Ordering::Relaxed) {
841 return Err(BybitWsError::Authentication(
842 "Must be authenticated to place orders".to_string(),
843 ));
844 }
845
846 self.retry_manager
847 .execute_with_retry_with_cancel(
848 "place_order",
849 || {
850 let params = params.clone();
851 async move {
852 let request = BybitWsRequest {
853 op: BybitWsOrderRequestOp::Create,
854 header: BybitWsHeader::now(),
855 args: vec![params],
856 };
857
858 let payload =
859 serde_json::to_string(&request).map_err(BybitWsError::from)?;
860 tracing::debug!("Sending order WebSocket message: {}", payload);
861 Self::send_text_inner(&self.inner, &payload).await
862 }
863 },
864 should_retry_bybit_error,
865 create_bybit_timeout_error,
866 &self.cancellation_token,
867 )
868 .await
869 }
870
871 pub async fn amend_order(&self, params: BybitWsAmendOrderParams) -> BybitWsResult<()> {
881 if !self.is_authenticated.load(Ordering::Relaxed) {
882 return Err(BybitWsError::Authentication(
883 "Must be authenticated to amend orders".to_string(),
884 ));
885 }
886
887 self.retry_manager
888 .execute_with_retry_with_cancel(
889 "amend_order",
890 || {
891 let params = params.clone();
892 async move {
893 let request = BybitWsRequest {
894 op: BybitWsOrderRequestOp::Amend,
895 header: BybitWsHeader::now(),
896 args: vec![params],
897 };
898
899 let payload =
900 serde_json::to_string(&request).map_err(BybitWsError::from)?;
901 Self::send_text_inner(&self.inner, &payload).await
902 }
903 },
904 should_retry_bybit_error,
905 create_bybit_timeout_error,
906 &self.cancellation_token,
907 )
908 .await
909 }
910
911 pub async fn cancel_order(&self, params: BybitWsCancelOrderParams) -> BybitWsResult<()> {
921 if !self.is_authenticated.load(Ordering::Relaxed) {
922 return Err(BybitWsError::Authentication(
923 "Must be authenticated to cancel orders".to_string(),
924 ));
925 }
926
927 self.retry_manager
928 .execute_with_retry_with_cancel(
929 "cancel_order",
930 || {
931 let params = params.clone();
932 async move {
933 let request = BybitWsRequest {
934 op: BybitWsOrderRequestOp::Cancel,
935 header: BybitWsHeader::now(),
936 args: vec![params],
937 };
938
939 let payload =
940 serde_json::to_string(&request).map_err(BybitWsError::from)?;
941 Self::send_text_inner(&self.inner, &payload).await
942 }
943 },
944 should_retry_bybit_error,
945 create_bybit_timeout_error,
946 &self.cancellation_token,
947 )
948 .await
949 }
950
951 pub async fn batch_place_orders(
961 &self,
962 orders: Vec<BybitWsPlaceOrderParams>,
963 ) -> BybitWsResult<()> {
964 if !self.is_authenticated.load(Ordering::Relaxed) {
965 return Err(BybitWsError::Authentication(
966 "Must be authenticated to place orders".to_string(),
967 ));
968 }
969
970 if orders.len() > 20 {
971 return Err(BybitWsError::ClientError(
972 "Batch order limit is 20 orders per request".to_string(),
973 ));
974 }
975
976 let request = BybitWsRequest {
977 op: BybitWsOrderRequestOp::CreateBatch,
978 header: BybitWsHeader::now(),
979 args: orders,
980 };
981
982 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
983 Self::send_text_inner(&self.inner, &payload).await
984 }
985
986 pub async fn batch_amend_orders(
992 &self,
993 orders: Vec<BybitWsAmendOrderParams>,
994 ) -> BybitWsResult<()> {
995 if !self.is_authenticated.load(Ordering::Relaxed) {
996 return Err(BybitWsError::Authentication(
997 "Must be authenticated to amend orders".to_string(),
998 ));
999 }
1000
1001 if orders.len() > 20 {
1002 return Err(BybitWsError::ClientError(
1003 "Batch amend limit is 20 orders per request".to_string(),
1004 ));
1005 }
1006
1007 let request = BybitWsRequest {
1008 op: BybitWsOrderRequestOp::AmendBatch,
1009 header: BybitWsHeader::now(),
1010 args: orders,
1011 };
1012
1013 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1014 Self::send_text_inner(&self.inner, &payload).await
1015 }
1016
1017 pub async fn batch_cancel_orders(
1023 &self,
1024 orders: Vec<BybitWsCancelOrderParams>,
1025 ) -> BybitWsResult<()> {
1026 if !self.is_authenticated.load(Ordering::Relaxed) {
1027 return Err(BybitWsError::Authentication(
1028 "Must be authenticated to cancel orders".to_string(),
1029 ));
1030 }
1031
1032 if orders.len() > 20 {
1033 return Err(BybitWsError::ClientError(
1034 "Batch cancel limit is 20 orders per request".to_string(),
1035 ));
1036 }
1037
1038 let request = BybitWsRequest {
1039 op: BybitWsOrderRequestOp::CancelBatch,
1040 header: BybitWsHeader::now(),
1041 args: orders,
1042 };
1043
1044 let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1045 Self::send_text_inner(&self.inner, &payload).await
1046 }
1047
1048 #[allow(clippy::too_many_arguments)]
1054 pub async fn submit_order(
1055 &self,
1056 product_type: BybitProductType,
1057 instrument_id: InstrumentId,
1058 client_order_id: ClientOrderId,
1059 order_side: OrderSide,
1060 order_type: OrderType,
1061 quantity: Quantity,
1062 time_in_force: Option<TimeInForce>,
1063 price: Option<Price>,
1064 trigger_price: Option<Price>,
1065 post_only: Option<bool>,
1066 reduce_only: Option<bool>,
1067 ) -> BybitWsResult<()> {
1068 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1069 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1070 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1071
1072 let bybit_side = match order_side {
1073 OrderSide::Buy => BybitOrderSide::Buy,
1074 OrderSide::Sell => BybitOrderSide::Sell,
1075 _ => {
1076 return Err(BybitWsError::ClientError(format!(
1077 "Invalid order side: {order_side:?}"
1078 )));
1079 }
1080 };
1081
1082 let (bybit_order_type, is_stop_order) = match order_type {
1085 OrderType::Market => (BybitOrderType::Market, false),
1086 OrderType::Limit => (BybitOrderType::Limit, false),
1087 OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1088 OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1089 _ => {
1090 return Err(BybitWsError::ClientError(format!(
1091 "Unsupported order type: {order_type:?}"
1092 )));
1093 }
1094 };
1095
1096 let bybit_tif = if post_only == Some(true) {
1098 Some(BybitTimeInForce::PostOnly)
1099 } else if let Some(tif) = time_in_force {
1100 Some(match tif {
1101 TimeInForce::Gtc => BybitTimeInForce::Gtc,
1102 TimeInForce::Ioc => BybitTimeInForce::Ioc,
1103 TimeInForce::Fok => BybitTimeInForce::Fok,
1104 _ => {
1105 return Err(BybitWsError::ClientError(format!(
1106 "Unsupported time in force: {tif:?}"
1107 )));
1108 }
1109 })
1110 } else {
1111 None
1112 };
1113
1114 let params = if is_stop_order {
1115 BybitWsPlaceOrderParams {
1118 category: product_type,
1119 symbol: raw_symbol,
1120 side: bybit_side,
1121 order_type: bybit_order_type,
1122 qty: quantity.to_string(),
1123 price: price.map(|p| p.to_string()),
1124 time_in_force: bybit_tif,
1125 order_link_id: Some(client_order_id.to_string()),
1126 reduce_only: reduce_only.filter(|&r| r),
1127 close_on_trigger: None,
1128 trigger_price: trigger_price.map(|p| p.to_string()),
1129 trigger_by: Some(BybitTriggerType::LastPrice),
1130 trigger_direction: None,
1131 tpsl_mode: None, take_profit: None,
1133 stop_loss: None,
1134 tp_trigger_by: None,
1135 sl_trigger_by: None,
1136 sl_trigger_price: None, tp_trigger_price: None, sl_order_type: None,
1139 tp_order_type: None,
1140 sl_limit_price: None,
1141 tp_limit_price: None,
1142 }
1143 } else {
1144 BybitWsPlaceOrderParams {
1146 category: product_type,
1147 symbol: raw_symbol,
1148 side: bybit_side,
1149 order_type: bybit_order_type,
1150 qty: quantity.to_string(),
1151 price: price.map(|p| p.to_string()),
1152 time_in_force: bybit_tif,
1153 order_link_id: Some(client_order_id.to_string()),
1154 reduce_only: reduce_only.filter(|&r| r),
1155 close_on_trigger: None,
1156 trigger_price: None,
1157 trigger_by: None,
1158 trigger_direction: None,
1159 tpsl_mode: None,
1160 take_profit: None,
1161 stop_loss: None,
1162 tp_trigger_by: None,
1163 sl_trigger_by: None,
1164 sl_trigger_price: None,
1165 tp_trigger_price: None,
1166 sl_order_type: None,
1167 tp_order_type: None,
1168 sl_limit_price: None,
1169 tp_limit_price: None,
1170 }
1171 };
1172
1173 self.place_order(params).await
1174 }
1175
1176 pub async fn modify_order(
1182 &self,
1183 product_type: BybitProductType,
1184 instrument_id: InstrumentId,
1185 venue_order_id: Option<VenueOrderId>,
1186 client_order_id: Option<ClientOrderId>,
1187 quantity: Option<Quantity>,
1188 price: Option<Price>,
1189 ) -> BybitWsResult<()> {
1190 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1191 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1192 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1193
1194 let params = BybitWsAmendOrderParams {
1195 category: product_type,
1196 symbol: raw_symbol,
1197 order_id: venue_order_id.map(|id| id.to_string()),
1198 order_link_id: client_order_id.map(|id| id.to_string()),
1199 qty: quantity.map(|q| q.to_string()),
1200 price: price.map(|p| p.to_string()),
1201 trigger_price: None,
1202 take_profit: None,
1203 stop_loss: None,
1204 tp_trigger_by: None,
1205 sl_trigger_by: None,
1206 };
1207
1208 self.amend_order(params).await
1209 }
1210
1211 pub async fn cancel_order_by_id(
1217 &self,
1218 product_type: BybitProductType,
1219 instrument_id: InstrumentId,
1220 venue_order_id: Option<VenueOrderId>,
1221 client_order_id: Option<ClientOrderId>,
1222 ) -> BybitWsResult<()> {
1223 let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1224 .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1225 let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1226
1227 let params = BybitWsCancelOrderParams {
1228 category: product_type,
1229 symbol: raw_symbol,
1230 order_id: venue_order_id.map(|id| id.to_string()),
1231 order_link_id: client_order_id.map(|id| id.to_string()),
1232 };
1233
1234 self.cancel_order(params).await
1235 }
1236
1237 fn default_headers() -> Vec<(String, String)> {
1238 vec![
1239 ("Content-Type".to_string(), "application/json".to_string()),
1240 ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
1241 ("Referer".to_string(), BYBIT_NAUTILUS_BROKER_ID.to_string()),
1242 ]
1243 }
1244
1245 async fn authenticate_if_required(&self) -> BybitWsResult<()> {
1246 Self::authenticate_inner(
1247 &self.inner,
1248 self.requires_auth,
1249 self.credential.clone(),
1250 &self.auth_tracker,
1251 &self.is_authenticated,
1252 )
1253 .await
1254 }
1255
1256 async fn send_text_inner(
1257 inner: &Arc<RwLock<Option<WebSocketClient>>>,
1258 text: &str,
1259 ) -> BybitWsResult<()> {
1260 let guard = inner.read().await;
1261 let client = guard.as_ref().ok_or(BybitWsError::NotConnected)?;
1262 client
1263 .send_text(text.to_string(), None)
1264 .await
1265 .map_err(BybitWsError::from)
1266 }
1267
1268 async fn send_pong_inner(
1269 inner: &Arc<RwLock<Option<WebSocketClient>>>,
1270 payload: Vec<u8>,
1271 ) -> BybitWsResult<()> {
1272 let guard = inner.read().await;
1273 let client = guard.as_ref().ok_or(BybitWsError::NotConnected)?;
1274 client.send_pong(payload).await.map_err(BybitWsError::from)
1275 }
1276
1277 async fn send_topics_inner(
1278 inner: &Arc<RwLock<Option<WebSocketClient>>>,
1279 op: BybitWsOperation,
1280 topics: Vec<String>,
1281 ) -> BybitWsResult<()> {
1282 if topics.is_empty() {
1283 return Ok(());
1284 }
1285
1286 for chunk in topics.chunks(MAX_ARGS_PER_SUBSCRIPTION_REQUEST) {
1287 let subscription = BybitSubscription {
1288 op: op.clone(),
1289 args: chunk.to_vec(),
1290 };
1291 let payload = serde_json::to_string(&subscription)?;
1292 Self::send_text_inner(inner, &payload).await?;
1293 }
1294
1295 Ok(())
1296 }
1297
1298 async fn resubscribe_all_inner(
1299 inner: &Arc<RwLock<Option<WebSocketClient>>>,
1300 subscriptions: &SubscriptionState,
1301 ) -> BybitWsResult<()> {
1302 let topics = subscriptions.all_topics();
1303 if topics.is_empty() {
1304 return Ok(());
1305 }
1306
1307 tracing::info!(
1308 "Restoring {} subscriptions after reconnection",
1309 topics.len()
1310 );
1311 Self::send_topics_inner(inner, BybitWsOperation::Subscribe, topics).await
1312 }
1313
1314 async fn handle_message(
1315 inner: &Arc<RwLock<Option<WebSocketClient>>>,
1316 subscriptions: &SubscriptionState,
1317 auth_tracker: &AuthTracker,
1318 requires_auth: bool,
1319 is_authenticated: &Arc<AtomicBool>,
1320 message: Message,
1321 ) -> BybitWsResult<Option<BybitWebSocketMessage>> {
1322 match message {
1323 Message::Text(text) => {
1324 tracing::trace!("Bybit WS message: {text}");
1325
1326 if text == RECONNECTED {
1327 tracing::debug!("Bybit websocket reconnected signal received");
1328 return Ok(Some(BybitWebSocketMessage::Reconnected));
1329 }
1330
1331 if text.trim().eq_ignore_ascii_case(BYBIT_PONG) {
1332 return Ok(Some(BybitWebSocketMessage::Pong));
1333 }
1334
1335 let value: Value = serde_json::from_str(&text).map_err(BybitWsError::from)?;
1336
1337 if let Ok(op) = serde_json::from_value::<BybitWsOperation>(
1339 value.get("op").cloned().unwrap_or(Value::Null),
1340 ) {
1341 match op {
1342 BybitWsOperation::Ping => {
1343 let pong = BybitSubscription {
1344 op: BybitWsOperation::Pong,
1345 args: vec![],
1346 };
1347 let payload = serde_json::to_string(&pong)?;
1348 Self::send_text_inner(inner, &payload).await?;
1349 return Ok(None);
1350 }
1351 BybitWsOperation::Pong => {
1352 return Ok(Some(BybitWebSocketMessage::Pong));
1353 }
1354 _ => {}
1355 }
1356 }
1357
1358 if let Some(event) = Self::classify_message(&value) {
1359 if matches!(event, BybitWebSocketMessage::Error(_)) {
1361 tracing::debug!(
1362 json = %serde_json::to_string(&value).unwrap_or_default(),
1363 "Received error event from Bybit"
1364 );
1365 }
1366
1367 if let BybitWebSocketMessage::Auth(auth) = &event {
1368 let is_success =
1370 auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1371
1372 if is_success {
1373 is_authenticated.store(true, Ordering::Relaxed);
1374 auth_tracker.succeed();
1375 } else {
1376 is_authenticated.store(false, Ordering::Relaxed);
1377 let message = auth
1378 .ret_msg
1379 .clone()
1380 .unwrap_or_else(|| "Authentication failed".to_string());
1381 auth_tracker.fail(message);
1382 }
1383 } else if let BybitWebSocketMessage::Subscription(sub_msg) = &event {
1384 match sub_msg.op {
1386 BybitWsOperation::Subscribe => {
1387 let pending_topics = subscriptions.pending_subscribe_topics();
1388 if sub_msg.success {
1390 for topic in pending_topics {
1391 subscriptions.confirm_subscribe(&topic);
1392 tracing::debug!(topic = topic, "Subscription confirmed");
1393 }
1394 } else {
1395 for topic in pending_topics {
1396 subscriptions.mark_failure(&topic);
1397 tracing::warn!(
1398 topic = topic,
1399 error = ?sub_msg.ret_msg,
1400 "Subscription failed, will retry on reconnect"
1401 );
1402 }
1403 }
1404 }
1405 BybitWsOperation::Unsubscribe => {
1406 let pending_topics = subscriptions.pending_unsubscribe_topics();
1407 if sub_msg.success {
1409 for topic in pending_topics {
1410 subscriptions.confirm_unsubscribe(&topic);
1411 tracing::debug!(topic = topic, "Unsubscription confirmed");
1412 }
1413 } else {
1414 for topic in pending_topics {
1417 subscriptions.confirm_unsubscribe(&topic); subscriptions.confirm_subscribe(&topic); tracing::warn!(
1420 topic = topic,
1421 error = ?sub_msg.ret_msg,
1422 "Unsubscription failed, topic remains subscribed"
1423 );
1424 }
1425 }
1426 }
1427 _ => {}
1428 }
1429 } else if let BybitWebSocketMessage::Error(err) = &event
1430 && requires_auth
1431 && !is_authenticated.load(Ordering::Relaxed)
1432 {
1433 auth_tracker.fail(err.message.clone());
1434 }
1435 if let BybitWebSocketMessage::Error(err) = &event {
1436 tracing::warn!(
1437 code = err.code,
1438 message = %err.message,
1439 conn_id = ?err.conn_id,
1440 topic = ?err.topic,
1441 req_id = ?err.req_id,
1442 "Bybit websocket error"
1443 );
1444 }
1445 return Ok(Some(event));
1446 }
1447
1448 Ok(Some(BybitWebSocketMessage::Raw(value)))
1449 }
1450 Message::Ping(payload) => {
1451 Self::send_pong_inner(inner, payload.to_vec()).await?;
1452 Ok(None)
1453 }
1454 Message::Pong(_) => Ok(Some(BybitWebSocketMessage::Pong)),
1455 Message::Binary(_) => Ok(None),
1456 Message::Close(_) => Ok(None),
1457 Message::Frame(_) => Ok(None),
1458 }
1459 }
1460
1461 fn classify_message(value: &Value) -> Option<BybitWebSocketMessage> {
1462 if let Ok(op) = serde_json::from_value::<BybitWsOperation>(
1464 value.get("op").cloned().unwrap_or(Value::Null),
1465 ) && op == BybitWsOperation::Auth
1466 {
1467 tracing::debug!(json = %value, "Detected auth message by op field");
1468 if let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone()) {
1469 let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1471
1472 if is_success {
1473 tracing::debug!("Auth successful, returning Auth message");
1474 return Some(BybitWebSocketMessage::Auth(auth));
1475 }
1476 let resp = BybitWsResponse {
1477 op: Some(auth.op.clone()),
1478 topic: None,
1479 success: auth.success,
1480 conn_id: auth.conn_id.clone(),
1481 req_id: None,
1482 ret_code: auth.ret_code,
1483 ret_msg: auth.ret_msg.clone(),
1484 };
1485 let error = BybitWebSocketError::from_response(&resp);
1486 return Some(BybitWebSocketMessage::Error(error));
1487 }
1488 }
1489
1490 if let Some(success) = value.get("success").and_then(Value::as_bool) {
1491 if success {
1492 if let Ok(msg) = serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone()) {
1493 return Some(BybitWebSocketMessage::Subscription(msg));
1494 }
1495 } else if let Ok(resp) = serde_json::from_value::<BybitWsResponse>(value.clone()) {
1496 let error = BybitWebSocketError::from_response(&resp);
1497 return Some(BybitWebSocketMessage::Error(error));
1498 }
1499 }
1500
1501 if (value.get("ret_code").is_some() || value.get("retCode").is_some())
1502 && let Ok(resp) = serde_json::from_value::<BybitWsResponse>(value.clone())
1503 {
1504 if resp.ret_code.unwrap_or_default() != 0 {
1505 let error = BybitWebSocketError::from_response(&resp);
1506 return Some(BybitWebSocketMessage::Error(error));
1507 }
1508 return Some(BybitWebSocketMessage::Response(resp));
1509 }
1510
1511 if let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
1512 && auth.op == BybitWsOperation::Auth
1513 {
1514 if auth.success.unwrap_or(false) {
1515 return Some(BybitWebSocketMessage::Auth(auth));
1516 }
1517 let resp = BybitWsResponse {
1518 op: Some(auth.op.clone()),
1519 topic: None,
1520 success: auth.success,
1521 conn_id: auth.conn_id.clone(),
1522 req_id: None,
1523 ret_code: auth.ret_code,
1524 ret_msg: auth.ret_msg.clone(),
1525 };
1526 let error = BybitWebSocketError::from_response(&resp);
1527 return Some(BybitWebSocketMessage::Error(error));
1528 }
1529
1530 if let Some(topic) = value.get("topic").and_then(Value::as_str) {
1531 if topic.starts_with("orderbook") {
1532 if let Ok(msg) = serde_json::from_value::<BybitWsOrderbookDepthMsg>(value.clone()) {
1533 return Some(BybitWebSocketMessage::Orderbook(msg));
1534 }
1535 } else if topic.contains("publicTrade") || topic.starts_with("trade") {
1536 if let Ok(msg) = serde_json::from_value::<BybitWsTradeMsg>(value.clone()) {
1537 return Some(BybitWebSocketMessage::Trade(msg));
1538 }
1539 } else if topic.contains("kline") {
1540 if let Ok(msg) = serde_json::from_value::<BybitWsKlineMsg>(value.clone()) {
1541 return Some(BybitWebSocketMessage::Kline(msg));
1542 }
1543 } else if topic.contains("tickers") {
1544 if let Ok(msg) = serde_json::from_value::<BybitWsTickerOptionMsg>(value.clone()) {
1545 return Some(BybitWebSocketMessage::TickerOption(msg));
1546 }
1547 if let Ok(msg) = serde_json::from_value::<BybitWsTickerLinearMsg>(value.clone()) {
1548 return Some(BybitWebSocketMessage::TickerLinear(msg));
1549 }
1550 } else if topic == "order" || topic.starts_with("order.") {
1551 match serde_json::from_value::<BybitWsAccountOrderMsg>(value.clone()) {
1552 Ok(msg) => return Some(BybitWebSocketMessage::AccountOrder(msg)),
1553 Err(e) => tracing::warn!("Failed to deserialize order message: {e}\n{value}"),
1554 }
1555 } else if topic == "execution" || topic.starts_with("execution.") {
1556 match serde_json::from_value::<BybitWsAccountExecutionMsg>(value.clone()) {
1557 Ok(msg) => return Some(BybitWebSocketMessage::AccountExecution(msg)),
1558 Err(e) => {
1559 tracing::warn!("Failed to deserialize execution message: {e}\n{value}")
1560 }
1561 }
1562 } else if topic == "wallet" || topic.starts_with("wallet.") {
1563 match serde_json::from_value::<BybitWsAccountWalletMsg>(value.clone()) {
1564 Ok(msg) => return Some(BybitWebSocketMessage::AccountWallet(msg)),
1565 Err(e) => tracing::warn!("Failed to deserialize wallet message: {e}\n{value}"),
1566 }
1567 } else if topic == "position" || topic.starts_with("position.") {
1568 match serde_json::from_value::<BybitWsAccountPositionMsg>(value.clone()) {
1569 Ok(msg) => return Some(BybitWebSocketMessage::AccountPosition(msg)),
1570 Err(e) => {
1571 tracing::warn!("Failed to deserialize position message: {e}\n{value}")
1572 }
1573 }
1574 }
1575 }
1576
1577 None
1578 }
1579
1580 async fn authenticate_inner(
1581 inner: &Arc<RwLock<Option<WebSocketClient>>>,
1582 requires_auth: bool,
1583 credential: Option<Credential>,
1584 auth_tracker: &AuthTracker,
1585 is_authenticated: &Arc<AtomicBool>,
1586 ) -> BybitWsResult<()> {
1587 if !requires_auth {
1588 return Ok(());
1589 }
1590
1591 is_authenticated.store(false, Ordering::Relaxed);
1592
1593 let credential = credential.ok_or_else(|| {
1594 BybitWsError::Authentication(
1595 "API credentials not provided for authentication".to_string(),
1596 )
1597 })?;
1598
1599 let receiver = auth_tracker.begin();
1600
1601 let now_ns = get_atomic_clock_realtime().get_time_ns().as_i64();
1602 let now_ms = now_ns / 1_000_000;
1603 let expires = now_ms + WEBSOCKET_AUTH_WINDOW_MS;
1604 let signature = credential.sign_websocket_auth(expires);
1605
1606 let auth_request = BybitAuthRequest {
1607 op: BybitWsOperation::Auth,
1608 args: vec![
1609 json!(credential.api_key().as_str()),
1610 json!(expires),
1611 json!(signature),
1612 ],
1613 };
1614
1615 let payload = serde_json::to_string(&auth_request)?;
1616
1617 if let Err(err) = Self::send_text_inner(inner, &payload).await {
1618 auth_tracker.fail(err.to_string());
1619 return Err(err);
1620 }
1621
1622 match auth_tracker
1623 .wait_for_result(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), receiver)
1624 .await
1625 {
1626 Ok(()) => {
1627 is_authenticated.store(true, Ordering::Relaxed);
1628 Ok(())
1629 }
1630 Err(err) => {
1631 is_authenticated.store(false, Ordering::Relaxed);
1632 Err(err)
1633 }
1634 }
1635 }
1636}
1637
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::testing::load_test_json;

    /// Parses a raw JSON fixture and runs it through the classifier.
    fn classify_raw(raw: &str) -> Option<BybitWebSocketMessage> {
        let json: Value = serde_json::from_str(raw).expect("invalid fixture");
        BybitWebSocketClient::classify_message(&json)
    }

    /// An orderbook snapshot fixture is classified as an orderbook message.
    #[rstest]
    fn classify_orderbook_snapshot() {
        let message = classify_raw(&load_test_json("ws_orderbook_snapshot.json"))
            .expect("expected orderbook message");
        assert!(matches!(message, BybitWebSocketMessage::Orderbook(_)));
    }

    /// A public trade fixture is classified as a trade message.
    #[rstest]
    fn classify_trade_snapshot() {
        let message = classify_raw(&load_test_json("ws_public_trade.json"))
            .expect("expected trade message");
        assert!(matches!(message, BybitWebSocketMessage::Trade(_)));
    }

    /// A linear ticker fixture is classified as a linear ticker message.
    #[rstest]
    fn classify_ticker_linear_snapshot() {
        let message = classify_raw(&load_test_json("ws_ticker_linear.json"))
            .expect("expected ticker message");
        assert!(matches!(message, BybitWebSocketMessage::TickerLinear(_)));
    }

    /// An option ticker fixture is classified as an option ticker message.
    #[rstest]
    fn classify_ticker_option_snapshot() {
        let message = classify_raw(&load_test_json("ws_ticker_option.json"))
            .expect("expected ticker message");
        assert!(matches!(message, BybitWebSocketMessage::TickerOption(_)));
    }
}