nautilus_bybit/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bybit WebSocket client providing public market data streaming.
17//!
18//! Bybit API reference <https://bybit-exchange.github.io/docs/>.
19
20use std::{
21    fmt::Debug,
22    sync::{
23        Arc,
24        atomic::{AtomicBool, AtomicU8, Ordering},
25    },
26    time::Duration,
27};
28
29use ahash::AHashMap;
30use arc_swap::ArcSwap;
31use dashmap::DashMap;
32use nautilus_common::runtime::get_runtime;
33use nautilus_core::{UUID4, consts::NAUTILUS_USER_AGENT};
34use nautilus_model::{
35    enums::{OrderSide, OrderType, TimeInForce},
36    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37    instruments::{Instrument, InstrumentAny},
38    types::{Price, Quantity},
39};
40use nautilus_network::{
41    backoff::ExponentialBackoff,
42    mode::ConnectionMode,
43    websocket::{
44        AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
45        channel_message_handler,
46    },
47};
48use serde_json::Value;
49use tokio::sync::RwLock;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54    common::{
55        consts::{
56            BYBIT_BASE_COIN, BYBIT_NAUTILUS_BROKER_ID, BYBIT_QUOTE_COIN, BYBIT_WS_TOPIC_DELIMITER,
57        },
58        credential::Credential,
59        enums::{
60            BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
61            BybitTriggerDirection, BybitTriggerType, BybitWsOrderRequestOp,
62        },
63        parse::{extract_raw_symbol, make_bybit_symbol},
64        symbol::BybitSymbol,
65        urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
66    },
67    websocket::{
68        enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
69        error::{BybitWsError, BybitWsResult},
70        handler::{FeedHandler, HandlerCommand},
71        messages::{
72            BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
73            BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
74            BybitWsCancelOrderParams, BybitWsHeader, BybitWsPlaceOrderParams, BybitWsRequest,
75            NautilusWsMessage,
76        },
77    },
78};
79
80const DEFAULT_HEARTBEAT_SECS: u64 = 20;
81const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
82const BATCH_PROCESSING_LIMIT: usize = 20;
83
84/// Type alias for the funding rate cache.
85type FundingCache = Arc<RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
86
87/// Public/market data WebSocket client for Bybit.
88#[cfg_attr(feature = "python", pyo3::pyclass)]
89pub struct BybitWebSocketClient {
90    url: String,
91    environment: BybitEnvironment,
92    product_type: Option<BybitProductType>,
93    credential: Option<Credential>,
94    requires_auth: bool,
95    auth_tracker: AuthTracker,
96    heartbeat: Option<u64>,
97    connection_mode: Arc<ArcSwap<AtomicU8>>,
98    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
99    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
100    signal: Arc<AtomicBool>,
101    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
102    subscriptions: SubscriptionState,
103    is_authenticated: Arc<AtomicBool>,
104    account_id: Option<AccountId>,
105    mm_level: Arc<AtomicU8>,
106    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
107    funding_cache: FundingCache,
108    cancellation_token: CancellationToken,
109}
110
111impl Debug for BybitWebSocketClient {
112    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
113        f.debug_struct("BybitWebSocketClient")
114            .field("url", &self.url)
115            .field("environment", &self.environment)
116            .field("product_type", &self.product_type)
117            .field("requires_auth", &self.requires_auth)
118            .field("heartbeat", &self.heartbeat)
119            .field("confirmed_subscriptions", &self.subscriptions.len())
120            .finish()
121    }
122}
123
124impl Clone for BybitWebSocketClient {
125    fn clone(&self) -> Self {
126        Self {
127            url: self.url.clone(),
128            environment: self.environment,
129            product_type: self.product_type,
130            credential: self.credential.clone(),
131            requires_auth: self.requires_auth,
132            auth_tracker: self.auth_tracker.clone(),
133            heartbeat: self.heartbeat,
134            connection_mode: Arc::clone(&self.connection_mode),
135            cmd_tx: Arc::clone(&self.cmd_tx),
136            out_rx: None, // Each clone gets its own receiver
137            signal: Arc::clone(&self.signal),
138            task_handle: None, // Each clone gets its own task handle
139            subscriptions: self.subscriptions.clone(),
140            is_authenticated: Arc::clone(&self.is_authenticated),
141            account_id: self.account_id,
142            mm_level: Arc::clone(&self.mm_level),
143            instruments_cache: Arc::clone(&self.instruments_cache),
144            funding_cache: Arc::clone(&self.funding_cache),
145            cancellation_token: self.cancellation_token.clone(),
146        }
147    }
148}
149
150impl BybitWebSocketClient {
151    /// Creates a new Bybit public WebSocket client.
152    #[must_use]
153    pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
154        Self::new_public_with(
155            BybitProductType::Linear,
156            BybitEnvironment::Mainnet,
157            url,
158            heartbeat,
159        )
160    }
161
162    /// Creates a new Bybit public WebSocket client targeting the specified product/environment.
163    #[must_use]
164    pub fn new_public_with(
165        product_type: BybitProductType,
166        environment: BybitEnvironment,
167        url: Option<String>,
168        heartbeat: Option<u64>,
169    ) -> Self {
170        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
171        // connect() swaps in the real channel and replays any queued instruments so the
172        // handler sees them once it starts.
173        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
174
175        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
176        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
177
178        Self {
179            url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
180            environment,
181            product_type: Some(product_type),
182            credential: None,
183            requires_auth: false,
184            auth_tracker: AuthTracker::new(),
185            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
186            connection_mode,
187            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
188            out_rx: None,
189            signal: Arc::new(AtomicBool::new(false)),
190            task_handle: None,
191            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
192            is_authenticated: Arc::new(AtomicBool::new(false)),
193            instruments_cache: Arc::new(DashMap::new()),
194            account_id: None,
195            funding_cache: Arc::new(RwLock::new(AHashMap::new())),
196            cancellation_token: CancellationToken::new(),
197            mm_level: Arc::new(AtomicU8::new(0)),
198        }
199    }
200
201    /// Creates a new Bybit private WebSocket client.
202    #[must_use]
203    pub fn new_private(
204        environment: BybitEnvironment,
205        credential: Credential,
206        url: Option<String>,
207        heartbeat: Option<u64>,
208    ) -> Self {
209        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
210        // connect() swaps in the real channel and replays any queued instruments so the
211        // handler sees them once it starts.
212        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
213
214        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
215        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
216
217        Self {
218            url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
219            environment,
220            product_type: None,
221            credential: Some(credential),
222            requires_auth: true,
223            auth_tracker: AuthTracker::new(),
224            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
225            connection_mode,
226            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
227            out_rx: None,
228            signal: Arc::new(AtomicBool::new(false)),
229            task_handle: None,
230            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
231            is_authenticated: Arc::new(AtomicBool::new(false)),
232            instruments_cache: Arc::new(DashMap::new()),
233            account_id: None,
234            funding_cache: Arc::new(RwLock::new(AHashMap::new())),
235            cancellation_token: CancellationToken::new(),
236            mm_level: Arc::new(AtomicU8::new(0)),
237        }
238    }
239
240    /// Creates a new Bybit trade WebSocket client for order operations.
241    #[must_use]
242    pub fn new_trade(
243        environment: BybitEnvironment,
244        credential: Credential,
245        url: Option<String>,
246        heartbeat: Option<u64>,
247    ) -> Self {
248        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
249        // connect() swaps in the real channel and replays any queued instruments so the
250        // handler sees them once it starts.
251        let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
252
253        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
254        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
255
256        Self {
257            url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
258            environment,
259            product_type: None,
260            credential: Some(credential),
261            requires_auth: true,
262            auth_tracker: AuthTracker::new(),
263            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
264            connection_mode,
265            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
266            out_rx: None,
267            signal: Arc::new(AtomicBool::new(false)),
268            task_handle: None,
269            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
270            is_authenticated: Arc::new(AtomicBool::new(false)),
271            instruments_cache: Arc::new(DashMap::new()),
272            account_id: None,
273            funding_cache: Arc::new(RwLock::new(AHashMap::new())),
274            cancellation_token: CancellationToken::new(),
275            mm_level: Arc::new(AtomicU8::new(0)),
276        }
277    }
278
279    /// Establishes the WebSocket connection.
280    ///
281    /// # Errors
282    ///
283    /// Returns an error if the underlying WebSocket connection cannot be established,
284    /// after retrying multiple times with exponential backoff.
285    pub async fn connect(&mut self) -> BybitWsResult<()> {
286        self.signal.store(false, Ordering::Relaxed);
287
288        let (raw_handler, raw_rx) = channel_message_handler();
289
290        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
291        // in the message loop for minimal latency (see handler.rs pong response)
292        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
293            // Handler responds to pings internally via select! loop
294        });
295
296        let ping_msg = serde_json::to_string(&BybitSubscription {
297            op: BybitWsOperation::Ping,
298            args: vec![],
299        })?;
300
301        let config = WebSocketConfig {
302            url: self.url.clone(),
303            headers: Self::default_headers(),
304            message_handler: Some(raw_handler),
305            heartbeat: self.heartbeat,
306            heartbeat_msg: Some(ping_msg),
307            ping_handler: Some(ping_handler),
308            reconnect_timeout_ms: Some(5_000),
309            reconnect_delay_initial_ms: Some(500),
310            reconnect_delay_max_ms: Some(5_000),
311            reconnect_backoff_factor: Some(1.5),
312            reconnect_jitter_ms: Some(250),
313            reconnect_max_attempts: None,
314        };
315
316        // Retry initial connection with exponential backoff to handle transient DNS/network issues
317        // TODO: Eventually expose client config options for this
318        const MAX_RETRIES: u32 = 5;
319        const CONNECTION_TIMEOUT_SECS: u64 = 10;
320
321        let mut backoff = ExponentialBackoff::new(
322            Duration::from_millis(500),
323            Duration::from_millis(5000),
324            2.0,
325            250,
326            false,
327        )
328        .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
329
330        #[allow(unused_assignments)]
331        let mut last_error = String::new();
332        let mut attempt = 0;
333        let client = loop {
334            attempt += 1;
335
336            match tokio::time::timeout(
337                Duration::from_secs(CONNECTION_TIMEOUT_SECS),
338                WebSocketClient::connect(config.clone(), None, vec![], None),
339            )
340            .await
341            {
342                Ok(Ok(client)) => {
343                    if attempt > 1 {
344                        tracing::info!("WebSocket connection established after {attempt} attempts");
345                    }
346                    break client;
347                }
348                Ok(Err(e)) => {
349                    last_error = e.to_string();
350                    tracing::warn!(
351                        attempt,
352                        max_retries = MAX_RETRIES,
353                        url = %self.url,
354                        error = %last_error,
355                        "WebSocket connection attempt failed"
356                    );
357                }
358                Err(_) => {
359                    last_error = format!(
360                        "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
361                    );
362                    tracing::warn!(
363                        attempt,
364                        max_retries = MAX_RETRIES,
365                        url = %self.url,
366                        "WebSocket connection attempt timed out"
367                    );
368                }
369            }
370
371            if attempt >= MAX_RETRIES {
372                return Err(BybitWsError::Transport(format!(
373                    "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
374                    If this is a DNS error, check your network configuration and DNS settings.",
375                    self.url,
376                    if last_error.is_empty() {
377                        "unknown error"
378                    } else {
379                        &last_error
380                    }
381                )));
382            }
383
384            let delay = backoff.next_duration();
385            tracing::debug!(
386                "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
387                attempt + 1
388            );
389            tokio::time::sleep(delay).await;
390        };
391
392        self.connection_mode.store(client.connection_mode_atomic());
393
394        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
395        self.out_rx = Some(Arc::new(out_rx));
396
397        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
398        *self.cmd_tx.write().await = cmd_tx.clone();
399
400        let cmd = HandlerCommand::SetClient(client);
401
402        self.send_cmd(cmd).await?;
403
404        // Replay cached instruments to the new handler via the new channel
405        if !self.instruments_cache.is_empty() {
406            let cached_instruments: Vec<InstrumentAny> = self
407                .instruments_cache
408                .iter()
409                .map(|entry| entry.value().clone())
410                .collect();
411            let cmd = HandlerCommand::InitializeInstruments(cached_instruments);
412            self.send_cmd(cmd).await?;
413        }
414
415        let signal = Arc::clone(&self.signal);
416        let subscriptions = self.subscriptions.clone();
417        let credential = self.credential.clone();
418        let requires_auth = self.requires_auth;
419        let funding_cache = Arc::clone(&self.funding_cache);
420        let account_id = self.account_id;
421        let product_type = self.product_type;
422        let mm_level = Arc::clone(&self.mm_level);
423        let cmd_tx_for_reconnect = cmd_tx.clone();
424        let auth_tracker = self.auth_tracker.clone();
425        let is_authenticated = Arc::clone(&self.is_authenticated);
426
427        let stream_handle = get_runtime().spawn(async move {
428            let mut handler = FeedHandler::new(
429                signal.clone(),
430                cmd_rx,
431                raw_rx,
432                out_tx.clone(),
433                account_id,
434                product_type,
435                mm_level.clone(),
436                auth_tracker,
437                subscriptions.clone(),
438                funding_cache.clone(),
439            );
440
441            // Helper closure to resubscribe all tracked subscriptions after reconnection
442            let resubscribe_all = || async {
443                let topics = subscriptions.all_topics();
444
445                if topics.is_empty() {
446                    return;
447                }
448
449                tracing::debug!(count = topics.len(), "Resubscribing to confirmed subscriptions");
450
451                for topic in &topics {
452                    subscriptions.mark_subscribe(topic.as_str());
453                }
454
455                let mut payloads = Vec::with_capacity(topics.len());
456                for topic in &topics {
457                    let message = BybitSubscription {
458                        op: BybitWsOperation::Subscribe,
459                        args: vec![topic.clone()],
460                    };
461                    if let Ok(payload) = serde_json::to_string(&message) {
462                        payloads.push(payload);
463                    }
464                }
465
466                let cmd = HandlerCommand::Subscribe { topics: payloads };
467
468                if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
469                    tracing::error!("Failed to send resubscribe command: {e}");
470                }
471            };
472
473            // Run message processing with reconnection handling
474            loop {
475                match handler.next().await {
476                    Some(NautilusWsMessage::Reconnected) => {
477                        if signal.load(Ordering::Relaxed) {
478                            continue;
479                        }
480
481                        tracing::info!("WebSocket reconnected");
482
483                        // Mark all confirmed subscriptions as failed so they transition to pending state
484                        let confirmed_topics: Vec<String> = {
485                            let confirmed = subscriptions.confirmed();
486                            let mut topics = Vec::new();
487                            for entry in confirmed.iter() {
488                                let (channel, symbols) = entry.pair();
489                                for symbol in symbols.iter() {
490                                    if symbol.is_empty() {
491                                        topics.push(channel.to_string());
492                                    } else {
493                                        topics.push(format!("{channel}.{symbol}"));
494                                    }
495                                }
496                            }
497                            topics
498                        };
499
500                        if !confirmed_topics.is_empty() {
501                            tracing::debug!(count = confirmed_topics.len(), "Marking confirmed subscriptions as pending for replay");
502                            for topic in confirmed_topics {
503                                subscriptions.mark_failure(&topic);
504                            }
505                        }
506
507                        // Clear caches to prevent stale data after reconnection
508                        funding_cache.write().await.clear();
509
510                        if requires_auth {
511                            is_authenticated.store(false, Ordering::Relaxed);
512                            tracing::debug!("Re-authenticating after reconnection");
513
514                            if let Some(cred) = &credential {
515                                let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
516                                let signature = cred.sign_websocket_auth(expires);
517
518                                let auth_message = BybitAuthRequest {
519                                    op: BybitWsOperation::Auth,
520                                    args: vec![
521                                        Value::String(cred.api_key().to_string()),
522                                        Value::Number(expires.into()),
523                                        Value::String(signature),
524                                    ],
525                                };
526
527                                if let Ok(payload) = serde_json::to_string(&auth_message) {
528                                    let cmd = HandlerCommand::Authenticate { payload };
529                                    if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
530                                        tracing::error!(error = %e, "Failed to send reconnection auth command");
531                                    }
532                                } else {
533                                    tracing::error!("Failed to serialize reconnection auth message");
534                                }
535                            }
536                        }
537
538                        // Unauthenticated sessions resubscribe immediately after reconnection,
539                        // authenticated sessions wait for Authenticated message
540                        if !requires_auth {
541                            tracing::debug!("No authentication required, resubscribing immediately");
542                            resubscribe_all().await;
543                        }
544
545                        // Forward to out_tx so caller sees the Reconnected message
546                        if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
547                            tracing::debug!("Receiver dropped, stopping");
548                            break;
549                        }
550                        continue;
551                    }
552                    Some(NautilusWsMessage::Authenticated) => {
553                        tracing::debug!("Authenticated, resubscribing");
554                        is_authenticated.store(true, Ordering::Relaxed);
555                        resubscribe_all().await;
556                        continue;
557                    }
558                    Some(msg) => {
559                        if out_tx.send(msg).is_err() {
560                            tracing::error!("Failed to send message (receiver dropped)");
561                            break;
562                        }
563                    }
564                    None => {
565                        // Stream ended - check if it's a stop signal
566                        if handler.is_stopped() {
567                            tracing::debug!("Stop signal received, ending message processing");
568                            break;
569                        }
570                        // Otherwise it's an unexpected stream end
571                        tracing::warn!("WebSocket stream ended unexpectedly");
572                        break;
573                    }
574                }
575            }
576
577            tracing::debug!("Handler task exiting");
578        });
579
580        self.task_handle = Some(Arc::new(stream_handle));
581
582        if requires_auth && let Err(e) = self.authenticate_if_required().await {
583            return Err(e);
584        }
585
586        Ok(())
587    }
588
589    /// Disconnects the WebSocket client and stops the background task.
590    pub async fn close(&mut self) -> BybitWsResult<()> {
591        tracing::debug!("Starting close process");
592
593        self.signal.store(true, Ordering::Relaxed);
594
595        let cmd = HandlerCommand::Disconnect;
596        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
597            tracing::debug!(
598                "Failed to send disconnect command (handler may already be shut down): {e}"
599            );
600        }
601
602        if let Some(task_handle) = self.task_handle.take() {
603            match Arc::try_unwrap(task_handle) {
604                Ok(handle) => {
605                    tracing::debug!("Waiting for task handle to complete");
606                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
607                        Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
608                        Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
609                        Err(_) => {
610                            tracing::warn!(
611                                "Timeout waiting for task handle, task may still be running"
612                            );
613                            // The task will be dropped and should clean up automatically
614                        }
615                    }
616                }
617                Err(arc_handle) => {
618                    tracing::debug!(
619                        "Cannot take ownership of task handle - other references exist, aborting task"
620                    );
621                    arc_handle.abort();
622                }
623            }
624        } else {
625            tracing::debug!("No task handle to await");
626        }
627
628        self.is_authenticated.store(false, Ordering::Relaxed);
629
630        tracing::debug!("Closed");
631
632        Ok(())
633    }
634
635    /// Returns a value indicating whether the client is active.
636    #[must_use]
637    pub fn is_active(&self) -> bool {
638        let connection_mode_arc = self.connection_mode.load();
639        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
640            && !self.signal.load(Ordering::Relaxed)
641    }
642
643    /// Returns a value indicating whether the client is closed.
644    pub fn is_closed(&self) -> bool {
645        let connection_mode_arc = self.connection_mode.load();
646        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
647            || self.signal.load(Ordering::Relaxed)
648    }
649
650    /// Waits until the WebSocket client becomes active or times out.
651    ///
652    /// # Errors
653    ///
654    /// Returns an error if the timeout is exceeded before the client becomes active.
655    pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
656        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
657
658        tokio::time::timeout(timeout, async {
659            while !self.is_active() {
660                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
661            }
662        })
663        .await
664        .map_err(|_| {
665            BybitWsError::ClientError(format!(
666                "WebSocket connection timeout after {timeout_secs} seconds"
667            ))
668        })?;
669
670        Ok(())
671    }
672
673    /// Subscribe to the provided topic strings.
674    pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
675        if topics.is_empty() {
676            return Ok(());
677        }
678
679        tracing::debug!("Subscribing to topics: {topics:?}");
680
681        // Use reference counting to deduplicate subscriptions
682        let mut topics_to_send = Vec::new();
683
684        for topic in topics {
685            // Returns true if this is the first subscription (ref count 0 -> 1)
686            if self.subscriptions.add_reference(&topic) {
687                self.subscriptions.mark_subscribe(&topic);
688                topics_to_send.push(topic.clone());
689            } else {
690                tracing::debug!("Already subscribed to {topic}, skipping duplicate subscription");
691            }
692        }
693
694        if topics_to_send.is_empty() {
695            return Ok(());
696        }
697
698        // Serialize subscription messages
699        let mut payloads = Vec::with_capacity(topics_to_send.len());
700        for topic in &topics_to_send {
701            let message = BybitSubscription {
702                op: BybitWsOperation::Subscribe,
703                args: vec![topic.clone()],
704            };
705            let payload = serde_json::to_string(&message).map_err(|e| {
706                BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
707            })?;
708            payloads.push(payload);
709        }
710
711        let cmd = HandlerCommand::Subscribe { topics: payloads };
712        self.cmd_tx
713            .read()
714            .await
715            .send(cmd)
716            .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
717
718        Ok(())
719    }
720
721    /// Unsubscribe from the provided topics.
722    pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
723        if topics.is_empty() {
724            return Ok(());
725        }
726
727        tracing::debug!("Attempting to unsubscribe from topics: {topics:?}");
728
729        if self.signal.load(Ordering::Relaxed) {
730            tracing::debug!("Shutdown signal detected, skipping unsubscribe");
731            return Ok(());
732        }
733
734        // Use reference counting to avoid unsubscribing while other consumers still need the topic
735        let mut topics_to_send = Vec::new();
736
737        for topic in topics {
738            // Returns true if this was the last subscription (ref count 1 -> 0)
739            if self.subscriptions.remove_reference(&topic) {
740                self.subscriptions.mark_unsubscribe(&topic);
741                topics_to_send.push(topic.clone());
742            } else {
743                tracing::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
744            }
745        }
746
747        if topics_to_send.is_empty() {
748            return Ok(());
749        }
750
751        // Serialize unsubscription messages
752        let mut payloads = Vec::with_capacity(topics_to_send.len());
753        for topic in &topics_to_send {
754            let message = BybitSubscription {
755                op: BybitWsOperation::Unsubscribe,
756                args: vec![topic.clone()],
757            };
758            if let Ok(payload) = serde_json::to_string(&message) {
759                payloads.push(payload);
760            }
761        }
762
763        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
764        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
765            tracing::debug!(error = %e, "Failed to send unsubscribe command");
766        }
767
768        Ok(())
769    }
770
771    /// Returns a stream of parsed [`NautilusWsMessage`] items.
772    ///
773    /// # Panics
774    ///
775    /// Panics if called before [`Self::connect`] or if the stream has already been taken.
776    pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
777        let rx = self
778            .out_rx
779            .take()
780            .expect("Stream receiver already taken or client not connected");
781        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
782        async_stream::stream! {
783            while let Some(msg) = rx.recv().await {
784                yield msg;
785            }
786        }
787    }
788
789    /// Returns the number of currently registered subscriptions.
790    #[must_use]
791    pub fn subscription_count(&self) -> usize {
792        self.subscriptions.len()
793    }
794
795    /// Returns the credential associated with this client, if any.
796    #[must_use]
797    pub fn credential(&self) -> Option<&Credential> {
798        self.credential.as_ref()
799    }
800
801    /// Caches a single instrument.
802    ///
803    /// Any existing instrument with the same ID will be replaced.
804    pub fn cache_instrument(&self, instrument: InstrumentAny) {
805        self.instruments_cache
806            .insert(instrument.symbol().inner(), instrument.clone());
807
808        // Before connect() the handler isn't running; this send will fail and that's expected
809        // because connect() replays the instruments via InitializeInstruments
810        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
811            let cmd = HandlerCommand::UpdateInstrument(instrument);
812            if let Err(e) = cmd_tx.send(cmd) {
813                tracing::debug!("Failed to send instrument update to handler: {e}");
814            }
815        }
816    }
817
818    /// Caches multiple instruments.
819    ///
820    /// Clears the existing cache first, then adds all provided instruments.
821    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
822        self.instruments_cache.clear();
823        let mut count = 0;
824
825        tracing::debug!("Initializing Bybit instrument cache");
826
827        for inst in instruments {
828            let symbol = inst.symbol().inner();
829            self.instruments_cache.insert(symbol, inst.clone());
830            tracing::debug!("Cached instrument: {symbol}");
831            count += 1;
832        }
833
834        tracing::info!("Bybit instrument cache initialized with {count} instruments");
835    }
836
837    /// Sets the account ID for account message parsing.
838    pub fn set_account_id(&mut self, account_id: AccountId) {
839        self.account_id = Some(account_id);
840    }
841
842    /// Sets the account market maker level.
843    pub fn set_mm_level(&self, mm_level: u8) {
844        self.mm_level.store(mm_level, Ordering::Relaxed);
845    }
846
847    /// Returns a reference to the instruments cache.
848    #[must_use]
849    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
850        &self.instruments_cache
851    }
852
853    /// Returns the account ID if set.
854    #[must_use]
855    pub fn account_id(&self) -> Option<AccountId> {
856        self.account_id
857    }
858
859    /// Returns the product type for public connections.
860    #[must_use]
861    pub fn product_type(&self) -> Option<BybitProductType> {
862        self.product_type
863    }
864
865    /// Subscribes to orderbook updates for a specific instrument.
866    ///
867    /// # Errors
868    ///
869    /// Returns an error if the subscription request fails.
870    ///
871    /// # References
872    ///
873    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook>
874    pub async fn subscribe_orderbook(
875        &self,
876        instrument_id: InstrumentId,
877        depth: u32,
878    ) -> BybitWsResult<()> {
879        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
880        let topic = format!(
881            "{}.{depth}.{raw_symbol}",
882            BybitWsPublicChannel::OrderBook.as_ref()
883        );
884        self.subscribe(vec![topic]).await
885    }
886
887    /// Unsubscribes from orderbook updates for a specific instrument.
888    pub async fn unsubscribe_orderbook(
889        &self,
890        instrument_id: InstrumentId,
891        depth: u32,
892    ) -> BybitWsResult<()> {
893        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
894        let topic = format!(
895            "{}.{depth}.{raw_symbol}",
896            BybitWsPublicChannel::OrderBook.as_ref()
897        );
898        self.unsubscribe(vec![topic]).await
899    }
900
901    /// Subscribes to public trade updates for a specific instrument.
902    ///
903    /// # Errors
904    ///
905    /// Returns an error if the subscription request fails.
906    ///
907    /// # References
908    ///
909    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/trade>
910    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
911        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
912        let topic = format!(
913            "{}.{raw_symbol}",
914            BybitWsPublicChannel::PublicTrade.as_ref()
915        );
916        self.subscribe(vec![topic]).await
917    }
918
919    /// Unsubscribes from public trade updates for a specific instrument.
920    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
921        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
922        let topic = format!(
923            "{}.{raw_symbol}",
924            BybitWsPublicChannel::PublicTrade.as_ref()
925        );
926        self.unsubscribe(vec![topic]).await
927    }
928
929    /// Subscribes to ticker updates for a specific instrument.
930    ///
931    /// # Errors
932    ///
933    /// Returns an error if the subscription request fails.
934    ///
935    /// # References
936    ///
937    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/ticker>
938    pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
939        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
940        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
941        self.subscribe(vec![topic]).await
942    }
943
944    /// Unsubscribes from ticker updates for a specific instrument.
945    pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
946        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
947        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
948
949        // Clear funding rate cache to ensure fresh data on resubscribe
950        let symbol = self.product_type.map_or_else(
951            || instrument_id.symbol.inner(),
952            |pt| make_bybit_symbol(raw_symbol, pt),
953        );
954        self.funding_cache.write().await.remove(&symbol);
955
956        self.unsubscribe(vec![topic]).await
957    }
958
959    /// Subscribes to kline/candlestick updates for a specific instrument.
960    ///
961    /// # Errors
962    ///
963    /// Returns an error if the subscription request fails.
964    ///
965    /// # References
966    ///
967    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/kline>
968    pub async fn subscribe_klines(
969        &self,
970        instrument_id: InstrumentId,
971        interval: impl Into<String>,
972    ) -> BybitWsResult<()> {
973        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
974        let topic = format!(
975            "{}.{}.{raw_symbol}",
976            BybitWsPublicChannel::Kline.as_ref(),
977            interval.into()
978        );
979        self.subscribe(vec![topic]).await
980    }
981
982    /// Unsubscribes from kline/candlestick updates for a specific instrument.
983    pub async fn unsubscribe_klines(
984        &self,
985        instrument_id: InstrumentId,
986        interval: impl Into<String>,
987    ) -> BybitWsResult<()> {
988        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
989        let topic = format!(
990            "{}.{}.{raw_symbol}",
991            BybitWsPublicChannel::Kline.as_ref(),
992            interval.into()
993        );
994        self.unsubscribe(vec![topic]).await
995    }
996
997    /// Subscribes to order updates.
998    ///
999    /// # Errors
1000    ///
1001    /// Returns an error if the subscription request fails or if not authenticated.
1002    ///
1003    /// # References
1004    ///
1005    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/order>
1006    pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1007        if !self.requires_auth {
1008            return Err(BybitWsError::Authentication(
1009                "Order subscription requires authentication".to_string(),
1010            ));
1011        }
1012        self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1013            .await
1014    }
1015
1016    /// Unsubscribes from order updates.
1017    pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1018        self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1019            .await
1020    }
1021
1022    /// Subscribes to execution/fill updates.
1023    ///
1024    /// # Errors
1025    ///
1026    /// Returns an error if the subscription request fails or if not authenticated.
1027    ///
1028    /// # References
1029    ///
1030    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/execution>
1031    pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1032        if !self.requires_auth {
1033            return Err(BybitWsError::Authentication(
1034                "Execution subscription requires authentication".to_string(),
1035            ));
1036        }
1037        self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1038            .await
1039    }
1040
1041    /// Unsubscribes from execution/fill updates.
1042    pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1043        self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1044            .await
1045    }
1046
1047    /// Subscribes to position updates.
1048    ///
1049    /// # Errors
1050    ///
1051    /// Returns an error if the subscription request fails or if not authenticated.
1052    ///
1053    /// # References
1054    ///
1055    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/position>
1056    pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1057        if !self.requires_auth {
1058            return Err(BybitWsError::Authentication(
1059                "Position subscription requires authentication".to_string(),
1060            ));
1061        }
1062        self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1063            .await
1064    }
1065
1066    /// Unsubscribes from position updates.
1067    pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1068        self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1069            .await
1070    }
1071
1072    /// Subscribes to wallet/balance updates.
1073    ///
1074    /// # Errors
1075    ///
1076    /// Returns an error if the subscription request fails or if not authenticated.
1077    ///
1078    /// # References
1079    ///
1080    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/wallet>
1081    pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1082        if !self.requires_auth {
1083            return Err(BybitWsError::Authentication(
1084                "Wallet subscription requires authentication".to_string(),
1085            ));
1086        }
1087        self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1088            .await
1089    }
1090
1091    /// Unsubscribes from wallet/balance updates.
1092    pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1093        self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1094            .await
1095    }
1096
1097    /// Places an order via WebSocket.
1098    ///
1099    /// # Errors
1100    ///
1101    /// Returns an error if the order request fails or if not authenticated.
1102    ///
1103    /// # References
1104    ///
1105    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1106    pub async fn place_order(
1107        &self,
1108        params: BybitWsPlaceOrderParams,
1109        client_order_id: ClientOrderId,
1110        trader_id: TraderId,
1111        strategy_id: StrategyId,
1112        instrument_id: InstrumentId,
1113    ) -> BybitWsResult<()> {
1114        if !self.is_authenticated.load(Ordering::Relaxed) {
1115            return Err(BybitWsError::Authentication(
1116                "Must be authenticated to place orders".to_string(),
1117            ));
1118        }
1119
1120        let cmd = HandlerCommand::PlaceOrder {
1121            params,
1122            client_order_id,
1123            trader_id,
1124            strategy_id,
1125            instrument_id,
1126        };
1127
1128        self.send_cmd(cmd).await
1129    }
1130
1131    /// Amends an existing order via WebSocket.
1132    ///
1133    /// # Errors
1134    ///
1135    /// Returns an error if the amend request fails or if not authenticated.
1136    ///
1137    /// # References
1138    ///
1139    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1140    pub async fn amend_order(
1141        &self,
1142        params: BybitWsAmendOrderParams,
1143        client_order_id: ClientOrderId,
1144        trader_id: TraderId,
1145        strategy_id: StrategyId,
1146        instrument_id: InstrumentId,
1147        venue_order_id: Option<VenueOrderId>,
1148    ) -> BybitWsResult<()> {
1149        if !self.is_authenticated.load(Ordering::Relaxed) {
1150            return Err(BybitWsError::Authentication(
1151                "Must be authenticated to amend orders".to_string(),
1152            ));
1153        }
1154
1155        let cmd = HandlerCommand::AmendOrder {
1156            params,
1157            client_order_id,
1158            trader_id,
1159            strategy_id,
1160            instrument_id,
1161            venue_order_id,
1162        };
1163
1164        self.send_cmd(cmd).await
1165    }
1166
1167    /// Cancels an order via WebSocket.
1168    ///
1169    /// # Errors
1170    ///
1171    /// Returns an error if the cancel request fails or if not authenticated.
1172    ///
1173    /// # References
1174    ///
1175    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1176    pub async fn cancel_order(
1177        &self,
1178        params: BybitWsCancelOrderParams,
1179        client_order_id: ClientOrderId,
1180        trader_id: TraderId,
1181        strategy_id: StrategyId,
1182        instrument_id: InstrumentId,
1183        venue_order_id: Option<VenueOrderId>,
1184    ) -> BybitWsResult<()> {
1185        if !self.is_authenticated.load(Ordering::Relaxed) {
1186            return Err(BybitWsError::Authentication(
1187                "Must be authenticated to cancel orders".to_string(),
1188            ));
1189        }
1190
1191        let cmd = HandlerCommand::CancelOrder {
1192            params,
1193            client_order_id,
1194            trader_id,
1195            strategy_id,
1196            instrument_id,
1197            venue_order_id,
1198        };
1199
1200        self.send_cmd(cmd).await
1201    }
1202
1203    /// Batch creates multiple orders via WebSocket.
1204    ///
1205    /// # Errors
1206    ///
1207    /// Returns an error if the batch request fails or if not authenticated.
1208    ///
1209    /// # References
1210    ///
1211    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1212    pub async fn batch_place_orders(
1213        &self,
1214        trader_id: TraderId,
1215        strategy_id: StrategyId,
1216        orders: Vec<BybitWsPlaceOrderParams>,
1217    ) -> BybitWsResult<()> {
1218        if !self.is_authenticated.load(Ordering::Relaxed) {
1219            return Err(BybitWsError::Authentication(
1220                "Must be authenticated to place orders".to_string(),
1221            ));
1222        }
1223
1224        if orders.is_empty() {
1225            tracing::warn!("Batch place orders called with empty orders list");
1226            return Ok(());
1227        }
1228
1229        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1230            self.batch_place_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1231                .await?;
1232        }
1233
1234        Ok(())
1235    }
1236
1237    async fn batch_place_orders_chunk(
1238        &self,
1239        trader_id: TraderId,
1240        strategy_id: StrategyId,
1241        orders: Vec<BybitWsPlaceOrderParams>,
1242    ) -> BybitWsResult<()> {
1243        let category = orders[0].category;
1244
1245        let batch_req_id = UUID4::new().to_string();
1246
1247        // Extract order tracking data before consuming orders to register with handler
1248        let mut batch_order_data = Vec::new();
1249        for order in &orders {
1250            if let Some(order_link_id_str) = &order.order_link_id {
1251                let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1252                let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1253                let instrument_id = self
1254                    .instruments_cache
1255                    .get(&cache_key)
1256                    .map(|inst| inst.id())
1257                    .ok_or_else(|| {
1258                        BybitWsError::ClientError(format!(
1259                            "Instrument {} not found in cache",
1260                            cache_key
1261                        ))
1262                    })?;
1263                batch_order_data.push((
1264                    client_order_id,
1265                    (client_order_id, trader_id, strategy_id, instrument_id),
1266                ));
1267            }
1268        }
1269
1270        if !batch_order_data.is_empty() {
1271            let cmd = HandlerCommand::RegisterBatchPlace {
1272                req_id: batch_req_id.clone(),
1273                orders: batch_order_data,
1274            };
1275            let cmd_tx = self.cmd_tx.read().await;
1276            if let Err(e) = cmd_tx.send(cmd) {
1277                tracing::error!("Failed to send RegisterBatchPlace command: {e}");
1278            }
1279        }
1280
1281        let mm_level = self.mm_level.load(Ordering::Relaxed);
1282        let has_non_post_only = orders
1283            .iter()
1284            .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1285        let referer = if has_non_post_only || mm_level == 0 {
1286            Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1287        } else {
1288            None
1289        };
1290
1291        let request_items: Vec<BybitWsBatchPlaceItem> = orders
1292            .into_iter()
1293            .map(|order| BybitWsBatchPlaceItem {
1294                symbol: order.symbol,
1295                side: order.side,
1296                order_type: order.order_type,
1297                qty: order.qty,
1298                is_leverage: order.is_leverage,
1299                market_unit: order.market_unit,
1300                price: order.price,
1301                time_in_force: order.time_in_force,
1302                order_link_id: order.order_link_id,
1303                reduce_only: order.reduce_only,
1304                close_on_trigger: order.close_on_trigger,
1305                trigger_price: order.trigger_price,
1306                trigger_by: order.trigger_by,
1307                trigger_direction: order.trigger_direction,
1308                tpsl_mode: order.tpsl_mode,
1309                take_profit: order.take_profit,
1310                stop_loss: order.stop_loss,
1311                tp_trigger_by: order.tp_trigger_by,
1312                sl_trigger_by: order.sl_trigger_by,
1313                sl_trigger_price: order.sl_trigger_price,
1314                tp_trigger_price: order.tp_trigger_price,
1315                sl_order_type: order.sl_order_type,
1316                tp_order_type: order.tp_order_type,
1317                sl_limit_price: order.sl_limit_price,
1318                tp_limit_price: order.tp_limit_price,
1319            })
1320            .collect();
1321
1322        let args = BybitWsBatchPlaceOrderArgs {
1323            category,
1324            request: request_items,
1325        };
1326
1327        let request = BybitWsRequest {
1328            req_id: Some(batch_req_id),
1329            op: BybitWsOrderRequestOp::CreateBatch,
1330            header: BybitWsHeader::with_referer(referer),
1331            args: vec![args],
1332        };
1333
1334        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1335
1336        self.send_text(&payload).await
1337    }
1338
1339    /// Batch amends multiple orders via WebSocket.
1340    ///
1341    /// # Errors
1342    ///
1343    /// Returns an error if the batch request fails or if not authenticated.
1344    pub async fn batch_amend_orders(
1345        &self,
1346        #[allow(unused_variables)] trader_id: TraderId,
1347        #[allow(unused_variables)] strategy_id: StrategyId,
1348        orders: Vec<BybitWsAmendOrderParams>,
1349    ) -> BybitWsResult<()> {
1350        if !self.is_authenticated.load(Ordering::Relaxed) {
1351            return Err(BybitWsError::Authentication(
1352                "Must be authenticated to amend orders".to_string(),
1353            ));
1354        }
1355
1356        if orders.is_empty() {
1357            tracing::warn!("Batch amend orders called with empty orders list");
1358            return Ok(());
1359        }
1360
1361        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1362            self.batch_amend_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1363                .await?;
1364        }
1365
1366        Ok(())
1367    }
1368
1369    async fn batch_amend_orders_chunk(
1370        &self,
1371        #[allow(unused_variables)] trader_id: TraderId,
1372        #[allow(unused_variables)] strategy_id: StrategyId,
1373        orders: Vec<BybitWsAmendOrderParams>,
1374    ) -> BybitWsResult<()> {
1375        let request = BybitWsRequest {
1376            req_id: None,
1377            op: BybitWsOrderRequestOp::AmendBatch,
1378            header: BybitWsHeader::now(),
1379            args: orders,
1380        };
1381
1382        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1383
1384        self.send_text(&payload).await
1385    }
1386
1387    /// Batch cancels multiple orders via WebSocket.
1388    ///
1389    /// # Errors
1390    ///
1391    /// Returns an error if the batch request fails or if not authenticated.
1392    pub async fn batch_cancel_orders(
1393        &self,
1394        orders: Vec<BybitWsCancelOrderParams>,
1395    ) -> BybitWsResult<()> {
1396        if !self.is_authenticated.load(Ordering::Relaxed) {
1397            return Err(BybitWsError::Authentication(
1398                "Must be authenticated to cancel orders".to_string(),
1399            ));
1400        }
1401
1402        if orders.is_empty() {
1403            tracing::warn!("Batch cancel orders called with empty orders list");
1404            return Ok(());
1405        }
1406
1407        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1408            self.batch_cancel_orders_chunk(chunk.to_vec()).await?;
1409        }
1410
1411        Ok(())
1412    }
1413
1414    async fn batch_cancel_orders_chunk(
1415        &self,
1416        orders: Vec<BybitWsCancelOrderParams>,
1417    ) -> BybitWsResult<()> {
1418        // Extract category from first order (all orders must have the same category)
1419        let category = orders[0].category;
1420
1421        let request_items: Vec<BybitWsBatchCancelItem> = orders
1422            .into_iter()
1423            .map(|order| BybitWsBatchCancelItem {
1424                symbol: order.symbol,
1425                order_id: order.order_id,
1426                order_link_id: order.order_link_id,
1427            })
1428            .collect();
1429
1430        let args = BybitWsBatchCancelOrderArgs {
1431            category,
1432            request: request_items,
1433        };
1434
1435        let request = BybitWsRequest {
1436            req_id: None,
1437            op: BybitWsOrderRequestOp::CancelBatch,
1438            header: BybitWsHeader::now(),
1439            args: vec![args],
1440        };
1441
1442        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1443
1444        self.send_text(&payload).await
1445    }
1446
1447    /// Submits an order using Nautilus domain objects.
1448    ///
1449    /// # Errors
1450    ///
1451    /// Returns an error if order submission fails or if not authenticated.
1452    #[allow(clippy::too_many_arguments)]
1453    pub async fn submit_order(
1454        &self,
1455        product_type: BybitProductType,
1456        trader_id: TraderId,
1457        strategy_id: StrategyId,
1458        instrument_id: InstrumentId,
1459        client_order_id: ClientOrderId,
1460        order_side: OrderSide,
1461        order_type: OrderType,
1462        quantity: Quantity,
1463        is_quote_quantity: bool,
1464        time_in_force: Option<TimeInForce>,
1465        price: Option<Price>,
1466        trigger_price: Option<Price>,
1467        post_only: Option<bool>,
1468        reduce_only: Option<bool>,
1469        is_leverage: bool,
1470    ) -> BybitWsResult<()> {
1471        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1472            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1473        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1474
1475        let bybit_side = match order_side {
1476            OrderSide::Buy => BybitOrderSide::Buy,
1477            OrderSide::Sell => BybitOrderSide::Sell,
1478            _ => {
1479                return Err(BybitWsError::ClientError(format!(
1480                    "Invalid order side: {order_side:?}"
1481                )));
1482            }
1483        };
1484
1485        // For stop/conditional orders, Bybit uses Market/Limit with trigger parameters
1486        let (bybit_order_type, is_stop_order) = match order_type {
1487            OrderType::Market => (BybitOrderType::Market, false),
1488            OrderType::Limit => (BybitOrderType::Limit, false),
1489            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1490            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1491            _ => {
1492                return Err(BybitWsError::ClientError(format!(
1493                    "Unsupported order type: {order_type:?}"
1494                )));
1495            }
1496        };
1497
1498        let bybit_tif = if bybit_order_type == BybitOrderType::Market {
1499            None
1500        } else if post_only == Some(true) {
1501            Some(BybitTimeInForce::PostOnly)
1502        } else if let Some(tif) = time_in_force {
1503            Some(match tif {
1504                TimeInForce::Gtc => BybitTimeInForce::Gtc,
1505                TimeInForce::Ioc => BybitTimeInForce::Ioc,
1506                TimeInForce::Fok => BybitTimeInForce::Fok,
1507                _ => {
1508                    return Err(BybitWsError::ClientError(format!(
1509                        "Unsupported time in force: {tif:?}"
1510                    )));
1511                }
1512            })
1513        } else {
1514            None
1515        };
1516
1517        // For SPOT market orders, specify baseCoin to interpret qty as base currency.
1518        // This ensures Nautilus quantities (always in base currency) are interpreted correctly.
1519        let market_unit = if product_type == BybitProductType::Spot
1520            && bybit_order_type == BybitOrderType::Market
1521        {
1522            if is_quote_quantity {
1523                Some(BYBIT_QUOTE_COIN.to_string())
1524            } else {
1525                Some(BYBIT_BASE_COIN.to_string())
1526            }
1527        } else {
1528            None
1529        };
1530
1531        // Only SPOT products support is_leverage parameter
1532        let is_leverage_value = if product_type == BybitProductType::Spot {
1533            Some(i32::from(is_leverage))
1534        } else {
1535            None
1536        };
1537
1538        // Stop semantics: Buy stops trigger on rise (breakout), sell stops trigger on fall (breakdown)
1539        // MIT semantics: Buy MIT triggers on fall (pullback entry), sell MIT triggers on rise (rally entry)
1540        let trigger_direction = if is_stop_order {
1541            match (order_type, order_side) {
1542                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1543                    Some(BybitTriggerDirection::RisesTo as i32)
1544                }
1545                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1546                    Some(BybitTriggerDirection::FallsTo as i32)
1547                }
1548                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1549                    Some(BybitTriggerDirection::FallsTo as i32)
1550                }
1551                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1552                    Some(BybitTriggerDirection::RisesTo as i32)
1553                }
1554                _ => None,
1555            }
1556        } else {
1557            None
1558        };
1559
1560        let params = if is_stop_order {
1561            // For conditional orders, ALL types use triggerPrice field
1562            // sl_trigger_price/tp_trigger_price are only for TP/SL attached to regular orders
1563            BybitWsPlaceOrderParams {
1564                category: product_type,
1565                symbol: raw_symbol,
1566                side: bybit_side,
1567                order_type: bybit_order_type,
1568                qty: quantity.to_string(),
1569                is_leverage: is_leverage_value,
1570                market_unit: market_unit.clone(),
1571                price: price.map(|p| p.to_string()),
1572                time_in_force: bybit_tif,
1573                order_link_id: Some(client_order_id.to_string()),
1574                reduce_only: reduce_only.filter(|&r| r),
1575                close_on_trigger: None,
1576                trigger_price: trigger_price.map(|p| p.to_string()),
1577                trigger_by: Some(BybitTriggerType::LastPrice),
1578                trigger_direction,
1579                tpsl_mode: None, // Not needed for standalone conditional orders
1580                take_profit: None,
1581                stop_loss: None,
1582                tp_trigger_by: None,
1583                sl_trigger_by: None,
1584                sl_trigger_price: None, // Not used for standalone stop orders
1585                tp_trigger_price: None, // Not used for standalone stop orders
1586                sl_order_type: None,
1587                tp_order_type: None,
1588                sl_limit_price: None,
1589                tp_limit_price: None,
1590            }
1591        } else {
1592            // Regular market/limit orders
1593            BybitWsPlaceOrderParams {
1594                category: product_type,
1595                symbol: raw_symbol,
1596                side: bybit_side,
1597                order_type: bybit_order_type,
1598                qty: quantity.to_string(),
1599                is_leverage: is_leverage_value,
1600                market_unit,
1601                price: price.map(|p| p.to_string()),
1602                time_in_force: if bybit_order_type == BybitOrderType::Market {
1603                    None
1604                } else {
1605                    bybit_tif
1606                },
1607                order_link_id: Some(client_order_id.to_string()),
1608                reduce_only: reduce_only.filter(|&r| r),
1609                close_on_trigger: None,
1610                trigger_price: None,
1611                trigger_by: None,
1612                trigger_direction: None,
1613                tpsl_mode: None,
1614                take_profit: None,
1615                stop_loss: None,
1616                tp_trigger_by: None,
1617                sl_trigger_by: None,
1618                sl_trigger_price: None,
1619                tp_trigger_price: None,
1620                sl_order_type: None,
1621                tp_order_type: None,
1622                sl_limit_price: None,
1623                tp_limit_price: None,
1624            }
1625        };
1626
1627        self.place_order(
1628            params,
1629            client_order_id,
1630            trader_id,
1631            strategy_id,
1632            instrument_id,
1633        )
1634        .await
1635    }
1636
1637    /// Modifies an existing order using Nautilus domain objects.
1638    ///
1639    /// # Errors
1640    ///
1641    /// Returns an error if modification fails or if not authenticated.
1642    #[allow(clippy::too_many_arguments)]
1643    pub async fn modify_order(
1644        &self,
1645        product_type: BybitProductType,
1646        trader_id: TraderId,
1647        strategy_id: StrategyId,
1648        instrument_id: InstrumentId,
1649        client_order_id: ClientOrderId,
1650        venue_order_id: Option<VenueOrderId>,
1651        quantity: Option<Quantity>,
1652        price: Option<Price>,
1653    ) -> BybitWsResult<()> {
1654        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1655            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1656        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1657
1658        let params = BybitWsAmendOrderParams {
1659            category: product_type,
1660            symbol: raw_symbol,
1661            order_id: venue_order_id.map(|id| id.to_string()),
1662            order_link_id: Some(client_order_id.to_string()),
1663            qty: quantity.map(|q| q.to_string()),
1664            price: price.map(|p| p.to_string()),
1665            trigger_price: None,
1666            take_profit: None,
1667            stop_loss: None,
1668            tp_trigger_by: None,
1669            sl_trigger_by: None,
1670        };
1671
1672        self.amend_order(
1673            params,
1674            client_order_id,
1675            trader_id,
1676            strategy_id,
1677            instrument_id,
1678            venue_order_id,
1679        )
1680        .await
1681    }
1682
1683    /// Cancels an order using Nautilus domain objects.
1684    ///
1685    /// # Errors
1686    ///
1687    /// Returns an error if cancellation fails or if not authenticated.
1688    pub async fn cancel_order_by_id(
1689        &self,
1690        product_type: BybitProductType,
1691        trader_id: TraderId,
1692        strategy_id: StrategyId,
1693        instrument_id: InstrumentId,
1694        client_order_id: ClientOrderId,
1695        venue_order_id: Option<VenueOrderId>,
1696    ) -> BybitWsResult<()> {
1697        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1698            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1699        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1700
1701        let params = BybitWsCancelOrderParams {
1702            category: product_type,
1703            symbol: raw_symbol,
1704            order_id: venue_order_id.map(|id| id.to_string()),
1705            order_link_id: Some(client_order_id.to_string()),
1706        };
1707
1708        self.cancel_order(
1709            params,
1710            client_order_id,
1711            trader_id,
1712            strategy_id,
1713            instrument_id,
1714            venue_order_id,
1715        )
1716        .await
1717    }
1718
1719    /// Batch cancels multiple orders using Nautilus domain objects.
1720    ///
1721    /// # Errors
1722    ///
1723    /// Returns an error if batch cancellation fails or if not authenticated.
1724    pub async fn batch_cancel_orders_by_id(
1725        &self,
1726        product_type: BybitProductType,
1727        trader_id: TraderId,
1728        strategy_id: StrategyId,
1729        instrument_ids: Vec<InstrumentId>,
1730        venue_order_ids: Vec<Option<VenueOrderId>>,
1731        client_order_ids: Vec<Option<ClientOrderId>>,
1732    ) -> BybitWsResult<()> {
1733        if instrument_ids.len() != venue_order_ids.len()
1734            || instrument_ids.len() != client_order_ids.len()
1735        {
1736            return Err(BybitWsError::ClientError(
1737                "instrument_ids, venue_order_ids, and client_order_ids must have the same length"
1738                    .to_string(),
1739            ));
1740        }
1741
1742        let batch_req_id = UUID4::new().to_string();
1743
1744        let mut params_vec = Vec::new();
1745        let mut batch_cancel_data = Vec::new();
1746
1747        for ((instrument_id, venue_order_id), client_order_id) in instrument_ids
1748            .into_iter()
1749            .zip(venue_order_ids.into_iter())
1750            .zip(client_order_ids.into_iter())
1751        {
1752            let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1753                .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1754            let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1755
1756            let params = BybitWsCancelOrderParams {
1757                category: product_type,
1758                symbol: raw_symbol,
1759                order_id: venue_order_id.map(|id| id.to_string()),
1760                order_link_id: client_order_id.as_ref().map(|id| id.to_string()),
1761            };
1762
1763            params_vec.push(params);
1764
1765            if let Some(client_order_id) = client_order_id {
1766                batch_cancel_data.push((
1767                    client_order_id,
1768                    (
1769                        client_order_id,
1770                        trader_id,
1771                        strategy_id,
1772                        instrument_id,
1773                        venue_order_id,
1774                    ),
1775                ));
1776            }
1777        }
1778
1779        if !batch_cancel_data.is_empty() {
1780            let cmd = HandlerCommand::RegisterBatchCancel {
1781                req_id: batch_req_id.clone(),
1782                cancels: batch_cancel_data,
1783            };
1784            let cmd_tx = self.cmd_tx.read().await;
1785            if let Err(e) = cmd_tx.send(cmd) {
1786                tracing::error!("Failed to send RegisterBatchCancel command: {e}");
1787            }
1788        }
1789
1790        self.batch_cancel_orders_with_req_id(params_vec, batch_req_id)
1791            .await
1792    }
1793
1794    /// Internal method to batch cancel with a specific request ID.
1795    async fn batch_cancel_orders_with_req_id(
1796        &self,
1797        orders: Vec<BybitWsCancelOrderParams>,
1798        req_id: String,
1799    ) -> BybitWsResult<()> {
1800        if !self.is_authenticated.load(Ordering::Relaxed) {
1801            return Err(BybitWsError::Authentication(
1802                "Must be authenticated to cancel orders".to_string(),
1803            ));
1804        }
1805
1806        if orders.is_empty() {
1807            return Ok(());
1808        }
1809
1810        if orders.len() > 20 {
1811            return Err(BybitWsError::ClientError(
1812                "Batch cancel limit is 20 orders per request".to_string(),
1813            ));
1814        }
1815
1816        // Extract category from first order (all orders must have the same category)
1817        let category = orders[0].category;
1818
1819        let request_items: Vec<BybitWsBatchCancelItem> = orders
1820            .into_iter()
1821            .map(|order| BybitWsBatchCancelItem {
1822                symbol: order.symbol,
1823                order_id: order.order_id,
1824                order_link_id: order.order_link_id,
1825            })
1826            .collect();
1827
1828        let args = BybitWsBatchCancelOrderArgs {
1829            category,
1830            request: request_items,
1831        };
1832
1833        let request = BybitWsRequest {
1834            req_id: Some(req_id),
1835            op: BybitWsOrderRequestOp::CancelBatch,
1836            header: BybitWsHeader::now(),
1837            args: vec![args],
1838        };
1839
1840        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1841
1842        self.send_text(&payload).await
1843    }
1844
1845    /// Builds order params for placing an order.
1846    #[allow(clippy::too_many_arguments)]
1847    pub fn build_place_order_params(
1848        &self,
1849        product_type: BybitProductType,
1850        instrument_id: InstrumentId,
1851        client_order_id: ClientOrderId,
1852        order_side: OrderSide,
1853        order_type: OrderType,
1854        quantity: Quantity,
1855        is_quote_quantity: bool,
1856        time_in_force: Option<TimeInForce>,
1857        price: Option<Price>,
1858        trigger_price: Option<Price>,
1859        post_only: Option<bool>,
1860        reduce_only: Option<bool>,
1861        is_leverage: bool,
1862    ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1863        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1864            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1865        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1866
1867        let bybit_side = match order_side {
1868            OrderSide::Buy => BybitOrderSide::Buy,
1869            OrderSide::Sell => BybitOrderSide::Sell,
1870            _ => {
1871                return Err(BybitWsError::ClientError(format!(
1872                    "Invalid order side: {order_side:?}"
1873                )));
1874            }
1875        };
1876
1877        let (bybit_order_type, is_stop_order) = match order_type {
1878            OrderType::Market => (BybitOrderType::Market, false),
1879            OrderType::Limit => (BybitOrderType::Limit, false),
1880            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1881            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1882            _ => {
1883                return Err(BybitWsError::ClientError(format!(
1884                    "Unsupported order type: {order_type:?}"
1885                )));
1886            }
1887        };
1888
1889        let bybit_tif = if post_only == Some(true) {
1890            Some(BybitTimeInForce::PostOnly)
1891        } else if let Some(tif) = time_in_force {
1892            Some(match tif {
1893                TimeInForce::Gtc => BybitTimeInForce::Gtc,
1894                TimeInForce::Ioc => BybitTimeInForce::Ioc,
1895                TimeInForce::Fok => BybitTimeInForce::Fok,
1896                _ => {
1897                    return Err(BybitWsError::ClientError(format!(
1898                        "Unsupported time in force: {tif:?}"
1899                    )));
1900                }
1901            })
1902        } else {
1903            None
1904        };
1905
1906        let market_unit = if product_type == BybitProductType::Spot
1907            && bybit_order_type == BybitOrderType::Market
1908        {
1909            if is_quote_quantity {
1910                Some(BYBIT_QUOTE_COIN.to_string())
1911            } else {
1912                Some(BYBIT_BASE_COIN.to_string())
1913            }
1914        } else {
1915            None
1916        };
1917
1918        // Only SPOT products support is_leverage parameter
1919        let is_leverage_value = if product_type == BybitProductType::Spot {
1920            Some(i32::from(is_leverage))
1921        } else {
1922            None
1923        };
1924
1925        // Stop semantics: Buy stops trigger on rise (breakout), sell stops trigger on fall (breakdown)
1926        // MIT semantics: Buy MIT triggers on fall (pullback entry), sell MIT triggers on rise (rally entry)
1927        let trigger_direction = if is_stop_order {
1928            match (order_type, order_side) {
1929                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1930                    Some(BybitTriggerDirection::RisesTo as i32)
1931                }
1932                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1933                    Some(BybitTriggerDirection::FallsTo as i32)
1934                }
1935                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1936                    Some(BybitTriggerDirection::FallsTo as i32)
1937                }
1938                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1939                    Some(BybitTriggerDirection::RisesTo as i32)
1940                }
1941                _ => None,
1942            }
1943        } else {
1944            None
1945        };
1946
1947        let params = if is_stop_order {
1948            BybitWsPlaceOrderParams {
1949                category: product_type,
1950                symbol: raw_symbol,
1951                side: bybit_side,
1952                order_type: bybit_order_type,
1953                qty: quantity.to_string(),
1954                is_leverage: is_leverage_value,
1955                market_unit,
1956                price: price.map(|p| p.to_string()),
1957                time_in_force: if bybit_order_type == BybitOrderType::Market {
1958                    None
1959                } else {
1960                    bybit_tif
1961                },
1962                order_link_id: Some(client_order_id.to_string()),
1963                reduce_only: reduce_only.filter(|&r| r),
1964                close_on_trigger: None,
1965                trigger_price: trigger_price.map(|p| p.to_string()),
1966                trigger_by: Some(BybitTriggerType::LastPrice),
1967                trigger_direction,
1968                tpsl_mode: None,
1969                take_profit: None,
1970                stop_loss: None,
1971                tp_trigger_by: None,
1972                sl_trigger_by: None,
1973                sl_trigger_price: None,
1974                tp_trigger_price: None,
1975                sl_order_type: None,
1976                tp_order_type: None,
1977                sl_limit_price: None,
1978                tp_limit_price: None,
1979            }
1980        } else {
1981            BybitWsPlaceOrderParams {
1982                category: product_type,
1983                symbol: raw_symbol,
1984                side: bybit_side,
1985                order_type: bybit_order_type,
1986                qty: quantity.to_string(),
1987                is_leverage: is_leverage_value,
1988                market_unit,
1989                price: price.map(|p| p.to_string()),
1990                time_in_force: if bybit_order_type == BybitOrderType::Market {
1991                    None
1992                } else {
1993                    bybit_tif
1994                },
1995                order_link_id: Some(client_order_id.to_string()),
1996                reduce_only: reduce_only.filter(|&r| r),
1997                close_on_trigger: None,
1998                trigger_price: None,
1999                trigger_by: None,
2000                trigger_direction: None,
2001                tpsl_mode: None,
2002                take_profit: None,
2003                stop_loss: None,
2004                tp_trigger_by: None,
2005                sl_trigger_by: None,
2006                sl_trigger_price: None,
2007                tp_trigger_price: None,
2008                sl_order_type: None,
2009                tp_order_type: None,
2010                sl_limit_price: None,
2011                tp_limit_price: None,
2012            }
2013        };
2014
2015        Ok(params)
2016    }
2017
2018    /// Builds order params for amending an order.
2019    #[allow(clippy::too_many_arguments)]
2020    pub fn build_amend_order_params(
2021        &self,
2022        product_type: BybitProductType,
2023        instrument_id: InstrumentId,
2024        venue_order_id: Option<VenueOrderId>,
2025        client_order_id: Option<ClientOrderId>,
2026        quantity: Option<Quantity>,
2027        price: Option<Price>,
2028    ) -> BybitWsResult<BybitWsAmendOrderParams> {
2029        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2030            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2031        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2032
2033        Ok(BybitWsAmendOrderParams {
2034            category: product_type,
2035            symbol: raw_symbol,
2036            order_id: venue_order_id.map(|v| v.to_string()),
2037            order_link_id: client_order_id.map(|c| c.to_string()),
2038            qty: quantity.map(|q| q.to_string()),
2039            price: price.map(|p| p.to_string()),
2040            trigger_price: None,
2041            take_profit: None,
2042            stop_loss: None,
2043            tp_trigger_by: None,
2044            sl_trigger_by: None,
2045        })
2046    }
2047
2048    fn default_headers() -> Vec<(String, String)> {
2049        vec![
2050            ("Content-Type".to_string(), "application/json".to_string()),
2051            ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
2052        ]
2053    }
2054
2055    async fn authenticate_if_required(&self) -> BybitWsResult<()> {
2056        if !self.requires_auth {
2057            return Ok(());
2058        }
2059
2060        let credential = self.credential.as_ref().ok_or_else(|| {
2061            BybitWsError::Authentication("Credentials required for authentication".to_string())
2062        })?;
2063
2064        let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
2065        let signature = credential.sign_websocket_auth(expires);
2066
2067        let auth_message = BybitAuthRequest {
2068            op: BybitWsOperation::Auth,
2069            args: vec![
2070                Value::String(credential.api_key().to_string()),
2071                Value::Number(expires.into()),
2072                Value::String(signature),
2073            ],
2074        };
2075
2076        let payload = serde_json::to_string(&auth_message)?;
2077
2078        self.cmd_tx
2079            .read()
2080            .await
2081            .send(HandlerCommand::Authenticate { payload })
2082            .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
2083
2084        // Authentication will be processed asynchronously by the handler
2085        // The handler will emit NautilusWsMessage::Authenticated when successful
2086        Ok(())
2087    }
2088
2089    async fn send_text(&self, text: &str) -> BybitWsResult<()> {
2090        let cmd = HandlerCommand::SendText {
2091            payload: text.to_string(),
2092        };
2093
2094        self.send_cmd(cmd).await
2095    }
2096
2097    async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
2098        self.cmd_tx
2099            .read()
2100            .await
2101            .send(cmd)
2102            .map_err(|e| BybitWsError::Send(e.to_string()))
2103    }
2104}
2105
2106////////////////////////////////////////////////////////////////////////////////
2107// Tests
2108////////////////////////////////////////////////////////////////////////////////
2109
2110#[cfg(test)]
2111mod tests {
2112    use rstest::rstest;
2113
2114    use super::*;
2115    use crate::{
2116        common::testing::load_test_json,
2117        websocket::{handler::FeedHandler, messages::BybitWsMessage},
2118    };
2119
2120    #[rstest]
2121    fn classify_orderbook_snapshot() {
2122        let json: Value = serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json"))
2123            .expect("invalid fixture");
2124        let message =
2125            FeedHandler::classify_bybit_message(&json).expect("expected orderbook message");
2126        assert!(matches!(message, BybitWsMessage::Orderbook(_)));
2127    }
2128
2129    #[rstest]
2130    fn classify_trade_snapshot() {
2131        let json: Value =
2132            serde_json::from_str(&load_test_json("ws_public_trade.json")).expect("invalid fixture");
2133        let message = FeedHandler::classify_bybit_message(&json).expect("expected trade message");
2134        assert!(matches!(message, BybitWsMessage::Trade(_)));
2135    }
2136
2137    #[rstest]
2138    fn classify_ticker_linear_snapshot() {
2139        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_linear.json"))
2140            .expect("invalid fixture");
2141        let message = FeedHandler::classify_bybit_message(&json).expect("expected ticker message");
2142        assert!(matches!(message, BybitWsMessage::TickerLinear(_)));
2143    }
2144
2145    #[rstest]
2146    fn classify_ticker_option_snapshot() {
2147        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_option.json"))
2148            .expect("invalid fixture");
2149        let message = FeedHandler::classify_bybit_message(&json).expect("expected ticker message");
2150        assert!(matches!(message, BybitWsMessage::TickerOption(_)));
2151    }
2152
2153    #[rstest]
2154    fn test_race_unsubscribe_failure_recovery() {
2155        // Simulates the race condition where venue rejects an unsubscribe request.
2156        // The adapter must perform the 3-step recovery:
2157        // 1. confirm_unsubscribe() - clear pending_unsubscribe
2158        // 2. mark_subscribe() - mark as subscribing again
2159        // 3. confirm_subscribe() - restore to confirmed state
2160        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2161
2162        let topic = "publicTrade.BTCUSDT";
2163
2164        // Initial subscribe flow
2165        subscriptions.mark_subscribe(topic);
2166        subscriptions.confirm_subscribe(topic);
2167        assert_eq!(subscriptions.len(), 1);
2168
2169        // User unsubscribes
2170        subscriptions.mark_unsubscribe(topic);
2171        assert_eq!(subscriptions.len(), 0);
2172        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2173
2174        // Venue REJECTS the unsubscribe (error message)
2175        // Adapter must perform 3-step recovery (from lines 2181-2183)
2176        subscriptions.confirm_unsubscribe(topic); // Step 1: clear pending_unsubscribe
2177        subscriptions.mark_subscribe(topic); // Step 2: mark as subscribing
2178        subscriptions.confirm_subscribe(topic); // Step 3: confirm subscription
2179
2180        // Verify recovery: topic should be back in confirmed state
2181        assert_eq!(subscriptions.len(), 1);
2182        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2183        assert!(subscriptions.pending_subscribe_topics().is_empty());
2184
2185        // Verify topic is in all_topics() for reconnect
2186        let all = subscriptions.all_topics();
2187        assert_eq!(all.len(), 1);
2188        assert!(all.contains(&topic.to_string()));
2189    }
2190
2191    #[rstest]
2192    fn test_race_resubscribe_before_unsubscribe_ack() {
2193        // Simulates: User unsubscribes, then immediately resubscribes before
2194        // the unsubscribe ACK arrives from the venue.
2195        // This is the race condition fixed in the subscription tracker.
2196        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2197
2198        let topic = "orderbook.50.BTCUSDT";
2199
2200        // Initial subscribe
2201        subscriptions.mark_subscribe(topic);
2202        subscriptions.confirm_subscribe(topic);
2203        assert_eq!(subscriptions.len(), 1);
2204
2205        // User unsubscribes
2206        subscriptions.mark_unsubscribe(topic);
2207        assert_eq!(subscriptions.len(), 0);
2208        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2209
2210        // User immediately changes mind and resubscribes (before unsubscribe ACK)
2211        subscriptions.mark_subscribe(topic);
2212        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2213
2214        // NOW the unsubscribe ACK arrives - should NOT clear pending_subscribe
2215        subscriptions.confirm_unsubscribe(topic);
2216        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2217        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2218
2219        // Subscribe ACK arrives
2220        subscriptions.confirm_subscribe(topic);
2221        assert_eq!(subscriptions.len(), 1);
2222        assert!(subscriptions.pending_subscribe_topics().is_empty());
2223
2224        // Verify final state is correct
2225        let all = subscriptions.all_topics();
2226        assert_eq!(all.len(), 1);
2227        assert!(all.contains(&topic.to_string()));
2228    }
2229
2230    #[rstest]
2231    fn test_race_late_subscribe_confirmation_after_unsubscribe() {
2232        // Simulates: User subscribes, then unsubscribes before subscribe ACK arrives.
2233        // The late subscribe ACK should be ignored.
2234        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2235
2236        let topic = "tickers.ETHUSDT";
2237
2238        // User subscribes
2239        subscriptions.mark_subscribe(topic);
2240        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2241
2242        // User immediately unsubscribes (before subscribe ACK)
2243        subscriptions.mark_unsubscribe(topic);
2244        assert!(subscriptions.pending_subscribe_topics().is_empty()); // Cleared
2245        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2246
2247        // Late subscribe confirmation arrives - should be IGNORED
2248        subscriptions.confirm_subscribe(topic);
2249        assert_eq!(subscriptions.len(), 0); // Not added to confirmed
2250        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2251
2252        // Unsubscribe ACK arrives
2253        subscriptions.confirm_unsubscribe(topic);
2254
2255        // Final state: completely empty
2256        assert!(subscriptions.is_empty());
2257        assert!(subscriptions.all_topics().is_empty());
2258    }
2259
2260    #[rstest]
2261    fn test_race_reconnection_with_pending_states() {
2262        // Simulates reconnection with mixed pending states.
2263        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2264
2265        // Set up mixed state before reconnection
2266        // Confirmed: publicTrade.BTCUSDT
2267        let trade_btc = "publicTrade.BTCUSDT";
2268        subscriptions.mark_subscribe(trade_btc);
2269        subscriptions.confirm_subscribe(trade_btc);
2270
2271        // Pending subscribe: publicTrade.ETHUSDT
2272        let trade_eth = "publicTrade.ETHUSDT";
2273        subscriptions.mark_subscribe(trade_eth);
2274
2275        // Pending unsubscribe: orderbook.50.BTCUSDT (user cancelled)
2276        let book_btc = "orderbook.50.BTCUSDT";
2277        subscriptions.mark_subscribe(book_btc);
2278        subscriptions.confirm_subscribe(book_btc);
2279        subscriptions.mark_unsubscribe(book_btc);
2280
2281        // Get topics for reconnection
2282        let topics_to_restore = subscriptions.all_topics();
2283
2284        // Should include: confirmed + pending_subscribe (NOT pending_unsubscribe)
2285        assert_eq!(topics_to_restore.len(), 2);
2286        assert!(topics_to_restore.contains(&trade_btc.to_string()));
2287        assert!(topics_to_restore.contains(&trade_eth.to_string()));
2288        assert!(!topics_to_restore.contains(&book_btc.to_string())); // Excluded
2289    }
2290
2291    #[rstest]
2292    fn test_race_duplicate_subscribe_messages_idempotent() {
2293        // Simulates duplicate subscribe requests (e.g., from reconnection logic).
2294        // The subscription tracker should be idempotent and not create duplicate state.
2295        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2296
2297        let topic = "publicTrade.BTCUSDT";
2298
2299        // Subscribe and confirm
2300        subscriptions.mark_subscribe(topic);
2301        subscriptions.confirm_subscribe(topic);
2302        assert_eq!(subscriptions.len(), 1);
2303
2304        // Duplicate mark_subscribe on already-confirmed topic (should be no-op)
2305        subscriptions.mark_subscribe(topic);
2306        assert!(subscriptions.pending_subscribe_topics().is_empty()); // Not re-added
2307        assert_eq!(subscriptions.len(), 1); // Still just 1
2308
2309        // Duplicate confirm_subscribe (should be idempotent)
2310        subscriptions.confirm_subscribe(topic);
2311        assert_eq!(subscriptions.len(), 1);
2312
2313        // Verify final state
2314        let all = subscriptions.all_topics();
2315        assert_eq!(all.len(), 1);
2316        assert_eq!(all[0], topic);
2317    }
2318
2319    #[rstest]
2320    #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
2321    #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
2322    #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
2323    #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
2324    #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
2325    #[case::option_with_leverage(BybitProductType::Option, true, None)]
2326    fn test_is_leverage_parameter(
2327        #[case] product_type: BybitProductType,
2328        #[case] is_leverage: bool,
2329        #[case] expected: Option<i32>,
2330    ) {
2331        let symbol = match product_type {
2332            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2333            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2334            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2335            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2336        };
2337
2338        let instrument_id = InstrumentId::from(symbol);
2339        let client_order_id = ClientOrderId::from("test-order-1");
2340        let quantity = Quantity::from("1.0");
2341
2342        let client = BybitWebSocketClient::new_trade(
2343            BybitEnvironment::Testnet,
2344            Credential::new("test-key", "test-secret"),
2345            None,
2346            Some(20),
2347        );
2348
2349        let params = client
2350            .build_place_order_params(
2351                product_type,
2352                instrument_id,
2353                client_order_id,
2354                OrderSide::Buy,
2355                OrderType::Limit,
2356                quantity,
2357                false, // is_quote_quantity
2358                Some(TimeInForce::Gtc),
2359                Some(Price::from("50000.0")),
2360                None,
2361                None,
2362                None,
2363                is_leverage,
2364            )
2365            .expect("Failed to build params");
2366
2367        assert_eq!(params.is_leverage, expected);
2368    }
2369
2370    #[rstest]
2371    #[case::spot_market_quote_quantity(BybitProductType::Spot, OrderType::Market, true, Some(BYBIT_QUOTE_COIN.to_string()))]
2372    #[case::spot_market_base_quantity(BybitProductType::Spot, OrderType::Market, false, Some(BYBIT_BASE_COIN.to_string()))]
2373    #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
2374    #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
2375    #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
2376    #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
2377    fn test_is_quote_quantity_parameter(
2378        #[case] product_type: BybitProductType,
2379        #[case] order_type: OrderType,
2380        #[case] is_quote_quantity: bool,
2381        #[case] expected: Option<String>,
2382    ) {
2383        let symbol = match product_type {
2384            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2385            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2386            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2387            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2388        };
2389
2390        let instrument_id = InstrumentId::from(symbol);
2391        let client_order_id = ClientOrderId::from("test-order-1");
2392        let quantity = Quantity::from("1.0");
2393
2394        let client = BybitWebSocketClient::new_trade(
2395            BybitEnvironment::Testnet,
2396            Credential::new("test-key", "test-secret"),
2397            None,
2398            Some(20),
2399        );
2400
2401        let params = client
2402            .build_place_order_params(
2403                product_type,
2404                instrument_id,
2405                client_order_id,
2406                OrderSide::Buy,
2407                order_type,
2408                quantity,
2409                is_quote_quantity,
2410                Some(TimeInForce::Gtc),
2411                if order_type == OrderType::Market {
2412                    None
2413                } else {
2414                    Some(Price::from("50000.0"))
2415                },
2416                None,
2417                None,
2418                None,
2419                false,
2420            )
2421            .expect("Failed to build params");
2422
2423        assert_eq!(params.market_unit, expected);
2424    }
2425}