nautilus_bybit/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Bybit WebSocket client for public market data streaming, private channels, and order operations.
17//!
18//! Bybit API reference <https://bybit-exchange.github.io/docs/>.
19
20use std::{
21    fmt::Debug,
22    sync::{
23        Arc,
24        atomic::{AtomicBool, AtomicU8, Ordering},
25    },
26    time::Duration,
27};
28
29use ahash::AHashMap;
30use arc_swap::ArcSwap;
31use dashmap::DashMap;
32use nautilus_common::live::runtime::get_runtime;
33use nautilus_core::{UUID4, consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt};
34use nautilus_model::{
35    enums::{OrderSide, OrderType, TimeInForce},
36    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37    instruments::{Instrument, InstrumentAny},
38    types::{Price, Quantity},
39};
40use nautilus_network::{
41    backoff::ExponentialBackoff,
42    mode::ConnectionMode,
43    websocket::{
44        AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
45        channel_message_handler,
46    },
47};
48use serde_json::Value;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use crate::{
53    common::{
54        consts::{
55            BYBIT_BASE_COIN, BYBIT_NAUTILUS_BROKER_ID, BYBIT_QUOTE_COIN, BYBIT_WS_TOPIC_DELIMITER,
56        },
57        credential::Credential,
58        enums::{
59            BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
60            BybitTriggerDirection, BybitTriggerType, BybitWsOrderRequestOp,
61        },
62        parse::{extract_raw_symbol, make_bybit_symbol},
63        symbol::BybitSymbol,
64        urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
65    },
66    websocket::{
67        enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
68        error::{BybitWsError, BybitWsResult},
69        handler::{FeedHandler, HandlerCommand},
70        messages::{
71            BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
72            BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
73            BybitWsCancelOrderParams, BybitWsHeader, BybitWsPlaceOrderParams, BybitWsRequest,
74            NautilusWsMessage,
75        },
76    },
77};
78
/// Heartbeat value (in seconds) applied by the constructors when the caller passes `None`.
const DEFAULT_HEARTBEAT_SECS: u64 = 20;
/// Milliseconds added to the current UTC timestamp to form the `expires` value that is
/// signed and sent in WebSocket auth requests.
const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
/// NOTE(review): not referenced in this part of the file; presumably the maximum number of
/// items per batch order request — confirm against the batch order methods.
const BATCH_PROCESSING_LIMIT: usize = 20;

/// Type alias for the funding rate cache.
///
/// A shared, async-locked map keyed by `Ustr` whose values are a pair of optional strings.
/// NOTE(review): the key is presumably the symbol and the pair funding-related fields
/// (e.g. rate and next funding time) — confirm against the handler that populates it.
type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
85
86/// Resolves credentials from provided values or environment variables.
87///
88/// Priority for environment variables based on environment:
89/// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
90/// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
91/// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
92fn resolve_credential(
93    environment: BybitEnvironment,
94    api_key: Option<String>,
95    api_secret: Option<String>,
96) -> Option<Credential> {
97    let (api_key_env, api_secret_env) = match environment {
98        BybitEnvironment::Demo => ("BYBIT_DEMO_API_KEY", "BYBIT_DEMO_API_SECRET"),
99        BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
100        BybitEnvironment::Mainnet => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
101    };
102
103    let key = get_or_env_var_opt(api_key, api_key_env);
104    let secret = get_or_env_var_opt(api_secret, api_secret_env);
105
106    match (key, secret) {
107        (Some(k), Some(s)) => Some(Credential::new(k, s)),
108        _ => None,
109    }
110}
111
/// WebSocket client for Bybit.
///
/// The same type backs the public (market data), private, and trade endpoints; which one
/// an instance targets is determined by the constructor used (`new_public*`, `new_private`,
/// `new_trade`).
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct BybitWebSocketClient {
    /// WebSocket endpoint URL used when connecting.
    url: String,
    /// Bybit environment (demo, testnet or mainnet) this client targets.
    environment: BybitEnvironment,
    /// Product type; set by the public constructors, `None` for private/trade clients.
    product_type: Option<BybitProductType>,
    /// API credential; resolved by the private/trade constructors, `None` for public clients.
    credential: Option<Credential>,
    /// Whether the session must authenticate (`true` for private/trade clients).
    requires_auth: bool,
    /// Authentication tracker handed to the feed handler on connect.
    auth_tracker: AuthTracker,
    /// Heartbeat value in seconds passed to the underlying WebSocket configuration.
    heartbeat: Option<u64>,
    /// Connection mode atomic; replaced with the connected client's atomic in `connect()`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for commands to the feed handler; a placeholder until `connect()` swaps it.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for messages emitted by the handler task; set in `connect()`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Stop flag: cleared in `connect()`, set in `close()`.
    signal: Arc<AtomicBool>,
    /// Handle of the background handler task spawned by `connect()`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Subscription tracking state, also used by the handler task for resubscription.
    subscriptions: SubscriptionState,
    /// Set when an `Authenticated` message is observed; cleared on reconnect and close.
    is_authenticated: Arc<AtomicBool>,
    /// Account identifier handed to the feed handler on connect.
    account_id: Option<AccountId>,
    /// Value shared with the feed handler.
    /// NOTE(review): presumably the market-maker level — confirm against its setter.
    mm_level: Arc<AtomicU8>,
    /// Instruments cached on the client and replayed to the handler on connect.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Funding rate cache shared with the handler; cleared after a reconnection.
    funding_cache: FundingCache,
    /// Cancellation token shared between clones (not used in this part of the file).
    cancellation_token: CancellationToken,
}
135
136impl Debug for BybitWebSocketClient {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        f.debug_struct("BybitWebSocketClient")
139            .field("url", &self.url)
140            .field("environment", &self.environment)
141            .field("product_type", &self.product_type)
142            .field("requires_auth", &self.requires_auth)
143            .field("heartbeat", &self.heartbeat)
144            .field("confirmed_subscriptions", &self.subscriptions.len())
145            .finish()
146    }
147}
148
149impl Clone for BybitWebSocketClient {
150    fn clone(&self) -> Self {
151        Self {
152            url: self.url.clone(),
153            environment: self.environment,
154            product_type: self.product_type,
155            credential: self.credential.clone(),
156            requires_auth: self.requires_auth,
157            auth_tracker: self.auth_tracker.clone(),
158            heartbeat: self.heartbeat,
159            connection_mode: Arc::clone(&self.connection_mode),
160            cmd_tx: Arc::clone(&self.cmd_tx),
161            out_rx: None, // Each clone gets its own receiver
162            signal: Arc::clone(&self.signal),
163            task_handle: None, // Each clone gets its own task handle
164            subscriptions: self.subscriptions.clone(),
165            is_authenticated: Arc::clone(&self.is_authenticated),
166            account_id: self.account_id,
167            mm_level: Arc::clone(&self.mm_level),
168            instruments_cache: Arc::clone(&self.instruments_cache),
169            funding_cache: Arc::clone(&self.funding_cache),
170            cancellation_token: self.cancellation_token.clone(),
171        }
172    }
173}
174
175impl BybitWebSocketClient {
176    /// Creates a new Bybit public WebSocket client.
177    #[must_use]
178    pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
179        Self::new_public_with(
180            BybitProductType::Linear,
181            BybitEnvironment::Mainnet,
182            url,
183            heartbeat,
184        )
185    }
186
187    /// Creates a new Bybit public WebSocket client targeting the specified product/environment.
188    #[must_use]
189    pub fn new_public_with(
190        product_type: BybitProductType,
191        environment: BybitEnvironment,
192        url: Option<String>,
193        heartbeat: Option<u64>,
194    ) -> Self {
195        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
196        // connect() swaps in the real channel and replays any queued instruments so the
197        // handler sees them once it starts.
198        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
199
200        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
201        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
202
203        Self {
204            url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
205            environment,
206            product_type: Some(product_type),
207            credential: None,
208            requires_auth: false,
209            auth_tracker: AuthTracker::new(),
210            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
211            connection_mode,
212            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
213            out_rx: None,
214            signal: Arc::new(AtomicBool::new(false)),
215            task_handle: None,
216            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
217            is_authenticated: Arc::new(AtomicBool::new(false)),
218            instruments_cache: Arc::new(DashMap::new()),
219            account_id: None,
220            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
221            cancellation_token: CancellationToken::new(),
222            mm_level: Arc::new(AtomicU8::new(0)),
223        }
224    }
225
226    /// Creates a new Bybit private WebSocket client.
227    ///
228    /// If `api_key` or `api_secret` are not provided, they will be loaded from
229    /// environment variables based on the environment:
230    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
231    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
232    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
233    #[must_use]
234    pub fn new_private(
235        environment: BybitEnvironment,
236        api_key: Option<String>,
237        api_secret: Option<String>,
238        url: Option<String>,
239        heartbeat: Option<u64>,
240    ) -> Self {
241        let credential = resolve_credential(environment, api_key, api_secret);
242
243        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
244        // connect() swaps in the real channel and replays any queued instruments so the
245        // handler sees them once it starts.
246        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
247
248        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
249        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
250
251        Self {
252            url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
253            environment,
254            product_type: None,
255            credential,
256            requires_auth: true,
257            auth_tracker: AuthTracker::new(),
258            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
259            connection_mode,
260            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
261            out_rx: None,
262            signal: Arc::new(AtomicBool::new(false)),
263            task_handle: None,
264            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
265            is_authenticated: Arc::new(AtomicBool::new(false)),
266            instruments_cache: Arc::new(DashMap::new()),
267            account_id: None,
268            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
269            cancellation_token: CancellationToken::new(),
270            mm_level: Arc::new(AtomicU8::new(0)),
271        }
272    }
273
274    /// Creates a new Bybit trade WebSocket client for order operations.
275    ///
276    /// If `api_key` or `api_secret` are not provided, they will be loaded from
277    /// environment variables based on the environment:
278    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
279    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
280    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
281    #[must_use]
282    pub fn new_trade(
283        environment: BybitEnvironment,
284        api_key: Option<String>,
285        api_secret: Option<String>,
286        url: Option<String>,
287        heartbeat: Option<u64>,
288    ) -> Self {
289        let credential = resolve_credential(environment, api_key, api_secret);
290
291        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
292        // connect() swaps in the real channel and replays any queued instruments so the
293        // handler sees them once it starts.
294        let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
295
296        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
297        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
298
299        Self {
300            url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
301            environment,
302            product_type: None,
303            credential,
304            requires_auth: true,
305            auth_tracker: AuthTracker::new(),
306            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
307            connection_mode,
308            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
309            out_rx: None,
310            signal: Arc::new(AtomicBool::new(false)),
311            task_handle: None,
312            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
313            is_authenticated: Arc::new(AtomicBool::new(false)),
314            instruments_cache: Arc::new(DashMap::new()),
315            account_id: None,
316            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
317            cancellation_token: CancellationToken::new(),
318            mm_level: Arc::new(AtomicU8::new(0)),
319        }
320    }
321
    /// Establishes the WebSocket connection.
    ///
    /// The initial connection is attempted up to five times, each attempt bounded by a
    /// ten second timeout, with exponential backoff between attempts. Once connected, the
    /// method:
    /// 1. installs fresh command/output channels (replacing the placeholder command sender),
    /// 2. hands the connected client and any cached instruments to the handler,
    /// 3. spawns the background task that drives the feed handler, forwards messages to the
    ///    output channel and performs re-authentication / resubscription after reconnects,
    /// 4. authenticates if this client requires authentication.
    ///
    /// `Authenticated` messages are consumed by the background task and are not forwarded
    /// to the output channel; `Reconnected` messages are forwarded.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying WebSocket connection cannot be established,
    /// after retrying multiple times with exponential backoff. Errors from sending the
    /// initial handler commands and from `authenticate_if_required` are propagated as well.
    pub async fn connect(&mut self) -> BybitWsResult<()> {
        // Clear the stop flag so is_active()/is_closed() no longer report a closed client
        self.signal.store(false, Ordering::Relaxed);

        let (raw_handler, raw_rx) = channel_message_handler();

        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
        // in the message loop for minimal latency (see handler.rs pong response)
        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
            // Handler responds to pings internally via select! loop
        });

        // Application-level heartbeat message: a ping operation with no arguments
        let ping_msg = serde_json::to_string(&BybitSubscription {
            op: BybitWsOperation::Ping,
            args: vec![],
        })?;

        let config = WebSocketConfig {
            url: self.url.clone(),
            headers: Self::default_headers(),
            message_handler: Some(raw_handler),
            heartbeat: self.heartbeat,
            heartbeat_msg: Some(ping_msg),
            ping_handler: Some(ping_handler),
            reconnect_timeout_ms: Some(5_000),
            reconnect_delay_initial_ms: Some(500),
            reconnect_delay_max_ms: Some(5_000),
            reconnect_backoff_factor: Some(1.5),
            reconnect_jitter_ms: Some(250),
            reconnect_max_attempts: None,
        };

        // Retry initial connection with exponential backoff to handle transient DNS/network issues
        // TODO: Eventually expose client config options for this
        const MAX_RETRIES: u32 = 5;
        const CONNECTION_TIMEOUT_SECS: u64 = 10;

        let mut backoff = ExponentialBackoff::new(
            Duration::from_millis(500),
            Duration::from_millis(5000),
            2.0,
            250,
            false,
        )
        .map_err(|e| BybitWsError::ClientError(e.to_string()))?;

        // The initial empty value is always overwritten before it is read
        #[allow(unused_assignments)]
        let mut last_error = String::new();
        let mut attempt = 0;
        let client = loop {
            attempt += 1;

            // Outer result: timeout elapsed or not; inner result: connection outcome
            match tokio::time::timeout(
                Duration::from_secs(CONNECTION_TIMEOUT_SECS),
                WebSocketClient::connect(config.clone(), None, vec![], None),
            )
            .await
            {
                Ok(Ok(client)) => {
                    if attempt > 1 {
                        tracing::info!("WebSocket connection established after {attempt} attempts");
                    }
                    break client;
                }
                Ok(Err(e)) => {
                    last_error = e.to_string();
                    tracing::warn!(
                        attempt,
                        max_retries = MAX_RETRIES,
                        url = %self.url,
                        error = %last_error,
                        "WebSocket connection attempt failed"
                    );
                }
                Err(_) => {
                    last_error = format!(
                        "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
                    );
                    tracing::warn!(
                        attempt,
                        max_retries = MAX_RETRIES,
                        url = %self.url,
                        "WebSocket connection attempt timed out"
                    );
                }
            }

            // Give up once the attempt budget is exhausted, reporting the last failure
            if attempt >= MAX_RETRIES {
                return Err(BybitWsError::Transport(format!(
                    "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
                    If this is a DNS error, check your network configuration and DNS settings.",
                    self.url,
                    if last_error.is_empty() {
                        "unknown error"
                    } else {
                        &last_error
                    }
                )));
            }

            let delay = backoff.next_duration();
            tracing::debug!(
                "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
                attempt + 1
            );
            tokio::time::sleep(delay).await;
        };

        // Adopt the connected client's mode atomic so is_active()/is_closed() observe it
        self.connection_mode.store(client.connection_mode_atomic());

        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
        self.out_rx = Some(Arc::new(out_rx));

        // Replace the placeholder sender; clones share this lock and so see the new sender
        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
        *self.cmd_tx.write().await = cmd_tx.clone();

        // Commands are queued on the channel now and consumed once the handler task starts
        let cmd = HandlerCommand::SetClient(client);

        self.send_cmd(cmd).await?;

        // Replay cached instruments to the new handler via the new channel
        if !self.instruments_cache.is_empty() {
            let cached_instruments: Vec<InstrumentAny> = self
                .instruments_cache
                .iter()
                .map(|entry| entry.value().clone())
                .collect();
            let cmd = HandlerCommand::InitializeInstruments(cached_instruments);
            self.send_cmd(cmd).await?;
        }

        // Copies/clones of client state moved into the background task
        let signal = Arc::clone(&self.signal);
        let subscriptions = self.subscriptions.clone();
        let credential = self.credential.clone();
        let requires_auth = self.requires_auth;
        let funding_cache = Arc::clone(&self.funding_cache);
        let account_id = self.account_id;
        let product_type = self.product_type;
        let mm_level = Arc::clone(&self.mm_level);
        let cmd_tx_for_reconnect = cmd_tx.clone();
        let auth_tracker = self.auth_tracker.clone();
        let is_authenticated = Arc::clone(&self.is_authenticated);

        let stream_handle = get_runtime().spawn(async move {
            let mut handler = FeedHandler::new(
                signal.clone(),
                cmd_rx,
                raw_rx,
                out_tx.clone(),
                account_id,
                product_type,
                mm_level.clone(),
                auth_tracker,
                subscriptions.clone(),
                funding_cache.clone(),
            );

            // Helper closure to resubscribe all tracked subscriptions after reconnection
            let resubscribe_all = || async {
                let topics = subscriptions.all_topics();

                if topics.is_empty() {
                    return;
                }

                tracing::debug!(count = topics.len(), "Resubscribing to confirmed subscriptions");

                for topic in &topics {
                    subscriptions.mark_subscribe(topic.as_str());
                }

                // One subscribe payload per topic; topics that fail to serialize are skipped
                let mut payloads = Vec::with_capacity(topics.len());
                for topic in &topics {
                    let message = BybitSubscription {
                        op: BybitWsOperation::Subscribe,
                        args: vec![topic.clone()],
                    };
                    if let Ok(payload) = serde_json::to_string(&message) {
                        payloads.push(payload);
                    }
                }

                let cmd = HandlerCommand::Subscribe { topics: payloads };

                if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
                    tracing::error!("Failed to send resubscribe command: {e}");
                }
            };

            // Run message processing with reconnection handling
            loop {
                match handler.next().await {
                    Some(NautilusWsMessage::Reconnected) => {
                        // Ignore reconnection events once the stop flag has been set
                        if signal.load(Ordering::Relaxed) {
                            continue;
                        }

                        tracing::info!("WebSocket reconnected");

                        // Mark all confirmed subscriptions as failed so they transition to pending state
                        let confirmed_topics: Vec<String> = {
                            let confirmed = subscriptions.confirmed();
                            let mut topics = Vec::new();
                            for entry in confirmed.iter() {
                                let (channel, symbols) = entry.pair();
                                for symbol in symbols {
                                    // An empty symbol means the topic is the bare channel name
                                    if symbol.is_empty() {
                                        topics.push(channel.to_string());
                                    } else {
                                        topics.push(format!("{channel}.{symbol}"));
                                    }
                                }
                            }
                            topics
                        };

                        if !confirmed_topics.is_empty() {
                            tracing::debug!(count = confirmed_topics.len(), "Marking confirmed subscriptions as pending for replay");
                            for topic in confirmed_topics {
                                subscriptions.mark_failure(&topic);
                            }
                        }

                        // Clear caches to prevent stale data after reconnection
                        funding_cache.write().await.clear();

                        if requires_auth {
                            is_authenticated.store(false, Ordering::Relaxed);
                            tracing::debug!("Re-authenticating after reconnection");

                            // Without a credential no auth request is sent here
                            if let Some(cred) = &credential {
                                let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
                                let signature = cred.sign_websocket_auth(expires);

                                let auth_message = BybitAuthRequest {
                                    op: BybitWsOperation::Auth,
                                    args: vec![
                                        Value::String(cred.api_key().to_string()),
                                        Value::Number(expires.into()),
                                        Value::String(signature),
                                    ],
                                };

                                if let Ok(payload) = serde_json::to_string(&auth_message) {
                                    let cmd = HandlerCommand::Authenticate { payload };
                                    if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
                                        tracing::error!(error = %e, "Failed to send reconnection auth command");
                                    }
                                } else {
                                    tracing::error!("Failed to serialize reconnection auth message");
                                }
                            }
                        }

                        // Unauthenticated sessions resubscribe immediately after reconnection,
                        // authenticated sessions wait for Authenticated message
                        if !requires_auth {
                            tracing::debug!("No authentication required, resubscribing immediately");
                            resubscribe_all().await;
                        }

                        // Forward to out_tx so caller sees the Reconnected message
                        if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
                            tracing::debug!("Receiver dropped, stopping");
                            break;
                        }
                        continue;
                    }
                    Some(NautilusWsMessage::Authenticated) => {
                        // Consumed here: not forwarded to the output channel
                        tracing::debug!("Authenticated, resubscribing");
                        is_authenticated.store(true, Ordering::Relaxed);
                        resubscribe_all().await;
                        continue;
                    }
                    Some(msg) => {
                        if out_tx.send(msg).is_err() {
                            tracing::error!("Failed to send message (receiver dropped)");
                            break;
                        }
                    }
                    None => {
                        // Stream ended - check if it's a stop signal
                        if handler.is_stopped() {
                            tracing::debug!("Stop signal received, ending message processing");
                            break;
                        }
                        // Otherwise it's an unexpected stream end
                        tracing::warn!("WebSocket stream ended unexpectedly");
                        break;
                    }
                }
            }

            tracing::debug!("Handler task exiting");
        });

        self.task_handle = Some(Arc::new(stream_handle));

        // Initial authentication; on failure the error is returned to the caller
        if requires_auth && let Err(e) = self.authenticate_if_required().await {
            return Err(e);
        }

        Ok(())
    }
631
632    /// Disconnects the WebSocket client and stops the background task.
633    pub async fn close(&mut self) -> BybitWsResult<()> {
634        tracing::debug!("Starting close process");
635
636        self.signal.store(true, Ordering::Relaxed);
637
638        let cmd = HandlerCommand::Disconnect;
639        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
640            tracing::debug!(
641                "Failed to send disconnect command (handler may already be shut down): {e}"
642            );
643        }
644
645        if let Some(task_handle) = self.task_handle.take() {
646            match Arc::try_unwrap(task_handle) {
647                Ok(handle) => {
648                    tracing::debug!("Waiting for task handle to complete");
649                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
650                        Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
651                        Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
652                        Err(_) => {
653                            tracing::warn!(
654                                "Timeout waiting for task handle, task may still be running"
655                            );
656                            // The task will be dropped and should clean up automatically
657                        }
658                    }
659                }
660                Err(arc_handle) => {
661                    tracing::debug!(
662                        "Cannot take ownership of task handle - other references exist, aborting task"
663                    );
664                    arc_handle.abort();
665                }
666            }
667        } else {
668            tracing::debug!("No task handle to await");
669        }
670
671        self.is_authenticated.store(false, Ordering::Relaxed);
672
673        tracing::debug!("Closed");
674
675        Ok(())
676    }
677
678    /// Returns a value indicating whether the client is active.
679    #[must_use]
680    pub fn is_active(&self) -> bool {
681        let connection_mode_arc = self.connection_mode.load();
682        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
683            && !self.signal.load(Ordering::Relaxed)
684    }
685
686    /// Returns a value indicating whether the client is closed.
687    pub fn is_closed(&self) -> bool {
688        let connection_mode_arc = self.connection_mode.load();
689        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
690            || self.signal.load(Ordering::Relaxed)
691    }
692
693    /// Waits until the WebSocket client becomes active or times out.
694    ///
695    /// # Errors
696    ///
697    /// Returns an error if the timeout is exceeded before the client becomes active.
698    pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
699        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
700
701        tokio::time::timeout(timeout, async {
702            while !self.is_active() {
703                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
704            }
705        })
706        .await
707        .map_err(|_| {
708            BybitWsError::ClientError(format!(
709                "WebSocket connection timeout after {timeout_secs} seconds"
710            ))
711        })?;
712
713        Ok(())
714    }
715
716    /// Subscribe to the provided topic strings.
717    pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
718        if topics.is_empty() {
719            return Ok(());
720        }
721
722        tracing::debug!("Subscribing to topics: {topics:?}");
723
724        // Use reference counting to deduplicate subscriptions
725        let mut topics_to_send = Vec::new();
726
727        for topic in topics {
728            // Returns true if this is the first subscription (ref count 0 -> 1)
729            if self.subscriptions.add_reference(&topic) {
730                self.subscriptions.mark_subscribe(&topic);
731                topics_to_send.push(topic.clone());
732            } else {
733                tracing::debug!("Already subscribed to {topic}, skipping duplicate subscription");
734            }
735        }
736
737        if topics_to_send.is_empty() {
738            return Ok(());
739        }
740
741        // Serialize subscription messages
742        let mut payloads = Vec::with_capacity(topics_to_send.len());
743        for topic in &topics_to_send {
744            let message = BybitSubscription {
745                op: BybitWsOperation::Subscribe,
746                args: vec![topic.clone()],
747            };
748            let payload = serde_json::to_string(&message).map_err(|e| {
749                BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
750            })?;
751            payloads.push(payload);
752        }
753
754        let cmd = HandlerCommand::Subscribe { topics: payloads };
755        self.cmd_tx
756            .read()
757            .await
758            .send(cmd)
759            .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
760
761        Ok(())
762    }
763
764    /// Unsubscribe from the provided topics.
765    pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
766        if topics.is_empty() {
767            return Ok(());
768        }
769
770        tracing::debug!("Attempting to unsubscribe from topics: {topics:?}");
771
772        if self.signal.load(Ordering::Relaxed) {
773            tracing::debug!("Shutdown signal detected, skipping unsubscribe");
774            return Ok(());
775        }
776
777        // Use reference counting to avoid unsubscribing while other consumers still need the topic
778        let mut topics_to_send = Vec::new();
779
780        for topic in topics {
781            // Returns true if this was the last subscription (ref count 1 -> 0)
782            if self.subscriptions.remove_reference(&topic) {
783                self.subscriptions.mark_unsubscribe(&topic);
784                topics_to_send.push(topic.clone());
785            } else {
786                tracing::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
787            }
788        }
789
790        if topics_to_send.is_empty() {
791            return Ok(());
792        }
793
794        // Serialize unsubscription messages
795        let mut payloads = Vec::with_capacity(topics_to_send.len());
796        for topic in &topics_to_send {
797            let message = BybitSubscription {
798                op: BybitWsOperation::Unsubscribe,
799                args: vec![topic.clone()],
800            };
801            if let Ok(payload) = serde_json::to_string(&message) {
802                payloads.push(payload);
803            }
804        }
805
806        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
807        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
808            tracing::debug!(error = %e, "Failed to send unsubscribe command");
809        }
810
811        Ok(())
812    }
813
814    /// Returns a stream of parsed [`NautilusWsMessage`] items.
815    ///
816    /// # Panics
817    ///
818    /// Panics if called before [`Self::connect`] or if the stream has already been taken.
819    pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
820        let rx = self
821            .out_rx
822            .take()
823            .expect("Stream receiver already taken or client not connected");
824        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
825        async_stream::stream! {
826            while let Some(msg) = rx.recv().await {
827                yield msg;
828            }
829        }
830    }
831
832    /// Returns the number of currently registered subscriptions.
833    #[must_use]
834    pub fn subscription_count(&self) -> usize {
835        self.subscriptions.len()
836    }
837
838    /// Returns the credential associated with this client, if any.
839    #[must_use]
840    pub fn credential(&self) -> Option<&Credential> {
841        self.credential.as_ref()
842    }
843
844    /// Caches a single instrument.
845    ///
846    /// Any existing instrument with the same ID will be replaced.
847    pub fn cache_instrument(&self, instrument: InstrumentAny) {
848        self.instruments_cache
849            .insert(instrument.symbol().inner(), instrument.clone());
850
851        // Before connect() the handler isn't running; this send will fail and that's expected
852        // because connect() replays the instruments via InitializeInstruments
853        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
854            let cmd = HandlerCommand::UpdateInstrument(instrument);
855            if let Err(e) = cmd_tx.send(cmd) {
856                tracing::debug!("Failed to send instrument update to handler: {e}");
857            }
858        }
859    }
860
861    /// Caches multiple instruments.
862    ///
863    /// Clears the existing cache first, then adds all provided instruments.
864    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
865        self.instruments_cache.clear();
866        let mut count = 0;
867
868        tracing::debug!("Initializing Bybit instrument cache");
869
870        for inst in instruments {
871            let symbol = inst.symbol().inner();
872            self.instruments_cache.insert(symbol, inst.clone());
873            tracing::debug!("Cached instrument: {symbol}");
874            count += 1;
875        }
876
877        tracing::info!("Bybit instrument cache initialized with {count} instruments");
878    }
879
880    /// Sets the account ID for account message parsing.
881    pub fn set_account_id(&mut self, account_id: AccountId) {
882        self.account_id = Some(account_id);
883    }
884
885    /// Sets the account market maker level.
886    pub fn set_mm_level(&self, mm_level: u8) {
887        self.mm_level.store(mm_level, Ordering::Relaxed);
888    }
889
890    /// Returns a reference to the instruments cache.
891    #[must_use]
892    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
893        &self.instruments_cache
894    }
895
896    /// Returns the account ID if set.
897    #[must_use]
898    pub fn account_id(&self) -> Option<AccountId> {
899        self.account_id
900    }
901
902    /// Returns the product type for public connections.
903    #[must_use]
904    pub fn product_type(&self) -> Option<BybitProductType> {
905        self.product_type
906    }
907
908    /// Subscribes to orderbook updates for a specific instrument.
909    ///
910    /// # Errors
911    ///
912    /// Returns an error if the subscription request fails.
913    ///
914    /// # References
915    ///
916    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook>
917    pub async fn subscribe_orderbook(
918        &self,
919        instrument_id: InstrumentId,
920        depth: u32,
921    ) -> BybitWsResult<()> {
922        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
923        let topic = format!(
924            "{}.{depth}.{raw_symbol}",
925            BybitWsPublicChannel::OrderBook.as_ref()
926        );
927        self.subscribe(vec![topic]).await
928    }
929
930    /// Unsubscribes from orderbook updates for a specific instrument.
931    pub async fn unsubscribe_orderbook(
932        &self,
933        instrument_id: InstrumentId,
934        depth: u32,
935    ) -> BybitWsResult<()> {
936        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
937        let topic = format!(
938            "{}.{depth}.{raw_symbol}",
939            BybitWsPublicChannel::OrderBook.as_ref()
940        );
941        self.unsubscribe(vec![topic]).await
942    }
943
944    /// Subscribes to public trade updates for a specific instrument.
945    ///
946    /// # Errors
947    ///
948    /// Returns an error if the subscription request fails.
949    ///
950    /// # References
951    ///
952    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/trade>
953    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
954        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
955        let topic = format!(
956            "{}.{raw_symbol}",
957            BybitWsPublicChannel::PublicTrade.as_ref()
958        );
959        self.subscribe(vec![topic]).await
960    }
961
962    /// Unsubscribes from public trade updates for a specific instrument.
963    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
964        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
965        let topic = format!(
966            "{}.{raw_symbol}",
967            BybitWsPublicChannel::PublicTrade.as_ref()
968        );
969        self.unsubscribe(vec![topic]).await
970    }
971
972    /// Subscribes to ticker updates for a specific instrument.
973    ///
974    /// # Errors
975    ///
976    /// Returns an error if the subscription request fails.
977    ///
978    /// # References
979    ///
980    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/ticker>
981    pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
982        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
983        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
984        self.subscribe(vec![topic]).await
985    }
986
987    /// Unsubscribes from ticker updates for a specific instrument.
988    pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
989        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
990        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
991
992        // Clear funding rate cache to ensure fresh data on resubscribe
993        let symbol = self.product_type.map_or_else(
994            || instrument_id.symbol.inner(),
995            |pt| make_bybit_symbol(raw_symbol, pt),
996        );
997        self.funding_cache.write().await.remove(&symbol);
998
999        self.unsubscribe(vec![topic]).await
1000    }
1001
1002    /// Subscribes to kline/candlestick updates for a specific instrument.
1003    ///
1004    /// # Errors
1005    ///
1006    /// Returns an error if the subscription request fails.
1007    ///
1008    /// # References
1009    ///
1010    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/kline>
1011    pub async fn subscribe_klines(
1012        &self,
1013        instrument_id: InstrumentId,
1014        interval: impl Into<String>,
1015    ) -> BybitWsResult<()> {
1016        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1017        let topic = format!(
1018            "{}.{}.{raw_symbol}",
1019            BybitWsPublicChannel::Kline.as_ref(),
1020            interval.into()
1021        );
1022        self.subscribe(vec![topic]).await
1023    }
1024
1025    /// Unsubscribes from kline/candlestick updates for a specific instrument.
1026    pub async fn unsubscribe_klines(
1027        &self,
1028        instrument_id: InstrumentId,
1029        interval: impl Into<String>,
1030    ) -> BybitWsResult<()> {
1031        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1032        let topic = format!(
1033            "{}.{}.{raw_symbol}",
1034            BybitWsPublicChannel::Kline.as_ref(),
1035            interval.into()
1036        );
1037        self.unsubscribe(vec![topic]).await
1038    }
1039
1040    /// Subscribes to order updates.
1041    ///
1042    /// # Errors
1043    ///
1044    /// Returns an error if the subscription request fails or if not authenticated.
1045    ///
1046    /// # References
1047    ///
1048    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/order>
1049    pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1050        if !self.requires_auth {
1051            return Err(BybitWsError::Authentication(
1052                "Order subscription requires authentication".to_string(),
1053            ));
1054        }
1055        self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1056            .await
1057    }
1058
1059    /// Unsubscribes from order updates.
1060    pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1061        self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1062            .await
1063    }
1064
1065    /// Subscribes to execution/fill updates.
1066    ///
1067    /// # Errors
1068    ///
1069    /// Returns an error if the subscription request fails or if not authenticated.
1070    ///
1071    /// # References
1072    ///
1073    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/execution>
1074    pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1075        if !self.requires_auth {
1076            return Err(BybitWsError::Authentication(
1077                "Execution subscription requires authentication".to_string(),
1078            ));
1079        }
1080        self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1081            .await
1082    }
1083
1084    /// Unsubscribes from execution/fill updates.
1085    pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1086        self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1087            .await
1088    }
1089
1090    /// Subscribes to position updates.
1091    ///
1092    /// # Errors
1093    ///
1094    /// Returns an error if the subscription request fails or if not authenticated.
1095    ///
1096    /// # References
1097    ///
1098    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/position>
1099    pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1100        if !self.requires_auth {
1101            return Err(BybitWsError::Authentication(
1102                "Position subscription requires authentication".to_string(),
1103            ));
1104        }
1105        self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1106            .await
1107    }
1108
1109    /// Unsubscribes from position updates.
1110    pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1111        self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1112            .await
1113    }
1114
1115    /// Subscribes to wallet/balance updates.
1116    ///
1117    /// # Errors
1118    ///
1119    /// Returns an error if the subscription request fails or if not authenticated.
1120    ///
1121    /// # References
1122    ///
1123    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/wallet>
1124    pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1125        if !self.requires_auth {
1126            return Err(BybitWsError::Authentication(
1127                "Wallet subscription requires authentication".to_string(),
1128            ));
1129        }
1130        self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1131            .await
1132    }
1133
1134    /// Unsubscribes from wallet/balance updates.
1135    pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1136        self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1137            .await
1138    }
1139
1140    /// Places an order via WebSocket.
1141    ///
1142    /// # Errors
1143    ///
1144    /// Returns an error if the order request fails or if not authenticated.
1145    ///
1146    /// # References
1147    ///
1148    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1149    pub async fn place_order(
1150        &self,
1151        params: BybitWsPlaceOrderParams,
1152        client_order_id: ClientOrderId,
1153        trader_id: TraderId,
1154        strategy_id: StrategyId,
1155        instrument_id: InstrumentId,
1156    ) -> BybitWsResult<()> {
1157        if !self.is_authenticated.load(Ordering::Relaxed) {
1158            return Err(BybitWsError::Authentication(
1159                "Must be authenticated to place orders".to_string(),
1160            ));
1161        }
1162
1163        let cmd = HandlerCommand::PlaceOrder {
1164            params,
1165            client_order_id,
1166            trader_id,
1167            strategy_id,
1168            instrument_id,
1169        };
1170
1171        self.send_cmd(cmd).await
1172    }
1173
1174    /// Amends an existing order via WebSocket.
1175    ///
1176    /// # Errors
1177    ///
1178    /// Returns an error if the amend request fails or if not authenticated.
1179    ///
1180    /// # References
1181    ///
1182    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1183    pub async fn amend_order(
1184        &self,
1185        params: BybitWsAmendOrderParams,
1186        client_order_id: ClientOrderId,
1187        trader_id: TraderId,
1188        strategy_id: StrategyId,
1189        instrument_id: InstrumentId,
1190        venue_order_id: Option<VenueOrderId>,
1191    ) -> BybitWsResult<()> {
1192        if !self.is_authenticated.load(Ordering::Relaxed) {
1193            return Err(BybitWsError::Authentication(
1194                "Must be authenticated to amend orders".to_string(),
1195            ));
1196        }
1197
1198        let cmd = HandlerCommand::AmendOrder {
1199            params,
1200            client_order_id,
1201            trader_id,
1202            strategy_id,
1203            instrument_id,
1204            venue_order_id,
1205        };
1206
1207        self.send_cmd(cmd).await
1208    }
1209
1210    /// Cancels an order via WebSocket.
1211    ///
1212    /// # Errors
1213    ///
1214    /// Returns an error if the cancel request fails or if not authenticated.
1215    ///
1216    /// # References
1217    ///
1218    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1219    pub async fn cancel_order(
1220        &self,
1221        params: BybitWsCancelOrderParams,
1222        client_order_id: ClientOrderId,
1223        trader_id: TraderId,
1224        strategy_id: StrategyId,
1225        instrument_id: InstrumentId,
1226        venue_order_id: Option<VenueOrderId>,
1227    ) -> BybitWsResult<()> {
1228        if !self.is_authenticated.load(Ordering::Relaxed) {
1229            return Err(BybitWsError::Authentication(
1230                "Must be authenticated to cancel orders".to_string(),
1231            ));
1232        }
1233
1234        let cmd = HandlerCommand::CancelOrder {
1235            params,
1236            client_order_id,
1237            trader_id,
1238            strategy_id,
1239            instrument_id,
1240            venue_order_id,
1241        };
1242
1243        self.send_cmd(cmd).await
1244    }
1245
1246    /// Batch creates multiple orders via WebSocket.
1247    ///
1248    /// # Errors
1249    ///
1250    /// Returns an error if the batch request fails or if not authenticated.
1251    ///
1252    /// # References
1253    ///
1254    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1255    pub async fn batch_place_orders(
1256        &self,
1257        trader_id: TraderId,
1258        strategy_id: StrategyId,
1259        orders: Vec<BybitWsPlaceOrderParams>,
1260    ) -> BybitWsResult<()> {
1261        if !self.is_authenticated.load(Ordering::Relaxed) {
1262            return Err(BybitWsError::Authentication(
1263                "Must be authenticated to place orders".to_string(),
1264            ));
1265        }
1266
1267        if orders.is_empty() {
1268            tracing::warn!("Batch place orders called with empty orders list");
1269            return Ok(());
1270        }
1271
1272        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1273            self.batch_place_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1274                .await?;
1275        }
1276
1277        Ok(())
1278    }
1279
1280    async fn batch_place_orders_chunk(
1281        &self,
1282        trader_id: TraderId,
1283        strategy_id: StrategyId,
1284        orders: Vec<BybitWsPlaceOrderParams>,
1285    ) -> BybitWsResult<()> {
1286        let category = orders[0].category;
1287        let batch_req_id = UUID4::new().to_string();
1288
1289        // Extract order tracking data before consuming orders to register with handler
1290        let mut batch_order_data = Vec::new();
1291        for order in &orders {
1292            if let Some(order_link_id_str) = &order.order_link_id {
1293                let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1294                let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1295                let instrument_id = self
1296                    .instruments_cache
1297                    .get(&cache_key)
1298                    .map(|inst| inst.id())
1299                    .ok_or_else(|| {
1300                        BybitWsError::ClientError(format!(
1301                            "Instrument {cache_key} not found in cache"
1302                        ))
1303                    })?;
1304                batch_order_data.push((
1305                    client_order_id,
1306                    (client_order_id, trader_id, strategy_id, instrument_id),
1307                ));
1308            }
1309        }
1310
1311        if !batch_order_data.is_empty() {
1312            let cmd = HandlerCommand::RegisterBatchPlace {
1313                req_id: batch_req_id.clone(),
1314                orders: batch_order_data,
1315            };
1316            let cmd_tx = self.cmd_tx.read().await;
1317            if let Err(e) = cmd_tx.send(cmd) {
1318                tracing::error!("Failed to send RegisterBatchPlace command: {e}");
1319            }
1320        }
1321
1322        let mm_level = self.mm_level.load(Ordering::Relaxed);
1323        let has_non_post_only = orders
1324            .iter()
1325            .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1326        let referer = if has_non_post_only || mm_level == 0 {
1327            Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1328        } else {
1329            None
1330        };
1331
1332        let request_items: Vec<BybitWsBatchPlaceItem> = orders
1333            .into_iter()
1334            .map(|order| BybitWsBatchPlaceItem {
1335                symbol: order.symbol,
1336                side: order.side,
1337                order_type: order.order_type,
1338                qty: order.qty,
1339                is_leverage: order.is_leverage,
1340                market_unit: order.market_unit,
1341                price: order.price,
1342                time_in_force: order.time_in_force,
1343                order_link_id: order.order_link_id,
1344                reduce_only: order.reduce_only,
1345                close_on_trigger: order.close_on_trigger,
1346                trigger_price: order.trigger_price,
1347                trigger_by: order.trigger_by,
1348                trigger_direction: order.trigger_direction,
1349                tpsl_mode: order.tpsl_mode,
1350                take_profit: order.take_profit,
1351                stop_loss: order.stop_loss,
1352                tp_trigger_by: order.tp_trigger_by,
1353                sl_trigger_by: order.sl_trigger_by,
1354                sl_trigger_price: order.sl_trigger_price,
1355                tp_trigger_price: order.tp_trigger_price,
1356                sl_order_type: order.sl_order_type,
1357                tp_order_type: order.tp_order_type,
1358                sl_limit_price: order.sl_limit_price,
1359                tp_limit_price: order.tp_limit_price,
1360            })
1361            .collect();
1362
1363        let args = BybitWsBatchPlaceOrderArgs {
1364            category,
1365            request: request_items,
1366        };
1367
1368        let request = BybitWsRequest {
1369            req_id: Some(batch_req_id),
1370            op: BybitWsOrderRequestOp::CreateBatch,
1371            header: BybitWsHeader::with_referer(referer),
1372            args: vec![args],
1373        };
1374
1375        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1376
1377        self.send_text(&payload).await
1378    }
1379
1380    /// Batch amends multiple orders via WebSocket.
1381    ///
1382    /// # Errors
1383    ///
1384    /// Returns an error if the batch request fails or if not authenticated.
1385    pub async fn batch_amend_orders(
1386        &self,
1387        #[allow(unused_variables)] trader_id: TraderId,
1388        #[allow(unused_variables)] strategy_id: StrategyId,
1389        orders: Vec<BybitWsAmendOrderParams>,
1390    ) -> BybitWsResult<()> {
1391        if !self.is_authenticated.load(Ordering::Relaxed) {
1392            return Err(BybitWsError::Authentication(
1393                "Must be authenticated to amend orders".to_string(),
1394            ));
1395        }
1396
1397        if orders.is_empty() {
1398            tracing::warn!("Batch amend orders called with empty orders list");
1399            return Ok(());
1400        }
1401
1402        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1403            self.batch_amend_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1404                .await?;
1405        }
1406
1407        Ok(())
1408    }
1409
1410    async fn batch_amend_orders_chunk(
1411        &self,
1412        #[allow(unused_variables)] trader_id: TraderId,
1413        #[allow(unused_variables)] strategy_id: StrategyId,
1414        orders: Vec<BybitWsAmendOrderParams>,
1415    ) -> BybitWsResult<()> {
1416        let request = BybitWsRequest {
1417            req_id: None,
1418            op: BybitWsOrderRequestOp::AmendBatch,
1419            header: BybitWsHeader::now(),
1420            args: orders,
1421        };
1422
1423        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1424
1425        self.send_text(&payload).await
1426    }
1427
1428    /// Batch cancels multiple orders via WebSocket.
1429    ///
1430    /// # Errors
1431    ///
1432    /// Returns an error if the batch request fails or if not authenticated.
1433    pub async fn batch_cancel_orders(
1434        &self,
1435        trader_id: TraderId,
1436        strategy_id: StrategyId,
1437        orders: Vec<BybitWsCancelOrderParams>,
1438    ) -> BybitWsResult<()> {
1439        if !self.is_authenticated.load(Ordering::Relaxed) {
1440            return Err(BybitWsError::Authentication(
1441                "Must be authenticated to cancel orders".to_string(),
1442            ));
1443        }
1444
1445        if orders.is_empty() {
1446            tracing::warn!("Batch cancel orders called with empty orders list");
1447            return Ok(());
1448        }
1449
1450        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1451            self.batch_cancel_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1452                .await?;
1453        }
1454
1455        Ok(())
1456    }
1457
1458    async fn batch_cancel_orders_chunk(
1459        &self,
1460        trader_id: TraderId,
1461        strategy_id: StrategyId,
1462        orders: Vec<BybitWsCancelOrderParams>,
1463    ) -> BybitWsResult<()> {
1464        if orders.is_empty() {
1465            return Ok(());
1466        }
1467
1468        let category = orders[0].category;
1469        let batch_req_id = UUID4::new().to_string();
1470
1471        let mut validated_data = Vec::new();
1472
1473        for order in &orders {
1474            if let Some(order_link_id_str) = &order.order_link_id {
1475                let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1476                let instrument_id = self
1477                    .instruments_cache
1478                    .get(&cache_key)
1479                    .map(|inst| inst.id())
1480                    .ok_or_else(|| {
1481                        BybitWsError::ClientError(format!(
1482                            "Instrument {cache_key} not found in cache"
1483                        ))
1484                    })?;
1485
1486                let venue_order_id = order
1487                    .order_id
1488                    .as_ref()
1489                    .map(|id| VenueOrderId::from(id.as_str()));
1490
1491                validated_data.push((order_link_id_str.clone(), instrument_id, venue_order_id));
1492            }
1493        }
1494
1495        let batch_cancel_data: Vec<_> = validated_data
1496            .iter()
1497            .map(|(order_link_id_str, instrument_id, venue_order_id)| {
1498                let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1499                (
1500                    client_order_id,
1501                    (
1502                        client_order_id,
1503                        trader_id,
1504                        strategy_id,
1505                        *instrument_id,
1506                        *venue_order_id,
1507                    ),
1508                )
1509            })
1510            .collect();
1511
1512        if !batch_cancel_data.is_empty() {
1513            let cmd = HandlerCommand::RegisterBatchCancel {
1514                req_id: batch_req_id.clone(),
1515                cancels: batch_cancel_data,
1516            };
1517            let cmd_tx = self.cmd_tx.read().await;
1518            if let Err(e) = cmd_tx.send(cmd) {
1519                tracing::error!("Failed to send RegisterBatchCancel command: {e}");
1520            }
1521        }
1522
1523        let request_items: Vec<BybitWsBatchCancelItem> = orders
1524            .into_iter()
1525            .map(|order| BybitWsBatchCancelItem {
1526                symbol: order.symbol,
1527                order_id: order.order_id,
1528                order_link_id: order.order_link_id,
1529            })
1530            .collect();
1531
1532        let args = BybitWsBatchCancelOrderArgs {
1533            category,
1534            request: request_items,
1535        };
1536
1537        let request = BybitWsRequest {
1538            req_id: Some(batch_req_id),
1539            op: BybitWsOrderRequestOp::CancelBatch,
1540            header: BybitWsHeader::now(),
1541            args: vec![args],
1542        };
1543
1544        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1545
1546        self.send_text(&payload).await
1547    }
1548
1549    /// Submits an order using Nautilus domain objects.
1550    ///
1551    /// # Errors
1552    ///
1553    /// Returns an error if order submission fails or if not authenticated.
1554    #[allow(clippy::too_many_arguments)]
1555    pub async fn submit_order(
1556        &self,
1557        product_type: BybitProductType,
1558        trader_id: TraderId,
1559        strategy_id: StrategyId,
1560        instrument_id: InstrumentId,
1561        client_order_id: ClientOrderId,
1562        order_side: OrderSide,
1563        order_type: OrderType,
1564        quantity: Quantity,
1565        is_quote_quantity: bool,
1566        time_in_force: Option<TimeInForce>,
1567        price: Option<Price>,
1568        trigger_price: Option<Price>,
1569        post_only: Option<bool>,
1570        reduce_only: Option<bool>,
1571        is_leverage: bool,
1572    ) -> BybitWsResult<()> {
1573        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1574            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1575        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1576
1577        let bybit_side = match order_side {
1578            OrderSide::Buy => BybitOrderSide::Buy,
1579            OrderSide::Sell => BybitOrderSide::Sell,
1580            _ => {
1581                return Err(BybitWsError::ClientError(format!(
1582                    "Invalid order side: {order_side:?}"
1583                )));
1584            }
1585        };
1586
1587        // For stop/conditional orders, Bybit uses Market/Limit with trigger parameters
1588        let (bybit_order_type, is_stop_order) = match order_type {
1589            OrderType::Market => (BybitOrderType::Market, false),
1590            OrderType::Limit => (BybitOrderType::Limit, false),
1591            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1592            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1593            _ => {
1594                return Err(BybitWsError::ClientError(format!(
1595                    "Unsupported order type: {order_type:?}"
1596                )));
1597            }
1598        };
1599
1600        let bybit_tif = if bybit_order_type == BybitOrderType::Market {
1601            None
1602        } else if post_only == Some(true) {
1603            Some(BybitTimeInForce::PostOnly)
1604        } else if let Some(tif) = time_in_force {
1605            Some(match tif {
1606                TimeInForce::Gtc => BybitTimeInForce::Gtc,
1607                TimeInForce::Ioc => BybitTimeInForce::Ioc,
1608                TimeInForce::Fok => BybitTimeInForce::Fok,
1609                _ => {
1610                    return Err(BybitWsError::ClientError(format!(
1611                        "Unsupported time in force: {tif:?}"
1612                    )));
1613                }
1614            })
1615        } else {
1616            None
1617        };
1618
1619        // For SPOT market orders, specify baseCoin to interpret qty as base currency.
1620        // This ensures Nautilus quantities (always in base currency) are interpreted correctly.
1621        let market_unit = if product_type == BybitProductType::Spot
1622            && bybit_order_type == BybitOrderType::Market
1623        {
1624            if is_quote_quantity {
1625                Some(BYBIT_QUOTE_COIN.to_string())
1626            } else {
1627                Some(BYBIT_BASE_COIN.to_string())
1628            }
1629        } else {
1630            None
1631        };
1632
1633        // Only SPOT products support is_leverage parameter
1634        let is_leverage_value = if product_type == BybitProductType::Spot {
1635            Some(i32::from(is_leverage))
1636        } else {
1637            None
1638        };
1639
1640        // Stop semantics: Buy stops trigger on rise (breakout), sell stops trigger on fall (breakdown)
1641        // MIT semantics: Buy MIT triggers on fall (pullback entry), sell MIT triggers on rise (rally entry)
1642        let trigger_direction = if is_stop_order {
1643            match (order_type, order_side) {
1644                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1645                    Some(BybitTriggerDirection::RisesTo as i32)
1646                }
1647                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1648                    Some(BybitTriggerDirection::FallsTo as i32)
1649                }
1650                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1651                    Some(BybitTriggerDirection::FallsTo as i32)
1652                }
1653                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1654                    Some(BybitTriggerDirection::RisesTo as i32)
1655                }
1656                _ => None,
1657            }
1658        } else {
1659            None
1660        };
1661
1662        let params = if is_stop_order {
1663            // For conditional orders, ALL types use triggerPrice field
1664            // sl_trigger_price/tp_trigger_price are only for TP/SL attached to regular orders
1665            BybitWsPlaceOrderParams {
1666                category: product_type,
1667                symbol: raw_symbol,
1668                side: bybit_side,
1669                order_type: bybit_order_type,
1670                qty: quantity.to_string(),
1671                is_leverage: is_leverage_value,
1672                market_unit: market_unit.clone(),
1673                price: price.map(|p| p.to_string()),
1674                time_in_force: bybit_tif,
1675                order_link_id: Some(client_order_id.to_string()),
1676                reduce_only: reduce_only.filter(|&r| r),
1677                close_on_trigger: None,
1678                trigger_price: trigger_price.map(|p| p.to_string()),
1679                trigger_by: Some(BybitTriggerType::LastPrice),
1680                trigger_direction,
1681                tpsl_mode: None, // Not needed for standalone conditional orders
1682                take_profit: None,
1683                stop_loss: None,
1684                tp_trigger_by: None,
1685                sl_trigger_by: None,
1686                sl_trigger_price: None, // Not used for standalone stop orders
1687                tp_trigger_price: None, // Not used for standalone stop orders
1688                sl_order_type: None,
1689                tp_order_type: None,
1690                sl_limit_price: None,
1691                tp_limit_price: None,
1692            }
1693        } else {
1694            // Regular market/limit orders
1695            BybitWsPlaceOrderParams {
1696                category: product_type,
1697                symbol: raw_symbol,
1698                side: bybit_side,
1699                order_type: bybit_order_type,
1700                qty: quantity.to_string(),
1701                is_leverage: is_leverage_value,
1702                market_unit,
1703                price: price.map(|p| p.to_string()),
1704                time_in_force: if bybit_order_type == BybitOrderType::Market {
1705                    None
1706                } else {
1707                    bybit_tif
1708                },
1709                order_link_id: Some(client_order_id.to_string()),
1710                reduce_only: reduce_only.filter(|&r| r),
1711                close_on_trigger: None,
1712                trigger_price: None,
1713                trigger_by: None,
1714                trigger_direction: None,
1715                tpsl_mode: None,
1716                take_profit: None,
1717                stop_loss: None,
1718                tp_trigger_by: None,
1719                sl_trigger_by: None,
1720                sl_trigger_price: None,
1721                tp_trigger_price: None,
1722                sl_order_type: None,
1723                tp_order_type: None,
1724                sl_limit_price: None,
1725                tp_limit_price: None,
1726            }
1727        };
1728
1729        self.place_order(
1730            params,
1731            client_order_id,
1732            trader_id,
1733            strategy_id,
1734            instrument_id,
1735        )
1736        .await
1737    }
1738
1739    /// Modifies an existing order using Nautilus domain objects.
1740    ///
1741    /// # Errors
1742    ///
1743    /// Returns an error if modification fails or if not authenticated.
1744    #[allow(clippy::too_many_arguments)]
1745    pub async fn modify_order(
1746        &self,
1747        product_type: BybitProductType,
1748        trader_id: TraderId,
1749        strategy_id: StrategyId,
1750        instrument_id: InstrumentId,
1751        client_order_id: ClientOrderId,
1752        venue_order_id: Option<VenueOrderId>,
1753        quantity: Option<Quantity>,
1754        price: Option<Price>,
1755    ) -> BybitWsResult<()> {
1756        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1757            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1758        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1759
1760        let params = BybitWsAmendOrderParams {
1761            category: product_type,
1762            symbol: raw_symbol,
1763            order_id: venue_order_id.map(|id| id.to_string()),
1764            order_link_id: Some(client_order_id.to_string()),
1765            qty: quantity.map(|q| q.to_string()),
1766            price: price.map(|p| p.to_string()),
1767            trigger_price: None,
1768            take_profit: None,
1769            stop_loss: None,
1770            tp_trigger_by: None,
1771            sl_trigger_by: None,
1772        };
1773
1774        self.amend_order(
1775            params,
1776            client_order_id,
1777            trader_id,
1778            strategy_id,
1779            instrument_id,
1780            venue_order_id,
1781        )
1782        .await
1783    }
1784
1785    /// Cancels an order using Nautilus domain objects.
1786    ///
1787    /// # Errors
1788    ///
1789    /// Returns an error if cancellation fails or if not authenticated.
1790    pub async fn cancel_order_by_id(
1791        &self,
1792        product_type: BybitProductType,
1793        trader_id: TraderId,
1794        strategy_id: StrategyId,
1795        instrument_id: InstrumentId,
1796        client_order_id: ClientOrderId,
1797        venue_order_id: Option<VenueOrderId>,
1798    ) -> BybitWsResult<()> {
1799        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1800            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1801        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1802
1803        let params = BybitWsCancelOrderParams {
1804            category: product_type,
1805            symbol: raw_symbol,
1806            order_id: venue_order_id.map(|id| id.to_string()),
1807            order_link_id: Some(client_order_id.to_string()),
1808        };
1809
1810        self.cancel_order(
1811            params,
1812            client_order_id,
1813            trader_id,
1814            strategy_id,
1815            instrument_id,
1816            venue_order_id,
1817        )
1818        .await
1819    }
1820
1821    /// Builds order params for placing an order.
1822    #[allow(clippy::too_many_arguments)]
1823    pub fn build_place_order_params(
1824        &self,
1825        product_type: BybitProductType,
1826        instrument_id: InstrumentId,
1827        client_order_id: ClientOrderId,
1828        order_side: OrderSide,
1829        order_type: OrderType,
1830        quantity: Quantity,
1831        is_quote_quantity: bool,
1832        time_in_force: Option<TimeInForce>,
1833        price: Option<Price>,
1834        trigger_price: Option<Price>,
1835        post_only: Option<bool>,
1836        reduce_only: Option<bool>,
1837        is_leverage: bool,
1838    ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1839        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1840            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1841        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1842
1843        let bybit_side = match order_side {
1844            OrderSide::Buy => BybitOrderSide::Buy,
1845            OrderSide::Sell => BybitOrderSide::Sell,
1846            _ => {
1847                return Err(BybitWsError::ClientError(format!(
1848                    "Invalid order side: {order_side:?}"
1849                )));
1850            }
1851        };
1852
1853        let (bybit_order_type, is_stop_order) = match order_type {
1854            OrderType::Market => (BybitOrderType::Market, false),
1855            OrderType::Limit => (BybitOrderType::Limit, false),
1856            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1857            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1858            _ => {
1859                return Err(BybitWsError::ClientError(format!(
1860                    "Unsupported order type: {order_type:?}"
1861                )));
1862            }
1863        };
1864
1865        let bybit_tif = if post_only == Some(true) {
1866            Some(BybitTimeInForce::PostOnly)
1867        } else if let Some(tif) = time_in_force {
1868            Some(match tif {
1869                TimeInForce::Gtc => BybitTimeInForce::Gtc,
1870                TimeInForce::Ioc => BybitTimeInForce::Ioc,
1871                TimeInForce::Fok => BybitTimeInForce::Fok,
1872                _ => {
1873                    return Err(BybitWsError::ClientError(format!(
1874                        "Unsupported time in force: {tif:?}"
1875                    )));
1876                }
1877            })
1878        } else {
1879            None
1880        };
1881
1882        let market_unit = if product_type == BybitProductType::Spot
1883            && bybit_order_type == BybitOrderType::Market
1884        {
1885            if is_quote_quantity {
1886                Some(BYBIT_QUOTE_COIN.to_string())
1887            } else {
1888                Some(BYBIT_BASE_COIN.to_string())
1889            }
1890        } else {
1891            None
1892        };
1893
1894        // Only SPOT products support is_leverage parameter
1895        let is_leverage_value = if product_type == BybitProductType::Spot {
1896            Some(i32::from(is_leverage))
1897        } else {
1898            None
1899        };
1900
1901        // Stop semantics: Buy stops trigger on rise (breakout), sell stops trigger on fall (breakdown)
1902        // MIT semantics: Buy MIT triggers on fall (pullback entry), sell MIT triggers on rise (rally entry)
1903        let trigger_direction = if is_stop_order {
1904            match (order_type, order_side) {
1905                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1906                    Some(BybitTriggerDirection::RisesTo as i32)
1907                }
1908                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1909                    Some(BybitTriggerDirection::FallsTo as i32)
1910                }
1911                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1912                    Some(BybitTriggerDirection::FallsTo as i32)
1913                }
1914                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1915                    Some(BybitTriggerDirection::RisesTo as i32)
1916                }
1917                _ => None,
1918            }
1919        } else {
1920            None
1921        };
1922
1923        let params = if is_stop_order {
1924            BybitWsPlaceOrderParams {
1925                category: product_type,
1926                symbol: raw_symbol,
1927                side: bybit_side,
1928                order_type: bybit_order_type,
1929                qty: quantity.to_string(),
1930                is_leverage: is_leverage_value,
1931                market_unit,
1932                price: price.map(|p| p.to_string()),
1933                time_in_force: if bybit_order_type == BybitOrderType::Market {
1934                    None
1935                } else {
1936                    bybit_tif
1937                },
1938                order_link_id: Some(client_order_id.to_string()),
1939                reduce_only: reduce_only.filter(|&r| r),
1940                close_on_trigger: None,
1941                trigger_price: trigger_price.map(|p| p.to_string()),
1942                trigger_by: Some(BybitTriggerType::LastPrice),
1943                trigger_direction,
1944                tpsl_mode: None,
1945                take_profit: None,
1946                stop_loss: None,
1947                tp_trigger_by: None,
1948                sl_trigger_by: None,
1949                sl_trigger_price: None,
1950                tp_trigger_price: None,
1951                sl_order_type: None,
1952                tp_order_type: None,
1953                sl_limit_price: None,
1954                tp_limit_price: None,
1955            }
1956        } else {
1957            BybitWsPlaceOrderParams {
1958                category: product_type,
1959                symbol: raw_symbol,
1960                side: bybit_side,
1961                order_type: bybit_order_type,
1962                qty: quantity.to_string(),
1963                is_leverage: is_leverage_value,
1964                market_unit,
1965                price: price.map(|p| p.to_string()),
1966                time_in_force: if bybit_order_type == BybitOrderType::Market {
1967                    None
1968                } else {
1969                    bybit_tif
1970                },
1971                order_link_id: Some(client_order_id.to_string()),
1972                reduce_only: reduce_only.filter(|&r| r),
1973                close_on_trigger: None,
1974                trigger_price: None,
1975                trigger_by: None,
1976                trigger_direction: None,
1977                tpsl_mode: None,
1978                take_profit: None,
1979                stop_loss: None,
1980                tp_trigger_by: None,
1981                sl_trigger_by: None,
1982                sl_trigger_price: None,
1983                tp_trigger_price: None,
1984                sl_order_type: None,
1985                tp_order_type: None,
1986                sl_limit_price: None,
1987                tp_limit_price: None,
1988            }
1989        };
1990
1991        Ok(params)
1992    }
1993
1994    /// Builds order params for amending an order.
1995    #[allow(clippy::too_many_arguments)]
1996    pub fn build_amend_order_params(
1997        &self,
1998        product_type: BybitProductType,
1999        instrument_id: InstrumentId,
2000        venue_order_id: Option<VenueOrderId>,
2001        client_order_id: Option<ClientOrderId>,
2002        quantity: Option<Quantity>,
2003        price: Option<Price>,
2004    ) -> BybitWsResult<BybitWsAmendOrderParams> {
2005        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2006            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2007        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2008
2009        Ok(BybitWsAmendOrderParams {
2010            category: product_type,
2011            symbol: raw_symbol,
2012            order_id: venue_order_id.map(|v| v.to_string()),
2013            order_link_id: client_order_id.map(|c| c.to_string()),
2014            qty: quantity.map(|q| q.to_string()),
2015            price: price.map(|p| p.to_string()),
2016            trigger_price: None,
2017            take_profit: None,
2018            stop_loss: None,
2019            tp_trigger_by: None,
2020            sl_trigger_by: None,
2021        })
2022    }
2023
2024    /// Builds order params for canceling an order via WebSocket.
2025    ///
2026    /// # Errors
2027    ///
2028    /// Returns an error if symbol parsing fails or if neither venue_order_id
2029    /// nor client_order_id is provided.
2030    pub fn build_cancel_order_params(
2031        &self,
2032        product_type: BybitProductType,
2033        instrument_id: InstrumentId,
2034        venue_order_id: Option<VenueOrderId>,
2035        client_order_id: Option<ClientOrderId>,
2036    ) -> BybitWsResult<BybitWsCancelOrderParams> {
2037        if venue_order_id.is_none() && client_order_id.is_none() {
2038            return Err(BybitWsError::ClientError(
2039                "Either venue_order_id or client_order_id must be provided".to_string(),
2040            ));
2041        }
2042
2043        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2044            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2045        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2046
2047        Ok(BybitWsCancelOrderParams {
2048            category: product_type,
2049            symbol: raw_symbol,
2050            order_id: venue_order_id.map(|v| v.to_string()),
2051            order_link_id: client_order_id.map(|c| c.to_string()),
2052        })
2053    }
2054
2055    fn default_headers() -> Vec<(String, String)> {
2056        vec![
2057            ("Content-Type".to_string(), "application/json".to_string()),
2058            ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
2059        ]
2060    }
2061
2062    async fn authenticate_if_required(&self) -> BybitWsResult<()> {
2063        if !self.requires_auth {
2064            return Ok(());
2065        }
2066
2067        let credential = self.credential.as_ref().ok_or_else(|| {
2068            BybitWsError::Authentication("Credentials required for authentication".to_string())
2069        })?;
2070
2071        let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
2072        let signature = credential.sign_websocket_auth(expires);
2073
2074        let auth_message = BybitAuthRequest {
2075            op: BybitWsOperation::Auth,
2076            args: vec![
2077                Value::String(credential.api_key().to_string()),
2078                Value::Number(expires.into()),
2079                Value::String(signature),
2080            ],
2081        };
2082
2083        let payload = serde_json::to_string(&auth_message)?;
2084
2085        self.cmd_tx
2086            .read()
2087            .await
2088            .send(HandlerCommand::Authenticate { payload })
2089            .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
2090
2091        // Authentication will be processed asynchronously by the handler
2092        // The handler will emit NautilusWsMessage::Authenticated when successful
2093        Ok(())
2094    }
2095
2096    async fn send_text(&self, text: &str) -> BybitWsResult<()> {
2097        let cmd = HandlerCommand::SendText {
2098            payload: text.to_string(),
2099        };
2100
2101        self.send_cmd(cmd).await
2102    }
2103
2104    async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
2105        self.cmd_tx
2106            .read()
2107            .await
2108            .send(cmd)
2109            .map_err(|e| BybitWsError::Send(e.to_string()))
2110    }
2111}
2112
2113#[cfg(test)]
2114mod tests {
2115    use rstest::rstest;
2116
2117    use super::*;
2118    use crate::{
2119        common::testing::load_test_json,
2120        websocket::{classify_bybit_message, messages::BybitWsMessage},
2121    };
2122
2123    #[rstest]
2124    fn classify_orderbook_snapshot() {
2125        let json: Value = serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json"))
2126            .expect("invalid fixture");
2127        let message = classify_bybit_message(json);
2128        assert!(matches!(message, BybitWsMessage::Orderbook(_)));
2129    }
2130
2131    #[rstest]
2132    fn classify_trade_snapshot() {
2133        let json: Value =
2134            serde_json::from_str(&load_test_json("ws_public_trade.json")).expect("invalid fixture");
2135        let message = classify_bybit_message(json);
2136        assert!(matches!(message, BybitWsMessage::Trade(_)));
2137    }
2138
2139    #[rstest]
2140    fn classify_ticker_linear_snapshot() {
2141        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_linear.json"))
2142            .expect("invalid fixture");
2143        let message = classify_bybit_message(json);
2144        assert!(matches!(message, BybitWsMessage::TickerLinear(_)));
2145    }
2146
2147    #[rstest]
2148    fn classify_ticker_option_snapshot() {
2149        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_option.json"))
2150            .expect("invalid fixture");
2151        let message = classify_bybit_message(json);
2152        assert!(matches!(message, BybitWsMessage::TickerOption(_)));
2153    }
2154
2155    #[rstest]
2156    fn test_race_unsubscribe_failure_recovery() {
2157        // Simulates the race condition where venue rejects an unsubscribe request.
2158        // The adapter must perform the 3-step recovery:
2159        // 1. confirm_unsubscribe() - clear pending_unsubscribe
2160        // 2. mark_subscribe() - mark as subscribing again
2161        // 3. confirm_subscribe() - restore to confirmed state
2162        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2163
2164        let topic = "publicTrade.BTCUSDT";
2165
2166        // Initial subscribe flow
2167        subscriptions.mark_subscribe(topic);
2168        subscriptions.confirm_subscribe(topic);
2169        assert_eq!(subscriptions.len(), 1);
2170
2171        // User unsubscribes
2172        subscriptions.mark_unsubscribe(topic);
2173        assert_eq!(subscriptions.len(), 0);
2174        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2175
2176        // Venue REJECTS the unsubscribe (error message)
2177        // Adapter must perform 3-step recovery (from lines 2181-2183)
2178        subscriptions.confirm_unsubscribe(topic); // Step 1: clear pending_unsubscribe
2179        subscriptions.mark_subscribe(topic); // Step 2: mark as subscribing
2180        subscriptions.confirm_subscribe(topic); // Step 3: confirm subscription
2181
2182        // Verify recovery: topic should be back in confirmed state
2183        assert_eq!(subscriptions.len(), 1);
2184        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2185        assert!(subscriptions.pending_subscribe_topics().is_empty());
2186
2187        // Verify topic is in all_topics() for reconnect
2188        let all = subscriptions.all_topics();
2189        assert_eq!(all.len(), 1);
2190        assert!(all.contains(&topic.to_string()));
2191    }
2192
2193    #[rstest]
2194    fn test_race_resubscribe_before_unsubscribe_ack() {
2195        // Simulates: User unsubscribes, then immediately resubscribes before
2196        // the unsubscribe ACK arrives from the venue.
2197        // This is the race condition fixed in the subscription tracker.
2198        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2199
2200        let topic = "orderbook.50.BTCUSDT";
2201
2202        // Initial subscribe
2203        subscriptions.mark_subscribe(topic);
2204        subscriptions.confirm_subscribe(topic);
2205        assert_eq!(subscriptions.len(), 1);
2206
2207        // User unsubscribes
2208        subscriptions.mark_unsubscribe(topic);
2209        assert_eq!(subscriptions.len(), 0);
2210        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2211
2212        // User immediately changes mind and resubscribes (before unsubscribe ACK)
2213        subscriptions.mark_subscribe(topic);
2214        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2215
2216        // NOW the unsubscribe ACK arrives - should NOT clear pending_subscribe
2217        subscriptions.confirm_unsubscribe(topic);
2218        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2219        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2220
2221        // Subscribe ACK arrives
2222        subscriptions.confirm_subscribe(topic);
2223        assert_eq!(subscriptions.len(), 1);
2224        assert!(subscriptions.pending_subscribe_topics().is_empty());
2225
2226        // Verify final state is correct
2227        let all = subscriptions.all_topics();
2228        assert_eq!(all.len(), 1);
2229        assert!(all.contains(&topic.to_string()));
2230    }
2231
2232    #[rstest]
2233    fn test_race_late_subscribe_confirmation_after_unsubscribe() {
2234        // Simulates: User subscribes, then unsubscribes before subscribe ACK arrives.
2235        // The late subscribe ACK should be ignored.
2236        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2237
2238        let topic = "tickers.ETHUSDT";
2239
2240        // User subscribes
2241        subscriptions.mark_subscribe(topic);
2242        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2243
2244        // User immediately unsubscribes (before subscribe ACK)
2245        subscriptions.mark_unsubscribe(topic);
2246        assert!(subscriptions.pending_subscribe_topics().is_empty()); // Cleared
2247        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2248
2249        // Late subscribe confirmation arrives - should be IGNORED
2250        subscriptions.confirm_subscribe(topic);
2251        assert_eq!(subscriptions.len(), 0); // Not added to confirmed
2252        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2253
2254        // Unsubscribe ACK arrives
2255        subscriptions.confirm_unsubscribe(topic);
2256
2257        // Final state: completely empty
2258        assert!(subscriptions.is_empty());
2259        assert!(subscriptions.all_topics().is_empty());
2260    }
2261
2262    #[rstest]
2263    fn test_race_reconnection_with_pending_states() {
2264        // Simulates reconnection with mixed pending states.
2265        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2266
2267        // Set up mixed state before reconnection
2268        // Confirmed: publicTrade.BTCUSDT
2269        let trade_btc = "publicTrade.BTCUSDT";
2270        subscriptions.mark_subscribe(trade_btc);
2271        subscriptions.confirm_subscribe(trade_btc);
2272
2273        // Pending subscribe: publicTrade.ETHUSDT
2274        let trade_eth = "publicTrade.ETHUSDT";
2275        subscriptions.mark_subscribe(trade_eth);
2276
2277        // Pending unsubscribe: orderbook.50.BTCUSDT (user cancelled)
2278        let book_btc = "orderbook.50.BTCUSDT";
2279        subscriptions.mark_subscribe(book_btc);
2280        subscriptions.confirm_subscribe(book_btc);
2281        subscriptions.mark_unsubscribe(book_btc);
2282
2283        // Get topics for reconnection
2284        let topics_to_restore = subscriptions.all_topics();
2285
2286        // Should include: confirmed + pending_subscribe (NOT pending_unsubscribe)
2287        assert_eq!(topics_to_restore.len(), 2);
2288        assert!(topics_to_restore.contains(&trade_btc.to_string()));
2289        assert!(topics_to_restore.contains(&trade_eth.to_string()));
2290        assert!(!topics_to_restore.contains(&book_btc.to_string())); // Excluded
2291    }
2292
2293    #[rstest]
2294    fn test_race_duplicate_subscribe_messages_idempotent() {
2295        // Simulates duplicate subscribe requests (e.g., from reconnection logic).
2296        // The subscription tracker should be idempotent and not create duplicate state.
2297        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2298
2299        let topic = "publicTrade.BTCUSDT";
2300
2301        // Subscribe and confirm
2302        subscriptions.mark_subscribe(topic);
2303        subscriptions.confirm_subscribe(topic);
2304        assert_eq!(subscriptions.len(), 1);
2305
2306        // Duplicate mark_subscribe on already-confirmed topic (should be no-op)
2307        subscriptions.mark_subscribe(topic);
2308        assert!(subscriptions.pending_subscribe_topics().is_empty()); // Not re-added
2309        assert_eq!(subscriptions.len(), 1); // Still just 1
2310
2311        // Duplicate confirm_subscribe (should be idempotent)
2312        subscriptions.confirm_subscribe(topic);
2313        assert_eq!(subscriptions.len(), 1);
2314
2315        // Verify final state
2316        let all = subscriptions.all_topics();
2317        assert_eq!(all.len(), 1);
2318        assert_eq!(all[0], topic);
2319    }
2320
2321    #[rstest]
2322    #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
2323    #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
2324    #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
2325    #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
2326    #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
2327    #[case::option_with_leverage(BybitProductType::Option, true, None)]
2328    fn test_is_leverage_parameter(
2329        #[case] product_type: BybitProductType,
2330        #[case] is_leverage: bool,
2331        #[case] expected: Option<i32>,
2332    ) {
2333        let symbol = match product_type {
2334            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2335            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2336            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2337            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2338        };
2339
2340        let instrument_id = InstrumentId::from(symbol);
2341        let client_order_id = ClientOrderId::from("test-order-1");
2342        let quantity = Quantity::from("1.0");
2343
2344        let client = BybitWebSocketClient::new_trade(
2345            BybitEnvironment::Testnet,
2346            Some("test-key".to_string()),
2347            Some("test-secret".to_string()),
2348            None,
2349            Some(20),
2350        );
2351
2352        let params = client
2353            .build_place_order_params(
2354                product_type,
2355                instrument_id,
2356                client_order_id,
2357                OrderSide::Buy,
2358                OrderType::Limit,
2359                quantity,
2360                false, // is_quote_quantity
2361                Some(TimeInForce::Gtc),
2362                Some(Price::from("50000.0")),
2363                None,
2364                None,
2365                None,
2366                is_leverage,
2367            )
2368            .expect("Failed to build params");
2369
2370        assert_eq!(params.is_leverage, expected);
2371    }
2372
2373    #[rstest]
2374    #[case::spot_market_quote_quantity(BybitProductType::Spot, OrderType::Market, true, Some(BYBIT_QUOTE_COIN.to_string()))]
2375    #[case::spot_market_base_quantity(BybitProductType::Spot, OrderType::Market, false, Some(BYBIT_BASE_COIN.to_string()))]
2376    #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
2377    #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
2378    #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
2379    #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
2380    fn test_is_quote_quantity_parameter(
2381        #[case] product_type: BybitProductType,
2382        #[case] order_type: OrderType,
2383        #[case] is_quote_quantity: bool,
2384        #[case] expected: Option<String>,
2385    ) {
2386        let symbol = match product_type {
2387            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2388            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2389            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2390            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2391        };
2392
2393        let instrument_id = InstrumentId::from(symbol);
2394        let client_order_id = ClientOrderId::from("test-order-1");
2395        let quantity = Quantity::from("1.0");
2396
2397        let client = BybitWebSocketClient::new_trade(
2398            BybitEnvironment::Testnet,
2399            Some("test-key".to_string()),
2400            Some("test-secret".to_string()),
2401            None,
2402            Some(20),
2403        );
2404
2405        let params = client
2406            .build_place_order_params(
2407                product_type,
2408                instrument_id,
2409                client_order_id,
2410                OrderSide::Buy,
2411                order_type,
2412                quantity,
2413                is_quote_quantity,
2414                Some(TimeInForce::Gtc),
2415                if order_type == OrderType::Market {
2416                    None
2417                } else {
2418                    Some(Price::from("50000.0"))
2419                },
2420                None,
2421                None,
2422                None,
2423                false,
2424            )
2425            .expect("Failed to build params");
2426
2427        assert_eq!(params.market_unit, expected);
2428    }
2429}