nautilus_bybit/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Bybit WebSocket client for public market data, private account streams, and order operations.
17//!
18//! Bybit API reference <https://bybit-exchange.github.io/docs/>.
19
20use std::{
21    fmt,
22    sync::{
23        Arc,
24        atomic::{AtomicBool, Ordering},
25    },
26    time::Duration,
27};
28
29use dashmap::DashMap;
30use nautilus_common::runtime::get_runtime;
31use nautilus_core::{consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
32use nautilus_model::{
33    enums::{OrderSide, OrderType, TimeInForce},
34    identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
35    instruments::{Instrument, InstrumentAny},
36    types::{Price, Quantity},
37};
38use nautilus_network::{
39    RECONNECTED,
40    retry::{RetryManager, create_websocket_retry_manager},
41    websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
42};
43use serde_json::{Value, json};
44use tokio::sync::RwLock;
45use tokio_tungstenite::tungstenite::Message;
46use tokio_util::sync::CancellationToken;
47use ustr::Ustr;
48
49use crate::{
50    common::{
51        consts::{BYBIT_NAUTILUS_BROKER_ID, BYBIT_PONG},
52        credential::Credential,
53        enums::{
54            BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
55            BybitTriggerType, BybitWsOrderRequestOp,
56        },
57        parse::extract_raw_symbol,
58        symbol::BybitSymbol,
59        urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
60    },
61    websocket::{
62        auth::{AUTHENTICATION_TIMEOUT_SECS, AuthTracker},
63        cache,
64        enums::BybitWsOperation,
65        error::{BybitWsError, BybitWsResult},
66        messages::{
67            BybitAuthRequest, BybitSubscription, BybitWebSocketError, BybitWebSocketMessage,
68            BybitWsAccountExecutionMsg, BybitWsAccountOrderMsg, BybitWsAccountPositionMsg,
69            BybitWsAccountWalletMsg, BybitWsAmendOrderParams, BybitWsAuthResponse,
70            BybitWsCancelOrderParams, BybitWsHeader, BybitWsKlineMsg, BybitWsOrderbookDepthMsg,
71            BybitWsPlaceOrderParams, BybitWsRequest, BybitWsResponse, BybitWsSubscriptionMsg,
72            BybitWsTickerLinearMsg, BybitWsTickerOptionMsg, BybitWsTradeMsg,
73        },
74        subscription::SubscriptionState,
75    },
76};
77
/// Upper bound on the number of topic arguments placed in a single subscription request.
// NOTE(review): presumably mirrors a venue-side limit on `args` length — confirm against the Bybit docs.
const MAX_ARGS_PER_SUBSCRIPTION_REQUEST: usize = 10;
/// Heartbeat interval, in seconds, used when the caller does not supply one.
const DEFAULT_HEARTBEAT_SECS: u64 = 20;
/// Authentication window in milliseconds.
// NOTE(review): presumably the offset added to "now" for the auth request expiry — confirm at the usage site.
const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
81
82/// Determines if a Bybit WebSocket error should trigger a retry.
83fn should_retry_bybit_error(error: &BybitWsError) -> bool {
84    match error {
85        BybitWsError::Transport(_) => true, // Network errors are retryable
86        BybitWsError::Send(_) => true,      // Send errors are retryable
87        BybitWsError::ClientError(msg) => {
88            // Retry on timeout and connection errors (case-insensitive)
89            let msg_lower = msg.to_lowercase();
90            msg_lower.contains("timeout")
91                || msg_lower.contains("timed out")
92                || msg_lower.contains("connection")
93                || msg_lower.contains("network")
94        }
95        BybitWsError::NotConnected => true, // Connection issues are retryable
96        BybitWsError::Authentication(_) | BybitWsError::Json(_) => {
97            // Don't retry authentication or parsing errors automatically
98            false
99        }
100    }
101}
102
103/// Creates a timeout error for Bybit operations.
104fn create_bybit_timeout_error(msg: String) -> BybitWsError {
105    BybitWsError::ClientError(msg)
106}
107
108/// Public/market data WebSocket client for Bybit.
109#[cfg_attr(feature = "python", pyo3::pyclass)]
110pub struct BybitWebSocketClient {
111    url: String,
112    environment: BybitEnvironment,
113    product_type: Option<BybitProductType>,
114    credential: Option<Credential>,
115    requires_auth: bool,
116    auth_tracker: AuthTracker,
117    heartbeat: Option<u64>,
118    inner: Arc<RwLock<Option<WebSocketClient>>>,
119    rx: Option<tokio::sync::mpsc::UnboundedReceiver<BybitWebSocketMessage>>,
120    signal: Arc<AtomicBool>,
121    task_handle: Option<tokio::task::JoinHandle<()>>,
122    subscriptions: SubscriptionState,
123    is_authenticated: Arc<AtomicBool>,
124    instruments_cache: Arc<DashMap<InstrumentId, InstrumentAny>>,
125    account_id: Option<AccountId>,
126    quote_cache: Arc<RwLock<cache::QuoteCache>>,
127    retry_manager: Arc<RetryManager<BybitWsError>>,
128    cancellation_token: CancellationToken,
129}
130
131impl fmt::Debug for BybitWebSocketClient {
132    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133        f.debug_struct("BybitWebSocketClient")
134            .field("url", &self.url)
135            .field("environment", &self.environment)
136            .field("product_type", &self.product_type)
137            .field("requires_auth", &self.requires_auth)
138            .field("heartbeat", &self.heartbeat)
139            .field("confirmed_subscriptions", &self.subscriptions.len())
140            .finish()
141    }
142}
143
144impl Clone for BybitWebSocketClient {
145    fn clone(&self) -> Self {
146        Self {
147            url: self.url.clone(),
148            environment: self.environment,
149            product_type: self.product_type,
150            credential: self.credential.clone(),
151            requires_auth: self.requires_auth,
152            auth_tracker: self.auth_tracker.clone(),
153            heartbeat: self.heartbeat,
154            inner: Arc::clone(&self.inner),
155            rx: None, // Each clone gets its own receiver
156            signal: Arc::clone(&self.signal),
157            task_handle: None, // Each clone gets its own task handle
158            subscriptions: self.subscriptions.clone(),
159            is_authenticated: Arc::clone(&self.is_authenticated),
160            instruments_cache: Arc::clone(&self.instruments_cache),
161            account_id: self.account_id,
162            quote_cache: Arc::clone(&self.quote_cache),
163            retry_manager: Arc::clone(&self.retry_manager),
164            cancellation_token: self.cancellation_token.clone(),
165        }
166    }
167}
168
169impl BybitWebSocketClient {
170    /// Creates a new Bybit public WebSocket client.
171    #[must_use]
172    pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
173        Self::new_public_with(
174            BybitProductType::Linear,
175            BybitEnvironment::Mainnet,
176            url,
177            heartbeat,
178        )
179    }
180
181    /// Creates a new Bybit public WebSocket client targeting the specified product/environment.
182    ///
183    /// # Panics
184    ///
185    /// Panics if the retry manager cannot be created.
186    #[must_use]
187    pub fn new_public_with(
188        product_type: BybitProductType,
189        environment: BybitEnvironment,
190        url: Option<String>,
191        heartbeat: Option<u64>,
192    ) -> Self {
193        Self {
194            url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
195            environment,
196            product_type: Some(product_type),
197            credential: None,
198            requires_auth: false,
199            auth_tracker: AuthTracker::new(),
200            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
201            inner: Arc::new(RwLock::new(None)),
202            rx: None,
203            signal: Arc::new(AtomicBool::new(false)),
204            task_handle: None,
205            subscriptions: SubscriptionState::new(),
206            is_authenticated: Arc::new(AtomicBool::new(false)),
207            instruments_cache: Arc::new(DashMap::new()),
208            account_id: None,
209            quote_cache: Arc::new(RwLock::new(cache::QuoteCache::new())),
210            retry_manager: Arc::new(
211                create_websocket_retry_manager().expect("Failed to create retry manager"),
212            ),
213            cancellation_token: CancellationToken::new(),
214        }
215    }
216
217    /// Creates a new Bybit private WebSocket client.
218    ///
219    /// # Panics
220    ///
221    /// Panics if the retry manager cannot be created.
222    #[must_use]
223    pub fn new_private(
224        environment: BybitEnvironment,
225        credential: Credential,
226        url: Option<String>,
227        heartbeat: Option<u64>,
228    ) -> Self {
229        Self {
230            url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
231            environment,
232            product_type: None,
233            credential: Some(credential),
234            requires_auth: true,
235            auth_tracker: AuthTracker::new(),
236            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
237            inner: Arc::new(RwLock::new(None)),
238            rx: None,
239            signal: Arc::new(AtomicBool::new(false)),
240            task_handle: None,
241            subscriptions: SubscriptionState::new(),
242            is_authenticated: Arc::new(AtomicBool::new(false)),
243            instruments_cache: Arc::new(DashMap::new()),
244            account_id: None,
245            quote_cache: Arc::new(RwLock::new(cache::QuoteCache::new())),
246            retry_manager: Arc::new(
247                create_websocket_retry_manager().expect("Failed to create retry manager"),
248            ),
249            cancellation_token: CancellationToken::new(),
250        }
251    }
252
253    /// Creates a new Bybit trade WebSocket client for order operations.
254    ///
255    /// # Panics
256    ///
257    /// Panics if the retry manager cannot be created.
258    #[must_use]
259    pub fn new_trade(
260        environment: BybitEnvironment,
261        credential: Credential,
262        url: Option<String>,
263        heartbeat: Option<u64>,
264    ) -> Self {
265        Self {
266            url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
267            environment,
268            product_type: None,
269            credential: Some(credential),
270            requires_auth: true,
271            auth_tracker: AuthTracker::new(),
272            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
273            inner: Arc::new(RwLock::new(None)),
274            rx: None,
275            signal: Arc::new(AtomicBool::new(false)),
276            task_handle: None,
277            subscriptions: SubscriptionState::new(),
278            is_authenticated: Arc::new(AtomicBool::new(false)),
279            instruments_cache: Arc::new(DashMap::new()),
280            account_id: None,
281            quote_cache: Arc::new(RwLock::new(cache::QuoteCache::new())),
282            retry_manager: Arc::new(
283                create_websocket_retry_manager().expect("Failed to create retry manager"),
284            ),
285            cancellation_token: CancellationToken::new(),
286        }
287    }
288
289    /// Establishes the WebSocket connection.
290    ///
291    /// # Errors
292    ///
293    /// Returns an error if the underlying WebSocket connection cannot be established.
294    ///
295    /// # Panics
296    ///
297    /// Panics if the ping message cannot be serialized to JSON.
298    pub async fn connect(&mut self) -> BybitWsResult<()> {
299        let (message_handler, mut message_rx) = channel_message_handler();
300
301        let inner_for_ping = Arc::clone(&self.inner);
302        let ping_handler: PingHandler = Arc::new(move |payload: Vec<u8>| {
303            let inner = Arc::clone(&inner_for_ping);
304            get_runtime().spawn(async move {
305                let len = payload.len();
306                let guard = inner.read().await;
307                if let Some(client) = guard.as_ref() {
308                    if let Err(err) = client.send_pong(payload).await {
309                        tracing::warn!(error = %err, "Failed to send pong frame");
310                    } else {
311                        tracing::trace!("Sent pong frame ({len} bytes)");
312                    }
313                }
314            });
315        });
316
317        let ping_msg = serde_json::to_string(&BybitSubscription {
318            op: BybitWsOperation::Ping,
319            args: vec![],
320        })
321        .expect("Failed to serialize ping message");
322
323        let config = WebSocketConfig {
324            url: self.url.clone(),
325            headers: Self::default_headers(),
326            message_handler: Some(message_handler),
327            heartbeat: self.heartbeat,
328            heartbeat_msg: Some(ping_msg),
329            ping_handler: Some(ping_handler),
330            reconnect_timeout_ms: Some(5_000),
331            reconnect_delay_initial_ms: Some(500),
332            reconnect_delay_max_ms: Some(5_000),
333            reconnect_backoff_factor: Some(1.5),
334            reconnect_jitter_ms: Some(250),
335        };
336
337        let client = WebSocketClient::connect(config, None, vec![], None)
338            .await
339            .map_err(BybitWsError::from)?;
340
341        {
342            let mut guard = self.inner.write().await;
343            *guard = Some(client);
344        }
345
346        let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel::<BybitWebSocketMessage>();
347        self.rx = Some(event_rx);
348        self.signal.store(false, Ordering::Relaxed);
349
350        let inner = Arc::clone(&self.inner);
351        let signal = Arc::clone(&self.signal);
352        let subscriptions = self.subscriptions.clone();
353        let auth_tracker = self.auth_tracker.clone();
354        let credential = self.credential.clone();
355        let requires_auth = self.requires_auth;
356        let is_authenticated = Arc::clone(&self.is_authenticated);
357        let quote_cache = Arc::clone(&self.quote_cache);
358
359        let task_handle = get_runtime().spawn(async move {
360            while let Some(message) = message_rx.recv().await {
361                if signal.load(Ordering::Relaxed) {
362                    break;
363                }
364
365                match BybitWebSocketClient::handle_message(
366                    &inner,
367                    &subscriptions,
368                    &auth_tracker,
369                    requires_auth,
370                    &is_authenticated,
371                    message,
372                )
373                .await
374                {
375                    Ok(Some(BybitWebSocketMessage::Reconnected)) => {
376                        tracing::info!("Handling WebSocket reconnection");
377
378                        let inner_for_task = inner.clone();
379                        let subscriptions_for_task = subscriptions.clone();
380                        let auth_tracker_for_task = auth_tracker.clone();
381                        let is_authenticated_for_task = is_authenticated.clone();
382                        let credential_for_task = credential.clone();
383                        let quote_cache_for_task = quote_cache.clone();
384                        let event_tx_for_task = event_tx.clone();
385
386                        get_runtime().spawn(async move {
387                            // Authenticate if required
388                            let auth_succeeded = if requires_auth {
389                                match BybitWebSocketClient::authenticate_inner(
390                                    &inner_for_task,
391                                    requires_auth,
392                                    credential_for_task,
393                                    &auth_tracker_for_task,
394                                    &is_authenticated_for_task,
395                                )
396                                .await
397                                {
398                                    Ok(()) => {
399                                        tracing::info!(
400                                            "Authentication successful after reconnect, proceeding with resubscription"
401                                        );
402                                        true
403                                    }
404                                    Err(e) => {
405                                        tracing::error!("Authentication after reconnect failed: {e}");
406                                        let error = BybitWebSocketError::from_message(e.to_string());
407                                        let _ = event_tx_for_task.send(BybitWebSocketMessage::Error(error));
408                                        false
409                                    }
410                                }
411                            } else {
412                                true
413                            };
414
415                            if !auth_succeeded {
416                                return;
417                            }
418
419                            // Clear the quote cache to prevent stale data after reconnection
420                            quote_cache_for_task.write().await.clear();
421
422                            // Resubscribe to all topics
423                            if let Err(err) = BybitWebSocketClient::resubscribe_all_inner(
424                                &inner_for_task,
425                                &subscriptions_for_task,
426                            )
427                            .await
428                            {
429                                tracing::error!("Failed to restore subscriptions after reconnection: {err}");
430                                let error = BybitWebSocketError::from_message(err.to_string());
431                                let _ = event_tx_for_task.send(BybitWebSocketMessage::Error(error));
432                            } else {
433                                tracing::info!("Restored subscriptions after reconnection");
434                                let _ = event_tx_for_task.send(BybitWebSocketMessage::Reconnected);
435                            }
436                        });
437                    }
438                    Ok(Some(event)) => {
439                        if event_tx.send(event).is_err() {
440                            break;
441                        }
442                    }
443                    Ok(None) => {}
444                    Err(err) => {
445                        let error = BybitWebSocketError::from_message(err.to_string());
446                        if event_tx.send(BybitWebSocketMessage::Error(error)).is_err() {
447                            break;
448                        }
449                    }
450                }
451            }
452        });
453
454        self.task_handle = Some(task_handle);
455
456        self.authenticate_if_required().await?;
457
458        // Resubscribe to any pre-registered topics (e.g. configured before connect).
459        if !self.subscriptions.is_empty() {
460            Self::resubscribe_all_inner(&self.inner, &self.subscriptions).await?;
461        }
462
463        Ok(())
464    }
465
466    /// Disconnects the WebSocket client and stops the background task.
467    pub async fn close(&mut self) -> BybitWsResult<()> {
468        self.signal.store(true, Ordering::Relaxed);
469
470        {
471            let inner_guard = self.inner.read().await;
472            if let Some(inner) = inner_guard.as_ref() {
473                inner.disconnect().await;
474            }
475        }
476
477        if let Some(handle) = self.task_handle.take()
478            && let Err(err) = handle.await
479        {
480            tracing::error!(error = %err, "Bybit websocket task terminated with error");
481        }
482
483        self.rx = None;
484        self.is_authenticated.store(false, Ordering::Relaxed);
485
486        Ok(())
487    }
488
489    /// Returns `true` when the underlying client is active.
490    #[must_use]
491    pub async fn is_active(&self) -> bool {
492        let guard = self.inner.read().await;
493        guard.as_ref().is_some_and(WebSocketClient::is_active)
494    }
495
496    /// Waits until the WebSocket client becomes active or times out.
497    ///
498    /// # Errors
499    ///
500    /// Returns an error if the timeout is exceeded before the client becomes active.
501    pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
502        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
503
504        tokio::time::timeout(timeout, async {
505            while !self.is_active().await {
506                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
507            }
508        })
509        .await
510        .map_err(|_| {
511            BybitWsError::ClientError(format!(
512                "WebSocket connection timeout after {timeout_secs} seconds"
513            ))
514        })?;
515
516        Ok(())
517    }
518
519    /// Subscribe to the provided topic strings.
520    pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
521        if topics.is_empty() {
522            return Ok(());
523        }
524
525        // Use reference counting to deduplicate subscriptions
526        let mut topics_to_send = Vec::new();
527
528        for topic in topics {
529            // Returns true if this is the first subscription (ref count 0 -> 1)
530            if self.subscriptions.add_reference(&topic) {
531                self.subscriptions.mark_subscribe(&topic);
532                topics_to_send.push(topic.clone());
533            } else {
534                tracing::debug!("Already subscribed to {topic}, skipping duplicate subscription");
535            }
536        }
537
538        if topics_to_send.is_empty() {
539            return Ok(());
540        }
541
542        Self::send_topics_inner(&self.inner, BybitWsOperation::Subscribe, topics_to_send).await
543    }
544
545    /// Unsubscribe from the provided topics.
546    pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
547        if topics.is_empty() {
548            return Ok(());
549        }
550
551        // Use reference counting to avoid unsubscribing while other consumers still need the topic
552        let mut topics_to_send = Vec::new();
553
554        for topic in topics {
555            // Returns true if this was the last subscription (ref count 1 -> 0)
556            if self.subscriptions.remove_reference(&topic) {
557                self.subscriptions.mark_unsubscribe(&topic);
558                topics_to_send.push(topic.clone());
559            } else {
560                tracing::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
561            }
562        }
563
564        if topics_to_send.is_empty() {
565            return Ok(());
566        }
567
568        Self::send_topics_inner(&self.inner, BybitWsOperation::Unsubscribe, topics_to_send).await
569    }
570
571    /// Returns a stream of parsed [`BybitWebSocketMessage`] items.
572    ///
573    /// # Panics
574    ///
575    /// Panics if called before [`Self::connect`] or if the stream has already been taken.
576    pub fn stream(
577        &mut self,
578    ) -> impl futures_util::Stream<Item = BybitWebSocketMessage> + Send + 'static {
579        let rx = self
580            .rx
581            .take()
582            .expect("Stream receiver already taken or client not connected");
583
584        async_stream::stream! {
585            let mut rx = rx;
586            while let Some(event) = rx.recv().await {
587                yield event;
588            }
589        }
590    }
591
592    /// Returns the number of currently registered subscriptions.
593    #[must_use]
594    pub fn subscription_count(&self) -> usize {
595        self.subscriptions.len()
596    }
597
598    /// Adds an instrument to the cache for parsing WebSocket messages.
599    pub fn add_instrument(&self, instrument: InstrumentAny) {
600        let instrument_id = instrument.id();
601        self.instruments_cache.insert(instrument_id, instrument);
602        tracing::debug!("Added instrument {instrument_id} to WebSocket client cache");
603    }
604
605    /// Returns a reference to the instruments cache.
606    #[must_use]
607    pub fn instruments(&self) -> &Arc<DashMap<InstrumentId, InstrumentAny>> {
608        &self.instruments_cache
609    }
610
611    /// Sets the account ID for account message parsing.
612    pub fn set_account_id(&mut self, account_id: AccountId) {
613        self.account_id = Some(account_id);
614    }
615
616    /// Returns the account ID if set.
617    #[must_use]
618    pub fn account_id(&self) -> Option<AccountId> {
619        self.account_id
620    }
621
622    /// Returns the product type for public connections.
623    #[must_use]
624    pub fn product_type(&self) -> Option<BybitProductType> {
625        self.product_type
626    }
627
628    /// Returns a reference to the quote cache.
629    #[must_use]
630    pub fn quote_cache(&self) -> &Arc<RwLock<cache::QuoteCache>> {
631        &self.quote_cache
632    }
633
634    /// Subscribes to orderbook updates for a specific instrument.
635    ///
636    /// # Errors
637    ///
638    /// Returns an error if the subscription request fails.
639    ///
640    /// # References
641    ///
642    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook>
643    pub async fn subscribe_orderbook(
644        &self,
645        instrument_id: InstrumentId,
646        depth: u32,
647    ) -> BybitWsResult<()> {
648        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
649        let topic = format!("orderbook.{}.{}", depth, raw_symbol);
650        self.subscribe(vec![topic]).await
651    }
652
653    /// Unsubscribes from orderbook updates for a specific instrument.
654    pub async fn unsubscribe_orderbook(
655        &self,
656        instrument_id: InstrumentId,
657        depth: u32,
658    ) -> BybitWsResult<()> {
659        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
660        let topic = format!("orderbook.{}.{}", depth, raw_symbol);
661        self.unsubscribe(vec![topic]).await
662    }
663
664    /// Subscribes to public trade updates for a specific instrument.
665    ///
666    /// # Errors
667    ///
668    /// Returns an error if the subscription request fails.
669    ///
670    /// # References
671    ///
672    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/trade>
673    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
674        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
675        let topic = format!("publicTrade.{}", raw_symbol);
676        self.subscribe(vec![topic]).await
677    }
678
679    /// Unsubscribes from public trade updates for a specific instrument.
680    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
681        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
682        let topic = format!("publicTrade.{}", raw_symbol);
683        self.unsubscribe(vec![topic]).await
684    }
685
686    /// Subscribes to ticker updates for a specific instrument.
687    ///
688    /// # Errors
689    ///
690    /// Returns an error if the subscription request fails.
691    ///
692    /// # References
693    ///
694    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/ticker>
695    pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
696        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
697        let topic = format!("tickers.{}", raw_symbol);
698        self.subscribe(vec![topic]).await
699    }
700
701    /// Unsubscribes from ticker updates for a specific instrument.
702    pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
703        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
704        let topic = format!("tickers.{}", raw_symbol);
705        self.unsubscribe(vec![topic]).await
706    }
707
708    /// Subscribes to kline/candlestick updates for a specific instrument.
709    ///
710    /// # Errors
711    ///
712    /// Returns an error if the subscription request fails.
713    ///
714    /// # References
715    ///
716    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/kline>
717    pub async fn subscribe_klines(
718        &self,
719        instrument_id: InstrumentId,
720        interval: impl Into<String>,
721    ) -> BybitWsResult<()> {
722        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
723        let topic = format!("kline.{}.{}", interval.into(), raw_symbol);
724        self.subscribe(vec![topic]).await
725    }
726
727    /// Unsubscribes from kline/candlestick updates for a specific instrument.
728    pub async fn unsubscribe_klines(
729        &self,
730        instrument_id: InstrumentId,
731        interval: impl Into<String>,
732    ) -> BybitWsResult<()> {
733        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
734        let topic = format!("kline.{}.{}", interval.into(), raw_symbol);
735        self.unsubscribe(vec![topic]).await
736    }
737
738    /// Subscribes to order updates.
739    ///
740    /// # Errors
741    ///
742    /// Returns an error if the subscription request fails or if not authenticated.
743    ///
744    /// # References
745    ///
746    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/order>
747    pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
748        if !self.requires_auth {
749            return Err(BybitWsError::Authentication(
750                "Order subscription requires authentication".to_string(),
751            ));
752        }
753        self.subscribe(vec!["order".to_string()]).await
754    }
755
756    /// Unsubscribes from order updates.
757    pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
758        self.unsubscribe(vec!["order".to_string()]).await
759    }
760
761    /// Subscribes to execution/fill updates.
762    ///
763    /// # Errors
764    ///
765    /// Returns an error if the subscription request fails or if not authenticated.
766    ///
767    /// # References
768    ///
769    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/execution>
770    pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
771        if !self.requires_auth {
772            return Err(BybitWsError::Authentication(
773                "Execution subscription requires authentication".to_string(),
774            ));
775        }
776        self.subscribe(vec!["execution".to_string()]).await
777    }
778
779    /// Unsubscribes from execution/fill updates.
780    pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
781        self.unsubscribe(vec!["execution".to_string()]).await
782    }
783
784    /// Subscribes to position updates.
785    ///
786    /// # Errors
787    ///
788    /// Returns an error if the subscription request fails or if not authenticated.
789    ///
790    /// # References
791    ///
792    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/position>
793    pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
794        if !self.requires_auth {
795            return Err(BybitWsError::Authentication(
796                "Position subscription requires authentication".to_string(),
797            ));
798        }
799        self.subscribe(vec!["position".to_string()]).await
800    }
801
802    /// Unsubscribes from position updates.
803    pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
804        self.unsubscribe(vec!["position".to_string()]).await
805    }
806
807    /// Subscribes to wallet/balance updates.
808    ///
809    /// # Errors
810    ///
811    /// Returns an error if the subscription request fails or if not authenticated.
812    ///
813    /// # References
814    ///
815    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/wallet>
816    pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
817        if !self.requires_auth {
818            return Err(BybitWsError::Authentication(
819                "Wallet subscription requires authentication".to_string(),
820            ));
821        }
822        self.subscribe(vec!["wallet".to_string()]).await
823    }
824
825    /// Unsubscribes from wallet/balance updates.
826    pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
827        self.unsubscribe(vec!["wallet".to_string()]).await
828    }
829
830    /// Places an order via WebSocket.
831    ///
832    /// # Errors
833    ///
834    /// Returns an error if the order request fails or if not authenticated.
835    ///
836    /// # References
837    ///
838    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
839    pub async fn place_order(&self, params: BybitWsPlaceOrderParams) -> BybitWsResult<()> {
840        if !self.is_authenticated.load(Ordering::Relaxed) {
841            return Err(BybitWsError::Authentication(
842                "Must be authenticated to place orders".to_string(),
843            ));
844        }
845
846        self.retry_manager
847            .execute_with_retry_with_cancel(
848                "place_order",
849                || {
850                    let params = params.clone();
851                    async move {
852                        let request = BybitWsRequest {
853                            op: BybitWsOrderRequestOp::Create,
854                            header: BybitWsHeader::now(),
855                            args: vec![params],
856                        };
857
858                        let payload =
859                            serde_json::to_string(&request).map_err(BybitWsError::from)?;
860                        tracing::debug!("Sending order WebSocket message: {}", payload);
861                        Self::send_text_inner(&self.inner, &payload).await
862                    }
863                },
864                should_retry_bybit_error,
865                create_bybit_timeout_error,
866                &self.cancellation_token,
867            )
868            .await
869    }
870
871    /// Amends an existing order via WebSocket.
872    ///
873    /// # Errors
874    ///
875    /// Returns an error if the amend request fails or if not authenticated.
876    ///
877    /// # References
878    ///
879    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
880    pub async fn amend_order(&self, params: BybitWsAmendOrderParams) -> BybitWsResult<()> {
881        if !self.is_authenticated.load(Ordering::Relaxed) {
882            return Err(BybitWsError::Authentication(
883                "Must be authenticated to amend orders".to_string(),
884            ));
885        }
886
887        self.retry_manager
888            .execute_with_retry_with_cancel(
889                "amend_order",
890                || {
891                    let params = params.clone();
892                    async move {
893                        let request = BybitWsRequest {
894                            op: BybitWsOrderRequestOp::Amend,
895                            header: BybitWsHeader::now(),
896                            args: vec![params],
897                        };
898
899                        let payload =
900                            serde_json::to_string(&request).map_err(BybitWsError::from)?;
901                        Self::send_text_inner(&self.inner, &payload).await
902                    }
903                },
904                should_retry_bybit_error,
905                create_bybit_timeout_error,
906                &self.cancellation_token,
907            )
908            .await
909    }
910
911    /// Cancels an order via WebSocket.
912    ///
913    /// # Errors
914    ///
915    /// Returns an error if the cancel request fails or if not authenticated.
916    ///
917    /// # References
918    ///
919    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
920    pub async fn cancel_order(&self, params: BybitWsCancelOrderParams) -> BybitWsResult<()> {
921        if !self.is_authenticated.load(Ordering::Relaxed) {
922            return Err(BybitWsError::Authentication(
923                "Must be authenticated to cancel orders".to_string(),
924            ));
925        }
926
927        self.retry_manager
928            .execute_with_retry_with_cancel(
929                "cancel_order",
930                || {
931                    let params = params.clone();
932                    async move {
933                        let request = BybitWsRequest {
934                            op: BybitWsOrderRequestOp::Cancel,
935                            header: BybitWsHeader::now(),
936                            args: vec![params],
937                        };
938
939                        let payload =
940                            serde_json::to_string(&request).map_err(BybitWsError::from)?;
941                        Self::send_text_inner(&self.inner, &payload).await
942                    }
943                },
944                should_retry_bybit_error,
945                create_bybit_timeout_error,
946                &self.cancellation_token,
947            )
948            .await
949    }
950
951    /// Batch creates multiple orders via WebSocket.
952    ///
953    /// # Errors
954    ///
955    /// Returns an error if the batch request fails or if not authenticated.
956    ///
957    /// # References
958    ///
959    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
960    pub async fn batch_place_orders(
961        &self,
962        orders: Vec<BybitWsPlaceOrderParams>,
963    ) -> BybitWsResult<()> {
964        if !self.is_authenticated.load(Ordering::Relaxed) {
965            return Err(BybitWsError::Authentication(
966                "Must be authenticated to place orders".to_string(),
967            ));
968        }
969
970        if orders.len() > 20 {
971            return Err(BybitWsError::ClientError(
972                "Batch order limit is 20 orders per request".to_string(),
973            ));
974        }
975
976        let request = BybitWsRequest {
977            op: BybitWsOrderRequestOp::CreateBatch,
978            header: BybitWsHeader::now(),
979            args: orders,
980        };
981
982        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
983        Self::send_text_inner(&self.inner, &payload).await
984    }
985
986    /// Batch amends multiple orders via WebSocket.
987    ///
988    /// # Errors
989    ///
990    /// Returns an error if the batch request fails or if not authenticated.
991    pub async fn batch_amend_orders(
992        &self,
993        orders: Vec<BybitWsAmendOrderParams>,
994    ) -> BybitWsResult<()> {
995        if !self.is_authenticated.load(Ordering::Relaxed) {
996            return Err(BybitWsError::Authentication(
997                "Must be authenticated to amend orders".to_string(),
998            ));
999        }
1000
1001        if orders.len() > 20 {
1002            return Err(BybitWsError::ClientError(
1003                "Batch amend limit is 20 orders per request".to_string(),
1004            ));
1005        }
1006
1007        let request = BybitWsRequest {
1008            op: BybitWsOrderRequestOp::AmendBatch,
1009            header: BybitWsHeader::now(),
1010            args: orders,
1011        };
1012
1013        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1014        Self::send_text_inner(&self.inner, &payload).await
1015    }
1016
1017    /// Batch cancels multiple orders via WebSocket.
1018    ///
1019    /// # Errors
1020    ///
1021    /// Returns an error if the batch request fails or if not authenticated.
1022    pub async fn batch_cancel_orders(
1023        &self,
1024        orders: Vec<BybitWsCancelOrderParams>,
1025    ) -> BybitWsResult<()> {
1026        if !self.is_authenticated.load(Ordering::Relaxed) {
1027            return Err(BybitWsError::Authentication(
1028                "Must be authenticated to cancel orders".to_string(),
1029            ));
1030        }
1031
1032        if orders.len() > 20 {
1033            return Err(BybitWsError::ClientError(
1034                "Batch cancel limit is 20 orders per request".to_string(),
1035            ));
1036        }
1037
1038        let request = BybitWsRequest {
1039            op: BybitWsOrderRequestOp::CancelBatch,
1040            header: BybitWsHeader::now(),
1041            args: orders,
1042        };
1043
1044        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1045        Self::send_text_inner(&self.inner, &payload).await
1046    }
1047
1048    /// Submits an order using Nautilus domain objects.
1049    ///
1050    /// # Errors
1051    ///
1052    /// Returns an error if order submission fails or if not authenticated.
1053    #[allow(clippy::too_many_arguments)]
1054    pub async fn submit_order(
1055        &self,
1056        product_type: BybitProductType,
1057        instrument_id: InstrumentId,
1058        client_order_id: ClientOrderId,
1059        order_side: OrderSide,
1060        order_type: OrderType,
1061        quantity: Quantity,
1062        time_in_force: Option<TimeInForce>,
1063        price: Option<Price>,
1064        trigger_price: Option<Price>,
1065        post_only: Option<bool>,
1066        reduce_only: Option<bool>,
1067    ) -> BybitWsResult<()> {
1068        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1069            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1070        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1071
1072        let bybit_side = match order_side {
1073            OrderSide::Buy => BybitOrderSide::Buy,
1074            OrderSide::Sell => BybitOrderSide::Sell,
1075            _ => {
1076                return Err(BybitWsError::ClientError(format!(
1077                    "Invalid order side: {order_side:?}"
1078                )));
1079            }
1080        };
1081
1082        // Determine the base order type for Bybit API
1083        // For stop/conditional orders, Bybit uses Market/Limit with trigger parameters
1084        let (bybit_order_type, is_stop_order) = match order_type {
1085            OrderType::Market => (BybitOrderType::Market, false),
1086            OrderType::Limit => (BybitOrderType::Limit, false),
1087            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1088            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1089            _ => {
1090                return Err(BybitWsError::ClientError(format!(
1091                    "Unsupported order type: {order_type:?}"
1092                )));
1093            }
1094        };
1095
1096        // If post_only is true, use PostOnly time in force, otherwise use provided time_in_force
1097        let bybit_tif = if post_only == Some(true) {
1098            Some(BybitTimeInForce::PostOnly)
1099        } else if let Some(tif) = time_in_force {
1100            Some(match tif {
1101                TimeInForce::Gtc => BybitTimeInForce::Gtc,
1102                TimeInForce::Ioc => BybitTimeInForce::Ioc,
1103                TimeInForce::Fok => BybitTimeInForce::Fok,
1104                _ => {
1105                    return Err(BybitWsError::ClientError(format!(
1106                        "Unsupported time in force: {tif:?}"
1107                    )));
1108                }
1109            })
1110        } else {
1111            None
1112        };
1113
1114        let params = if is_stop_order {
1115            // For conditional orders, ALL types use triggerPrice field
1116            // sl_trigger_price/tp_trigger_price are only for TP/SL attached to regular orders
1117            BybitWsPlaceOrderParams {
1118                category: product_type,
1119                symbol: raw_symbol,
1120                side: bybit_side,
1121                order_type: bybit_order_type,
1122                qty: quantity.to_string(),
1123                price: price.map(|p| p.to_string()),
1124                time_in_force: bybit_tif,
1125                order_link_id: Some(client_order_id.to_string()),
1126                reduce_only: reduce_only.filter(|&r| r),
1127                close_on_trigger: None,
1128                trigger_price: trigger_price.map(|p| p.to_string()),
1129                trigger_by: Some(BybitTriggerType::LastPrice),
1130                trigger_direction: None,
1131                tpsl_mode: None, // Not needed for standalone conditional orders
1132                take_profit: None,
1133                stop_loss: None,
1134                tp_trigger_by: None,
1135                sl_trigger_by: None,
1136                sl_trigger_price: None, // Not used for standalone stop orders
1137                tp_trigger_price: None, // Not used for standalone stop orders
1138                sl_order_type: None,
1139                tp_order_type: None,
1140                sl_limit_price: None,
1141                tp_limit_price: None,
1142            }
1143        } else {
1144            // Regular market/limit orders
1145            BybitWsPlaceOrderParams {
1146                category: product_type,
1147                symbol: raw_symbol,
1148                side: bybit_side,
1149                order_type: bybit_order_type,
1150                qty: quantity.to_string(),
1151                price: price.map(|p| p.to_string()),
1152                time_in_force: bybit_tif,
1153                order_link_id: Some(client_order_id.to_string()),
1154                reduce_only: reduce_only.filter(|&r| r),
1155                close_on_trigger: None,
1156                trigger_price: None,
1157                trigger_by: None,
1158                trigger_direction: None,
1159                tpsl_mode: None,
1160                take_profit: None,
1161                stop_loss: None,
1162                tp_trigger_by: None,
1163                sl_trigger_by: None,
1164                sl_trigger_price: None,
1165                tp_trigger_price: None,
1166                sl_order_type: None,
1167                tp_order_type: None,
1168                sl_limit_price: None,
1169                tp_limit_price: None,
1170            }
1171        };
1172
1173        self.place_order(params).await
1174    }
1175
1176    /// Modifies an existing order using Nautilus domain objects.
1177    ///
1178    /// # Errors
1179    ///
1180    /// Returns an error if modification fails or if not authenticated.
1181    pub async fn modify_order(
1182        &self,
1183        product_type: BybitProductType,
1184        instrument_id: InstrumentId,
1185        venue_order_id: Option<VenueOrderId>,
1186        client_order_id: Option<ClientOrderId>,
1187        quantity: Option<Quantity>,
1188        price: Option<Price>,
1189    ) -> BybitWsResult<()> {
1190        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1191            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1192        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1193
1194        let params = BybitWsAmendOrderParams {
1195            category: product_type,
1196            symbol: raw_symbol,
1197            order_id: venue_order_id.map(|id| id.to_string()),
1198            order_link_id: client_order_id.map(|id| id.to_string()),
1199            qty: quantity.map(|q| q.to_string()),
1200            price: price.map(|p| p.to_string()),
1201            trigger_price: None,
1202            take_profit: None,
1203            stop_loss: None,
1204            tp_trigger_by: None,
1205            sl_trigger_by: None,
1206        };
1207
1208        self.amend_order(params).await
1209    }
1210
1211    /// Cancels an order using Nautilus domain objects.
1212    ///
1213    /// # Errors
1214    ///
1215    /// Returns an error if cancellation fails or if not authenticated.
1216    pub async fn cancel_order_by_id(
1217        &self,
1218        product_type: BybitProductType,
1219        instrument_id: InstrumentId,
1220        venue_order_id: Option<VenueOrderId>,
1221        client_order_id: Option<ClientOrderId>,
1222    ) -> BybitWsResult<()> {
1223        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1224            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1225        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1226
1227        let params = BybitWsCancelOrderParams {
1228            category: product_type,
1229            symbol: raw_symbol,
1230            order_id: venue_order_id.map(|id| id.to_string()),
1231            order_link_id: client_order_id.map(|id| id.to_string()),
1232        };
1233
1234        self.cancel_order(params).await
1235    }
1236
1237    fn default_headers() -> Vec<(String, String)> {
1238        vec![
1239            ("Content-Type".to_string(), "application/json".to_string()),
1240            ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
1241            ("Referer".to_string(), BYBIT_NAUTILUS_BROKER_ID.to_string()),
1242        ]
1243    }
1244
1245    async fn authenticate_if_required(&self) -> BybitWsResult<()> {
1246        Self::authenticate_inner(
1247            &self.inner,
1248            self.requires_auth,
1249            self.credential.clone(),
1250            &self.auth_tracker,
1251            &self.is_authenticated,
1252        )
1253        .await
1254    }
1255
1256    async fn send_text_inner(
1257        inner: &Arc<RwLock<Option<WebSocketClient>>>,
1258        text: &str,
1259    ) -> BybitWsResult<()> {
1260        let guard = inner.read().await;
1261        let client = guard.as_ref().ok_or(BybitWsError::NotConnected)?;
1262        client
1263            .send_text(text.to_string(), None)
1264            .await
1265            .map_err(BybitWsError::from)
1266    }
1267
1268    async fn send_pong_inner(
1269        inner: &Arc<RwLock<Option<WebSocketClient>>>,
1270        payload: Vec<u8>,
1271    ) -> BybitWsResult<()> {
1272        let guard = inner.read().await;
1273        let client = guard.as_ref().ok_or(BybitWsError::NotConnected)?;
1274        client.send_pong(payload).await.map_err(BybitWsError::from)
1275    }
1276
1277    async fn send_topics_inner(
1278        inner: &Arc<RwLock<Option<WebSocketClient>>>,
1279        op: BybitWsOperation,
1280        topics: Vec<String>,
1281    ) -> BybitWsResult<()> {
1282        if topics.is_empty() {
1283            return Ok(());
1284        }
1285
1286        for chunk in topics.chunks(MAX_ARGS_PER_SUBSCRIPTION_REQUEST) {
1287            let subscription = BybitSubscription {
1288                op: op.clone(),
1289                args: chunk.to_vec(),
1290            };
1291            let payload = serde_json::to_string(&subscription)?;
1292            Self::send_text_inner(inner, &payload).await?;
1293        }
1294
1295        Ok(())
1296    }
1297
1298    async fn resubscribe_all_inner(
1299        inner: &Arc<RwLock<Option<WebSocketClient>>>,
1300        subscriptions: &SubscriptionState,
1301    ) -> BybitWsResult<()> {
1302        let topics = subscriptions.all_topics();
1303        if topics.is_empty() {
1304            return Ok(());
1305        }
1306
1307        tracing::info!(
1308            "Restoring {} subscriptions after reconnection",
1309            topics.len()
1310        );
1311        Self::send_topics_inner(inner, BybitWsOperation::Subscribe, topics).await
1312    }
1313
1314    async fn handle_message(
1315        inner: &Arc<RwLock<Option<WebSocketClient>>>,
1316        subscriptions: &SubscriptionState,
1317        auth_tracker: &AuthTracker,
1318        requires_auth: bool,
1319        is_authenticated: &Arc<AtomicBool>,
1320        message: Message,
1321    ) -> BybitWsResult<Option<BybitWebSocketMessage>> {
1322        match message {
1323            Message::Text(text) => {
1324                tracing::trace!("Bybit WS message: {text}");
1325
1326                if text == RECONNECTED {
1327                    tracing::debug!("Bybit websocket reconnected signal received");
1328                    return Ok(Some(BybitWebSocketMessage::Reconnected));
1329                }
1330
1331                if text.trim().eq_ignore_ascii_case(BYBIT_PONG) {
1332                    return Ok(Some(BybitWebSocketMessage::Pong));
1333                }
1334
1335                let value: Value = serde_json::from_str(&text).map_err(BybitWsError::from)?;
1336
1337                // Handle ping/pong
1338                if let Ok(op) = serde_json::from_value::<BybitWsOperation>(
1339                    value.get("op").cloned().unwrap_or(Value::Null),
1340                ) {
1341                    match op {
1342                        BybitWsOperation::Ping => {
1343                            let pong = BybitSubscription {
1344                                op: BybitWsOperation::Pong,
1345                                args: vec![],
1346                            };
1347                            let payload = serde_json::to_string(&pong)?;
1348                            Self::send_text_inner(inner, &payload).await?;
1349                            return Ok(None);
1350                        }
1351                        BybitWsOperation::Pong => {
1352                            return Ok(Some(BybitWebSocketMessage::Pong));
1353                        }
1354                        _ => {}
1355                    }
1356                }
1357
1358                if let Some(event) = Self::classify_message(&value) {
1359                    // Log raw JSON for error events to aid debugging
1360                    if matches!(event, BybitWebSocketMessage::Error(_)) {
1361                        tracing::debug!(
1362                            json = %serde_json::to_string(&value).unwrap_or_default(),
1363                            "Received error event from Bybit"
1364                        );
1365                    }
1366
1367                    if let BybitWebSocketMessage::Auth(auth) = &event {
1368                        // Auth is successful if either success=true OR retCode=0
1369                        let is_success =
1370                            auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1371
1372                        if is_success {
1373                            is_authenticated.store(true, Ordering::Relaxed);
1374                            auth_tracker.succeed();
1375                        } else {
1376                            is_authenticated.store(false, Ordering::Relaxed);
1377                            let message = auth
1378                                .ret_msg
1379                                .clone()
1380                                .unwrap_or_else(|| "Authentication failed".to_string());
1381                            auth_tracker.fail(message);
1382                        }
1383                    } else if let BybitWebSocketMessage::Subscription(sub_msg) = &event {
1384                        // Handle subscription/unsubscription confirmation
1385                        match sub_msg.op {
1386                            BybitWsOperation::Subscribe => {
1387                                let pending_topics = subscriptions.pending_subscribe_topics();
1388                                // Handle subscribe acknowledgment
1389                                if sub_msg.success {
1390                                    for topic in pending_topics {
1391                                        subscriptions.confirm_subscribe(&topic);
1392                                        tracing::debug!(topic = topic, "Subscription confirmed");
1393                                    }
1394                                } else {
1395                                    for topic in pending_topics {
1396                                        subscriptions.mark_failure(&topic);
1397                                        tracing::warn!(
1398                                            topic = topic,
1399                                            error = ?sub_msg.ret_msg,
1400                                            "Subscription failed, will retry on reconnect"
1401                                        );
1402                                    }
1403                                }
1404                            }
1405                            BybitWsOperation::Unsubscribe => {
1406                                let pending_topics = subscriptions.pending_unsubscribe_topics();
1407                                // Handle unsubscribe acknowledgment
1408                                if sub_msg.success {
1409                                    for topic in pending_topics {
1410                                        subscriptions.confirm_unsubscribe(&topic);
1411                                        tracing::debug!(topic = topic, "Unsubscription confirmed");
1412                                    }
1413                                } else {
1414                                    // Unsubscribe failed - venue still considers us subscribed
1415                                    // Clear from pending_unsubscribe and restore to confirmed
1416                                    for topic in pending_topics {
1417                                        subscriptions.confirm_unsubscribe(&topic); // Clear from pending_unsubscribe
1418                                        subscriptions.confirm_subscribe(&topic); // Restore to confirmed
1419                                        tracing::warn!(
1420                                            topic = topic,
1421                                            error = ?sub_msg.ret_msg,
1422                                            "Unsubscription failed, topic remains subscribed"
1423                                        );
1424                                    }
1425                                }
1426                            }
1427                            _ => {}
1428                        }
1429                    } else if let BybitWebSocketMessage::Error(err) = &event
1430                        && requires_auth
1431                        && !is_authenticated.load(Ordering::Relaxed)
1432                    {
1433                        auth_tracker.fail(err.message.clone());
1434                    }
1435                    if let BybitWebSocketMessage::Error(err) = &event {
1436                        tracing::warn!(
1437                            code = err.code,
1438                            message = %err.message,
1439                            conn_id = ?err.conn_id,
1440                            topic = ?err.topic,
1441                            req_id = ?err.req_id,
1442                            "Bybit websocket error"
1443                        );
1444                    }
1445                    return Ok(Some(event));
1446                }
1447
1448                Ok(Some(BybitWebSocketMessage::Raw(value)))
1449            }
1450            Message::Ping(payload) => {
1451                Self::send_pong_inner(inner, payload.to_vec()).await?;
1452                Ok(None)
1453            }
1454            Message::Pong(_) => Ok(Some(BybitWebSocketMessage::Pong)),
1455            Message::Binary(_) => Ok(None),
1456            Message::Close(_) => Ok(None),
1457            Message::Frame(_) => Ok(None),
1458        }
1459    }
1460
1461    fn classify_message(value: &Value) -> Option<BybitWebSocketMessage> {
1462        // Check for auth response first (by op field) to avoid confusion with subscription messages
1463        if let Ok(op) = serde_json::from_value::<BybitWsOperation>(
1464            value.get("op").cloned().unwrap_or(Value::Null),
1465        ) && op == BybitWsOperation::Auth
1466        {
1467            tracing::debug!(json = %value, "Detected auth message by op field");
1468            if let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone()) {
1469                // Auth is successful if either success=true OR retCode=0
1470                let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1471
1472                if is_success {
1473                    tracing::debug!("Auth successful, returning Auth message");
1474                    return Some(BybitWebSocketMessage::Auth(auth));
1475                }
1476                let resp = BybitWsResponse {
1477                    op: Some(auth.op.clone()),
1478                    topic: None,
1479                    success: auth.success,
1480                    conn_id: auth.conn_id.clone(),
1481                    req_id: None,
1482                    ret_code: auth.ret_code,
1483                    ret_msg: auth.ret_msg.clone(),
1484                };
1485                let error = BybitWebSocketError::from_response(&resp);
1486                return Some(BybitWebSocketMessage::Error(error));
1487            }
1488        }
1489
1490        if let Some(success) = value.get("success").and_then(Value::as_bool) {
1491            if success {
1492                if let Ok(msg) = serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone()) {
1493                    return Some(BybitWebSocketMessage::Subscription(msg));
1494                }
1495            } else if let Ok(resp) = serde_json::from_value::<BybitWsResponse>(value.clone()) {
1496                let error = BybitWebSocketError::from_response(&resp);
1497                return Some(BybitWebSocketMessage::Error(error));
1498            }
1499        }
1500
1501        if (value.get("ret_code").is_some() || value.get("retCode").is_some())
1502            && let Ok(resp) = serde_json::from_value::<BybitWsResponse>(value.clone())
1503        {
1504            if resp.ret_code.unwrap_or_default() != 0 {
1505                let error = BybitWebSocketError::from_response(&resp);
1506                return Some(BybitWebSocketMessage::Error(error));
1507            }
1508            return Some(BybitWebSocketMessage::Response(resp));
1509        }
1510
1511        if let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
1512            && auth.op == BybitWsOperation::Auth
1513        {
1514            if auth.success.unwrap_or(false) {
1515                return Some(BybitWebSocketMessage::Auth(auth));
1516            }
1517            let resp = BybitWsResponse {
1518                op: Some(auth.op.clone()),
1519                topic: None,
1520                success: auth.success,
1521                conn_id: auth.conn_id.clone(),
1522                req_id: None,
1523                ret_code: auth.ret_code,
1524                ret_msg: auth.ret_msg.clone(),
1525            };
1526            let error = BybitWebSocketError::from_response(&resp);
1527            return Some(BybitWebSocketMessage::Error(error));
1528        }
1529
1530        if let Some(topic) = value.get("topic").and_then(Value::as_str) {
1531            if topic.starts_with("orderbook") {
1532                if let Ok(msg) = serde_json::from_value::<BybitWsOrderbookDepthMsg>(value.clone()) {
1533                    return Some(BybitWebSocketMessage::Orderbook(msg));
1534                }
1535            } else if topic.contains("publicTrade") || topic.starts_with("trade") {
1536                if let Ok(msg) = serde_json::from_value::<BybitWsTradeMsg>(value.clone()) {
1537                    return Some(BybitWebSocketMessage::Trade(msg));
1538                }
1539            } else if topic.contains("kline") {
1540                if let Ok(msg) = serde_json::from_value::<BybitWsKlineMsg>(value.clone()) {
1541                    return Some(BybitWebSocketMessage::Kline(msg));
1542                }
1543            } else if topic.contains("tickers") {
1544                if let Ok(msg) = serde_json::from_value::<BybitWsTickerOptionMsg>(value.clone()) {
1545                    return Some(BybitWebSocketMessage::TickerOption(msg));
1546                }
1547                if let Ok(msg) = serde_json::from_value::<BybitWsTickerLinearMsg>(value.clone()) {
1548                    return Some(BybitWebSocketMessage::TickerLinear(msg));
1549                }
1550            } else if topic == "order" || topic.starts_with("order.") {
1551                match serde_json::from_value::<BybitWsAccountOrderMsg>(value.clone()) {
1552                    Ok(msg) => return Some(BybitWebSocketMessage::AccountOrder(msg)),
1553                    Err(e) => tracing::warn!("Failed to deserialize order message: {e}\n{value}"),
1554                }
1555            } else if topic == "execution" || topic.starts_with("execution.") {
1556                match serde_json::from_value::<BybitWsAccountExecutionMsg>(value.clone()) {
1557                    Ok(msg) => return Some(BybitWebSocketMessage::AccountExecution(msg)),
1558                    Err(e) => {
1559                        tracing::warn!("Failed to deserialize execution message: {e}\n{value}")
1560                    }
1561                }
1562            } else if topic == "wallet" || topic.starts_with("wallet.") {
1563                match serde_json::from_value::<BybitWsAccountWalletMsg>(value.clone()) {
1564                    Ok(msg) => return Some(BybitWebSocketMessage::AccountWallet(msg)),
1565                    Err(e) => tracing::warn!("Failed to deserialize wallet message: {e}\n{value}"),
1566                }
1567            } else if topic == "position" || topic.starts_with("position.") {
1568                match serde_json::from_value::<BybitWsAccountPositionMsg>(value.clone()) {
1569                    Ok(msg) => return Some(BybitWebSocketMessage::AccountPosition(msg)),
1570                    Err(e) => {
1571                        tracing::warn!("Failed to deserialize position message: {e}\n{value}")
1572                    }
1573                }
1574            }
1575        }
1576
1577        None
1578    }
1579
1580    async fn authenticate_inner(
1581        inner: &Arc<RwLock<Option<WebSocketClient>>>,
1582        requires_auth: bool,
1583        credential: Option<Credential>,
1584        auth_tracker: &AuthTracker,
1585        is_authenticated: &Arc<AtomicBool>,
1586    ) -> BybitWsResult<()> {
1587        if !requires_auth {
1588            return Ok(());
1589        }
1590
1591        is_authenticated.store(false, Ordering::Relaxed);
1592
1593        let credential = credential.ok_or_else(|| {
1594            BybitWsError::Authentication(
1595                "API credentials not provided for authentication".to_string(),
1596            )
1597        })?;
1598
1599        let receiver = auth_tracker.begin();
1600
1601        let now_ns = get_atomic_clock_realtime().get_time_ns().as_i64();
1602        let now_ms = now_ns / 1_000_000;
1603        let expires = now_ms + WEBSOCKET_AUTH_WINDOW_MS;
1604        let signature = credential.sign_websocket_auth(expires);
1605
1606        let auth_request = BybitAuthRequest {
1607            op: BybitWsOperation::Auth,
1608            args: vec![
1609                json!(credential.api_key().as_str()),
1610                json!(expires),
1611                json!(signature),
1612            ],
1613        };
1614
1615        let payload = serde_json::to_string(&auth_request)?;
1616
1617        if let Err(err) = Self::send_text_inner(inner, &payload).await {
1618            auth_tracker.fail(err.to_string());
1619            return Err(err);
1620        }
1621
1622        match auth_tracker
1623            .wait_for_result(Duration::from_secs(AUTHENTICATION_TIMEOUT_SECS), receiver)
1624            .await
1625        {
1626            Ok(()) => {
1627                is_authenticated.store(true, Ordering::Relaxed);
1628                Ok(())
1629            }
1630            Err(err) => {
1631                is_authenticated.store(false, Ordering::Relaxed);
1632                Err(err)
1633            }
1634        }
1635    }
1636}
1637
1638////////////////////////////////////////////////////////////////////////////////
1639// Tests
1640////////////////////////////////////////////////////////////////////////////////
1641
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::testing::load_test_json;

    /// The order book snapshot fixture must classify as an orderbook message.
    #[rstest]
    fn classify_orderbook_snapshot() {
        let raw = load_test_json("ws_orderbook_snapshot.json");
        let payload = serde_json::from_str::<Value>(&raw).expect("invalid fixture");
        let message =
            BybitWebSocketClient::classify_message(&payload).expect("expected orderbook message");
        assert!(matches!(message, BybitWebSocketMessage::Orderbook(_)));
    }

    /// The public trade fixture must classify as a trade message.
    #[rstest]
    fn classify_trade_snapshot() {
        let raw = load_test_json("ws_public_trade.json");
        let payload = serde_json::from_str::<Value>(&raw).expect("invalid fixture");
        let message =
            BybitWebSocketClient::classify_message(&payload).expect("expected trade message");
        assert!(matches!(message, BybitWebSocketMessage::Trade(_)));
    }

    /// The linear ticker fixture must classify as a linear ticker message.
    #[rstest]
    fn classify_ticker_linear_snapshot() {
        let raw = load_test_json("ws_ticker_linear.json");
        let payload = serde_json::from_str::<Value>(&raw).expect("invalid fixture");
        let message =
            BybitWebSocketClient::classify_message(&payload).expect("expected ticker message");
        assert!(matches!(message, BybitWebSocketMessage::TickerLinear(_)));
    }

    /// The option ticker fixture must classify as an option ticker message.
    #[rstest]
    fn classify_ticker_option_snapshot() {
        let raw = load_test_json("ws_ticker_option.json");
        let payload = serde_json::from_str::<Value>(&raw).expect("invalid fixture");
        let message =
            BybitWebSocketClient::classify_message(&payload).expect("expected ticker message");
        assert!(matches!(message, BybitWebSocketMessage::TickerOption(_)));
    }
}