nautilus_bybit/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bybit WebSocket client providing public market data streaming.
17//!
18//! Bybit API reference <https://bybit-exchange.github.io/docs/>.
19
20use std::{
21    fmt::Debug,
22    sync::{
23        Arc,
24        atomic::{AtomicBool, AtomicU8, Ordering},
25    },
26    time::Duration,
27};
28
29use ahash::AHashMap;
30use arc_swap::ArcSwap;
31use dashmap::DashMap;
32use nautilus_common::live::get_runtime;
33use nautilus_core::{UUID4, consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt};
34use nautilus_model::{
35    enums::{OrderSide, OrderType, TimeInForce},
36    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37    instruments::{Instrument, InstrumentAny},
38    types::{Price, Quantity},
39};
40use nautilus_network::{
41    backoff::ExponentialBackoff,
42    mode::ConnectionMode,
43    websocket::{
44        AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
45        channel_message_handler,
46    },
47};
48use serde_json::Value;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use crate::{
53    common::{
54        consts::{
55            BYBIT_BASE_COIN, BYBIT_NAUTILUS_BROKER_ID, BYBIT_QUOTE_COIN, BYBIT_WS_TOPIC_DELIMITER,
56        },
57        credential::Credential,
58        enums::{
59            BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
60            BybitTriggerDirection, BybitTriggerType, BybitWsOrderRequestOp,
61        },
62        parse::{extract_raw_symbol, make_bybit_symbol},
63        symbol::BybitSymbol,
64        urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
65    },
66    websocket::{
67        enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
68        error::{BybitWsError, BybitWsResult},
69        handler::{FeedHandler, HandlerCommand},
70        messages::{
71            BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
72            BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
73            BybitWsCancelOrderParams, BybitWsHeader, BybitWsPlaceOrderParams, BybitWsRequest,
74            NautilusWsMessage,
75        },
76    },
77};
78
79const DEFAULT_HEARTBEAT_SECS: u64 = 20;
80const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
81const BATCH_PROCESSING_LIMIT: usize = 20;
82
83/// Type alias for the funding rate cache.
84type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
85
86/// Resolves credentials from provided values or environment variables.
87///
88/// Priority for environment variables based on environment:
89/// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
90/// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
91/// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
92fn resolve_credential(
93    environment: BybitEnvironment,
94    api_key: Option<String>,
95    api_secret: Option<String>,
96) -> Option<Credential> {
97    let (api_key_env, api_secret_env) = match environment {
98        BybitEnvironment::Demo => ("BYBIT_DEMO_API_KEY", "BYBIT_DEMO_API_SECRET"),
99        BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
100        BybitEnvironment::Mainnet => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
101    };
102
103    let key = get_or_env_var_opt(api_key, api_key_env);
104    let secret = get_or_env_var_opt(api_secret, api_secret_env);
105
106    match (key, secret) {
107        (Some(k), Some(s)) => Some(Credential::new(k, s)),
108        _ => None,
109    }
110}
111
112/// Public/market data WebSocket client for Bybit.
113#[cfg_attr(feature = "python", pyo3::pyclass)]
114pub struct BybitWebSocketClient {
115    url: String,
116    environment: BybitEnvironment,
117    product_type: Option<BybitProductType>,
118    credential: Option<Credential>,
119    requires_auth: bool,
120    auth_tracker: AuthTracker,
121    heartbeat: Option<u64>,
122    connection_mode: Arc<ArcSwap<AtomicU8>>,
123    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
124    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
125    signal: Arc<AtomicBool>,
126    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
127    subscriptions: SubscriptionState,
128    is_authenticated: Arc<AtomicBool>,
129    account_id: Option<AccountId>,
130    mm_level: Arc<AtomicU8>,
131    bars_timestamp_on_close: bool,
132    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
133    funding_cache: FundingCache,
134    cancellation_token: CancellationToken,
135}
136
137impl Debug for BybitWebSocketClient {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct(stringify!(BybitWebSocketClient))
140            .field("url", &self.url)
141            .field("environment", &self.environment)
142            .field("product_type", &self.product_type)
143            .field("requires_auth", &self.requires_auth)
144            .field("heartbeat", &self.heartbeat)
145            .field("confirmed_subscriptions", &self.subscriptions.len())
146            .finish()
147    }
148}
149
150impl Clone for BybitWebSocketClient {
151    fn clone(&self) -> Self {
152        Self {
153            url: self.url.clone(),
154            environment: self.environment,
155            product_type: self.product_type,
156            credential: self.credential.clone(),
157            requires_auth: self.requires_auth,
158            auth_tracker: self.auth_tracker.clone(),
159            heartbeat: self.heartbeat,
160            connection_mode: Arc::clone(&self.connection_mode),
161            cmd_tx: Arc::clone(&self.cmd_tx),
162            out_rx: None, // Each clone gets its own receiver
163            signal: Arc::clone(&self.signal),
164            task_handle: None, // Each clone gets its own task handle
165            subscriptions: self.subscriptions.clone(),
166            is_authenticated: Arc::clone(&self.is_authenticated),
167            account_id: self.account_id,
168            mm_level: Arc::clone(&self.mm_level),
169            bars_timestamp_on_close: self.bars_timestamp_on_close,
170            instruments_cache: Arc::clone(&self.instruments_cache),
171            funding_cache: Arc::clone(&self.funding_cache),
172            cancellation_token: self.cancellation_token.clone(),
173        }
174    }
175}
176
177impl BybitWebSocketClient {
178    /// Creates a new Bybit public WebSocket client.
179    #[must_use]
180    pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
181        Self::new_public_with(
182            BybitProductType::Linear,
183            BybitEnvironment::Mainnet,
184            url,
185            heartbeat,
186        )
187    }
188
189    /// Creates a new Bybit public WebSocket client targeting the specified product/environment.
190    #[must_use]
191    pub fn new_public_with(
192        product_type: BybitProductType,
193        environment: BybitEnvironment,
194        url: Option<String>,
195        heartbeat: Option<u64>,
196    ) -> Self {
197        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
198        // connect() swaps in the real channel and replays any queued instruments so the
199        // handler sees them once it starts.
200        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
201
202        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
203        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
204
205        Self {
206            url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
207            environment,
208            product_type: Some(product_type),
209            credential: None,
210            requires_auth: false,
211            auth_tracker: AuthTracker::new(),
212            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
213            connection_mode,
214            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
215            out_rx: None,
216            signal: Arc::new(AtomicBool::new(false)),
217            task_handle: None,
218            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
219            is_authenticated: Arc::new(AtomicBool::new(false)),
220            instruments_cache: Arc::new(DashMap::new()),
221            account_id: None,
222            mm_level: Arc::new(AtomicU8::new(0)),
223            bars_timestamp_on_close: true,
224            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
225            cancellation_token: CancellationToken::new(),
226        }
227    }
228
229    /// Creates a new Bybit private WebSocket client.
230    ///
231    /// If `api_key` or `api_secret` are not provided, they will be loaded from
232    /// environment variables based on the environment:
233    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
234    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
235    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
236    #[must_use]
237    pub fn new_private(
238        environment: BybitEnvironment,
239        api_key: Option<String>,
240        api_secret: Option<String>,
241        url: Option<String>,
242        heartbeat: Option<u64>,
243    ) -> Self {
244        let credential = resolve_credential(environment, api_key, api_secret);
245
246        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
247        // connect() swaps in the real channel and replays any queued instruments so the
248        // handler sees them once it starts.
249        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
250
251        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
252        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
253
254        Self {
255            url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
256            environment,
257            product_type: None,
258            credential,
259            requires_auth: true,
260            auth_tracker: AuthTracker::new(),
261            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
262            connection_mode,
263            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
264            out_rx: None,
265            signal: Arc::new(AtomicBool::new(false)),
266            task_handle: None,
267            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
268            is_authenticated: Arc::new(AtomicBool::new(false)),
269            instruments_cache: Arc::new(DashMap::new()),
270            account_id: None,
271            mm_level: Arc::new(AtomicU8::new(0)),
272            bars_timestamp_on_close: true,
273            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
274            cancellation_token: CancellationToken::new(),
275        }
276    }
277
278    /// Creates a new Bybit trade WebSocket client for order operations.
279    ///
280    /// If `api_key` or `api_secret` are not provided, they will be loaded from
281    /// environment variables based on the environment:
282    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
283    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
284    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
285    #[must_use]
286    pub fn new_trade(
287        environment: BybitEnvironment,
288        api_key: Option<String>,
289        api_secret: Option<String>,
290        url: Option<String>,
291        heartbeat: Option<u64>,
292    ) -> Self {
293        let credential = resolve_credential(environment, api_key, api_secret);
294
295        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
296        // connect() swaps in the real channel and replays any queued instruments so the
297        // handler sees them once it starts.
298        let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
299
300        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
301        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
302
303        Self {
304            url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
305            environment,
306            product_type: None,
307            credential,
308            requires_auth: true,
309            auth_tracker: AuthTracker::new(),
310            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
311            connection_mode,
312            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
313            out_rx: None,
314            signal: Arc::new(AtomicBool::new(false)),
315            task_handle: None,
316            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
317            is_authenticated: Arc::new(AtomicBool::new(false)),
318            instruments_cache: Arc::new(DashMap::new()),
319            account_id: None,
320            mm_level: Arc::new(AtomicU8::new(0)),
321            bars_timestamp_on_close: true,
322            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
323            cancellation_token: CancellationToken::new(),
324        }
325    }
326
327    /// Establishes the WebSocket connection.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if the underlying WebSocket connection cannot be established,
332    /// after retrying multiple times with exponential backoff.
333    pub async fn connect(&mut self) -> BybitWsResult<()> {
334        const MAX_RETRIES: u32 = 5;
335        const CONNECTION_TIMEOUT_SECS: u64 = 10;
336
337        self.signal.store(false, Ordering::Relaxed);
338
339        let (raw_handler, raw_rx) = channel_message_handler();
340
341        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
342        // in the message loop for minimal latency (see handler.rs pong response)
343        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
344            // Handler responds to pings internally via select! loop
345        });
346
347        let ping_msg = serde_json::to_string(&BybitSubscription {
348            op: BybitWsOperation::Ping,
349            args: vec![],
350        })?;
351
352        let config = WebSocketConfig {
353            url: self.url.clone(),
354            headers: Self::default_headers(),
355            heartbeat: self.heartbeat,
356            heartbeat_msg: Some(ping_msg),
357            reconnect_timeout_ms: Some(5_000),
358            reconnect_delay_initial_ms: Some(500),
359            reconnect_delay_max_ms: Some(5_000),
360            reconnect_backoff_factor: Some(1.5),
361            reconnect_jitter_ms: Some(250),
362            reconnect_max_attempts: None,
363        };
364
365        // Retry initial connection with exponential backoff to handle transient DNS/network issues
366        // TODO: Eventually expose client config options for this
367        let mut backoff = ExponentialBackoff::new(
368            Duration::from_millis(500),
369            Duration::from_millis(5000),
370            2.0,
371            250,
372            false,
373        )
374        .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
375
376        #[allow(unused_assignments)]
377        let mut last_error = String::new();
378        let mut attempt = 0;
379        let client = loop {
380            attempt += 1;
381
382            match tokio::time::timeout(
383                Duration::from_secs(CONNECTION_TIMEOUT_SECS),
384                WebSocketClient::connect(
385                    config.clone(),
386                    Some(raw_handler.clone()),
387                    Some(ping_handler.clone()),
388                    None,
389                    vec![],
390                    None,
391                ),
392            )
393            .await
394            {
395                Ok(Ok(client)) => {
396                    if attempt > 1 {
397                        log::info!("WebSocket connection established after {attempt} attempts");
398                    }
399                    break client;
400                }
401                Ok(Err(e)) => {
402                    last_error = e.to_string();
403                    log::warn!(
404                        "WebSocket connection attempt failed: attempt={attempt}, max_retries={MAX_RETRIES}, url={}, error={last_error}",
405                        self.url
406                    );
407                }
408                Err(_) => {
409                    last_error = format!(
410                        "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
411                    );
412                    log::warn!(
413                        "WebSocket connection attempt timed out: attempt={attempt}, max_retries={MAX_RETRIES}, url={}",
414                        self.url
415                    );
416                }
417            }
418
419            if attempt >= MAX_RETRIES {
420                return Err(BybitWsError::Transport(format!(
421                    "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
422                    If this is a DNS error, check your network configuration and DNS settings.",
423                    self.url,
424                    if last_error.is_empty() {
425                        "unknown error"
426                    } else {
427                        &last_error
428                    }
429                )));
430            }
431
432            let delay = backoff.next_duration();
433            log::debug!(
434                "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
435                attempt + 1
436            );
437            tokio::time::sleep(delay).await;
438        };
439
440        self.connection_mode.store(client.connection_mode_atomic());
441
442        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
443        self.out_rx = Some(Arc::new(out_rx));
444
445        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
446        *self.cmd_tx.write().await = cmd_tx.clone();
447
448        let cmd = HandlerCommand::SetClient(client);
449
450        self.send_cmd(cmd).await?;
451
452        // Replay cached instruments to the new handler via the new channel
453        if !self.instruments_cache.is_empty() {
454            let cached_instruments: Vec<InstrumentAny> = self
455                .instruments_cache
456                .iter()
457                .map(|entry| entry.value().clone())
458                .collect();
459            let cmd = HandlerCommand::InitializeInstruments(cached_instruments);
460            self.send_cmd(cmd).await?;
461        }
462
463        let signal = Arc::clone(&self.signal);
464        let subscriptions = self.subscriptions.clone();
465        let credential = self.credential.clone();
466        let requires_auth = self.requires_auth;
467        let funding_cache = Arc::clone(&self.funding_cache);
468        let account_id = self.account_id;
469        let product_type = self.product_type;
470        let bars_timestamp_on_close = self.bars_timestamp_on_close;
471        let mm_level = Arc::clone(&self.mm_level);
472        let cmd_tx_for_reconnect = cmd_tx.clone();
473        let auth_tracker = self.auth_tracker.clone();
474        let is_authenticated = Arc::clone(&self.is_authenticated);
475
476        let stream_handle = get_runtime().spawn(async move {
477            let mut handler = FeedHandler::new(
478                signal.clone(),
479                cmd_rx,
480                raw_rx,
481                out_tx.clone(),
482                account_id,
483                product_type,
484                bars_timestamp_on_close,
485                mm_level.clone(),
486                auth_tracker,
487                subscriptions.clone(),
488                funding_cache.clone(),
489            );
490
491            // Helper closure to resubscribe all tracked subscriptions after reconnection
492            let resubscribe_all = || async {
493                let topics = subscriptions.all_topics();
494
495                if topics.is_empty() {
496                    return;
497                }
498
499                log::debug!(
500                    "Resubscribing to confirmed subscriptions: count={}",
501                    topics.len()
502                );
503
504                for topic in &topics {
505                    subscriptions.mark_subscribe(topic.as_str());
506                }
507
508                let mut payloads = Vec::with_capacity(topics.len());
509                for topic in &topics {
510                    let message = BybitSubscription {
511                        op: BybitWsOperation::Subscribe,
512                        args: vec![topic.clone()],
513                    };
514                    if let Ok(payload) = serde_json::to_string(&message) {
515                        payloads.push(payload);
516                    }
517                }
518
519                let cmd = HandlerCommand::Subscribe { topics: payloads };
520
521                if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
522                    log::error!("Failed to send resubscribe command: {e}");
523                }
524            };
525
526            // Run message processing with reconnection handling
527            loop {
528                match handler.next().await {
529                    Some(NautilusWsMessage::Reconnected) => {
530                        if signal.load(Ordering::Relaxed) {
531                            continue;
532                        }
533
534                        log::info!("WebSocket reconnected");
535
536                        // Mark all confirmed subscriptions as failed so they transition to pending state
537                        let confirmed_topics: Vec<String> = {
538                            let confirmed = subscriptions.confirmed();
539                            let mut topics = Vec::new();
540                            for entry in confirmed.iter() {
541                                let (channel, symbols) = entry.pair();
542                                for symbol in symbols {
543                                    if symbol.is_empty() {
544                                        topics.push(channel.to_string());
545                                    } else {
546                                        topics.push(format!("{channel}.{symbol}"));
547                                    }
548                                }
549                            }
550                            topics
551                        };
552
553                        if !confirmed_topics.is_empty() {
554                            log::debug!(
555                                "Marking confirmed subscriptions as pending for replay: count={}",
556                                confirmed_topics.len()
557                            );
558                            for topic in confirmed_topics {
559                                subscriptions.mark_failure(&topic);
560                            }
561                        }
562
563                        // Clear caches to prevent stale data after reconnection
564                        funding_cache.write().await.clear();
565
566                        if requires_auth {
567                            is_authenticated.store(false, Ordering::Relaxed);
568                            log::debug!("Re-authenticating after reconnection");
569
570                            if let Some(cred) = &credential {
571                                let expires = chrono::Utc::now().timestamp_millis()
572                                    + WEBSOCKET_AUTH_WINDOW_MS;
573                                let signature = cred.sign_websocket_auth(expires);
574
575                                let auth_message = BybitAuthRequest {
576                                    op: BybitWsOperation::Auth,
577                                    args: vec![
578                                        Value::String(cred.api_key().to_string()),
579                                        Value::Number(expires.into()),
580                                        Value::String(signature),
581                                    ],
582                                };
583
584                                if let Ok(payload) = serde_json::to_string(&auth_message) {
585                                    let cmd = HandlerCommand::Authenticate { payload };
586                                    if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
587                                        log::error!(
588                                            "Failed to send reconnection auth command: error={e}"
589                                        );
590                                    }
591                                } else {
592                                    log::error!("Failed to serialize reconnection auth message");
593                                }
594                            }
595                        }
596
597                        // Unauthenticated sessions resubscribe immediately after reconnection,
598                        // authenticated sessions wait for Authenticated message
599                        if !requires_auth {
600                            log::debug!("No authentication required, resubscribing immediately");
601                            resubscribe_all().await;
602                        }
603
604                        // Forward to out_tx so caller sees the Reconnected message
605                        if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
606                            log::debug!("Receiver dropped, stopping");
607                            break;
608                        }
609                        continue;
610                    }
611                    Some(NautilusWsMessage::Authenticated) => {
612                        log::debug!("Authenticated, resubscribing");
613                        is_authenticated.store(true, Ordering::Relaxed);
614                        resubscribe_all().await;
615                        continue;
616                    }
617                    Some(msg) => {
618                        if out_tx.send(msg).is_err() {
619                            log::error!("Failed to send message (receiver dropped)");
620                            break;
621                        }
622                    }
623                    None => {
624                        // Stream ended - check if it's a stop signal
625                        if handler.is_stopped() {
626                            log::debug!("Stop signal received, ending message processing");
627                            break;
628                        }
629                        // Otherwise it's an unexpected stream end
630                        log::warn!("WebSocket stream ended unexpectedly");
631                        break;
632                    }
633                }
634            }
635
636            log::debug!("Handler task exiting");
637        });
638
639        self.task_handle = Some(Arc::new(stream_handle));
640
641        if requires_auth && let Err(e) = self.authenticate_if_required().await {
642            return Err(e);
643        }
644
645        Ok(())
646    }
647
648    /// Disconnects the WebSocket client and stops the background task.
649    pub async fn close(&mut self) -> BybitWsResult<()> {
650        log::debug!("Starting close process");
651
652        self.signal.store(true, Ordering::Relaxed);
653
654        let cmd = HandlerCommand::Disconnect;
655        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
656            log::debug!(
657                "Failed to send disconnect command (handler may already be shut down): {e}"
658            );
659        }
660
661        if let Some(task_handle) = self.task_handle.take() {
662            match Arc::try_unwrap(task_handle) {
663                Ok(handle) => {
664                    log::debug!("Waiting for task handle to complete");
665                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
666                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
667                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
668                        Err(_) => {
669                            log::warn!(
670                                "Timeout waiting for task handle, task may still be running"
671                            );
672                            // The task will be dropped and should clean up automatically
673                        }
674                    }
675                }
676                Err(arc_handle) => {
677                    log::debug!(
678                        "Cannot take ownership of task handle - other references exist, aborting task"
679                    );
680                    arc_handle.abort();
681                }
682            }
683        } else {
684            log::debug!("No task handle to await");
685        }
686
687        self.is_authenticated.store(false, Ordering::Relaxed);
688
689        log::debug!("Closed");
690
691        Ok(())
692    }
693
694    /// Returns a value indicating whether the client is active.
695    #[must_use]
696    pub fn is_active(&self) -> bool {
697        let connection_mode_arc = self.connection_mode.load();
698        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
699            && !self.signal.load(Ordering::Relaxed)
700    }
701
702    /// Returns a value indicating whether the client is closed.
703    pub fn is_closed(&self) -> bool {
704        let connection_mode_arc = self.connection_mode.load();
705        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
706            || self.signal.load(Ordering::Relaxed)
707    }
708
709    /// Waits until the WebSocket client becomes active or times out.
710    ///
711    /// # Errors
712    ///
713    /// Returns an error if the timeout is exceeded before the client becomes active.
714    pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
715        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
716
717        tokio::time::timeout(timeout, async {
718            while !self.is_active() {
719                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
720            }
721        })
722        .await
723        .map_err(|_| {
724            BybitWsError::ClientError(format!(
725                "WebSocket connection timeout after {timeout_secs} seconds"
726            ))
727        })?;
728
729        Ok(())
730    }
731
732    /// Subscribe to the provided topic strings.
733    pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
734        if topics.is_empty() {
735            return Ok(());
736        }
737
738        log::debug!("Subscribing to topics: {topics:?}");
739
740        // Use reference counting to deduplicate subscriptions
741        let mut topics_to_send = Vec::new();
742
743        for topic in topics {
744            // Returns true if this is the first subscription (ref count 0 -> 1)
745            if self.subscriptions.add_reference(&topic) {
746                self.subscriptions.mark_subscribe(&topic);
747                topics_to_send.push(topic.clone());
748            } else {
749                log::debug!("Already subscribed to {topic}, skipping duplicate subscription");
750            }
751        }
752
753        if topics_to_send.is_empty() {
754            return Ok(());
755        }
756
757        // Serialize subscription messages
758        let mut payloads = Vec::with_capacity(topics_to_send.len());
759        for topic in &topics_to_send {
760            let message = BybitSubscription {
761                op: BybitWsOperation::Subscribe,
762                args: vec![topic.clone()],
763            };
764            let payload = serde_json::to_string(&message).map_err(|e| {
765                BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
766            })?;
767            payloads.push(payload);
768        }
769
770        let cmd = HandlerCommand::Subscribe { topics: payloads };
771        self.cmd_tx
772            .read()
773            .await
774            .send(cmd)
775            .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
776
777        Ok(())
778    }
779
780    /// Unsubscribe from the provided topics.
781    pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
782        if topics.is_empty() {
783            return Ok(());
784        }
785
786        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
787
788        if self.signal.load(Ordering::Relaxed) {
789            log::debug!("Shutdown signal detected, skipping unsubscribe");
790            return Ok(());
791        }
792
793        // Use reference counting to avoid unsubscribing while other consumers still need the topic
794        let mut topics_to_send = Vec::new();
795
796        for topic in topics {
797            // Returns true if this was the last subscription (ref count 1 -> 0)
798            if self.subscriptions.remove_reference(&topic) {
799                self.subscriptions.mark_unsubscribe(&topic);
800                topics_to_send.push(topic.clone());
801            } else {
802                log::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
803            }
804        }
805
806        if topics_to_send.is_empty() {
807            return Ok(());
808        }
809
810        // Serialize unsubscription messages
811        let mut payloads = Vec::with_capacity(topics_to_send.len());
812        for topic in &topics_to_send {
813            let message = BybitSubscription {
814                op: BybitWsOperation::Unsubscribe,
815                args: vec![topic.clone()],
816            };
817            if let Ok(payload) = serde_json::to_string(&message) {
818                payloads.push(payload);
819            }
820        }
821
822        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
823        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
824            log::debug!("Failed to send unsubscribe command: error={e}");
825        }
826
827        Ok(())
828    }
829
830    /// Returns a stream of parsed [`NautilusWsMessage`] items.
831    ///
832    /// # Panics
833    ///
834    /// Panics if called before [`Self::connect`] or if the stream has already been taken.
835    pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
836        let rx = self
837            .out_rx
838            .take()
839            .expect("Stream receiver already taken or client not connected");
840        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
841        async_stream::stream! {
842            while let Some(msg) = rx.recv().await {
843                yield msg;
844            }
845        }
846    }
847
848    /// Returns the number of currently registered subscriptions.
849    #[must_use]
850    pub fn subscription_count(&self) -> usize {
851        self.subscriptions.len()
852    }
853
854    /// Returns the credential associated with this client, if any.
855    #[must_use]
856    pub fn credential(&self) -> Option<&Credential> {
857        self.credential.as_ref()
858    }
859
860    /// Caches a single instrument.
861    ///
862    /// Any existing instrument with the same ID will be replaced.
863    pub fn cache_instrument(&self, instrument: InstrumentAny) {
864        self.instruments_cache
865            .insert(instrument.symbol().inner(), instrument.clone());
866
867        // Before connect() the handler isn't running; this send will fail and that's expected
868        // because connect() replays the instruments via InitializeInstruments
869        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
870            let cmd = HandlerCommand::UpdateInstrument(instrument);
871            if let Err(e) = cmd_tx.send(cmd) {
872                log::debug!("Failed to send instrument update to handler: {e}");
873            }
874        }
875    }
876
877    /// Caches multiple instruments.
878    ///
879    /// Clears the existing cache first, then adds all provided instruments.
880    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
881        self.instruments_cache.clear();
882        let mut count = 0;
883
884        log::debug!("Initializing Bybit instrument cache");
885
886        for inst in instruments {
887            let symbol = inst.symbol().inner();
888            self.instruments_cache.insert(symbol, inst.clone());
889            log::debug!("Cached instrument: {symbol}");
890            count += 1;
891        }
892
893        log::info!("Bybit instrument cache initialized with {count} instruments");
894    }
895
896    /// Sets the account ID for account message parsing.
897    pub fn set_account_id(&mut self, account_id: AccountId) {
898        self.account_id = Some(account_id);
899    }
900
901    /// Sets the account market maker level.
902    pub fn set_mm_level(&self, mm_level: u8) {
903        self.mm_level.store(mm_level, Ordering::Relaxed);
904    }
905
906    /// Sets whether bar timestamps should use the close time.
907    ///
908    /// When `true` (default), bar `ts_event` is set to the bar's close time.
909    /// When `false`, bar `ts_event` is set to the bar's open time.
910    pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
911        self.bars_timestamp_on_close = value;
912    }
913
914    /// Returns a reference to the instruments cache.
915    #[must_use]
916    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
917        &self.instruments_cache
918    }
919
920    /// Returns the account ID if set.
921    #[must_use]
922    pub fn account_id(&self) -> Option<AccountId> {
923        self.account_id
924    }
925
926    /// Returns the product type for public connections.
927    #[must_use]
928    pub fn product_type(&self) -> Option<BybitProductType> {
929        self.product_type
930    }
931
932    /// Subscribes to orderbook updates for a specific instrument.
933    ///
934    /// # Errors
935    ///
936    /// Returns an error if the subscription request fails.
937    ///
938    /// # References
939    ///
940    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook>
941    pub async fn subscribe_orderbook(
942        &self,
943        instrument_id: InstrumentId,
944        depth: u32,
945    ) -> BybitWsResult<()> {
946        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
947        let topic = format!(
948            "{}.{depth}.{raw_symbol}",
949            BybitWsPublicChannel::OrderBook.as_ref()
950        );
951        self.subscribe(vec![topic]).await
952    }
953
954    /// Unsubscribes from orderbook updates for a specific instrument.
955    pub async fn unsubscribe_orderbook(
956        &self,
957        instrument_id: InstrumentId,
958        depth: u32,
959    ) -> BybitWsResult<()> {
960        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
961        let topic = format!(
962            "{}.{depth}.{raw_symbol}",
963            BybitWsPublicChannel::OrderBook.as_ref()
964        );
965        self.unsubscribe(vec![topic]).await
966    }
967
968    /// Subscribes to public trade updates for a specific instrument.
969    ///
970    /// # Errors
971    ///
972    /// Returns an error if the subscription request fails.
973    ///
974    /// # References
975    ///
976    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/trade>
977    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
978        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
979        let topic = format!(
980            "{}.{raw_symbol}",
981            BybitWsPublicChannel::PublicTrade.as_ref()
982        );
983        self.subscribe(vec![topic]).await
984    }
985
986    /// Unsubscribes from public trade updates for a specific instrument.
987    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
988        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
989        let topic = format!(
990            "{}.{raw_symbol}",
991            BybitWsPublicChannel::PublicTrade.as_ref()
992        );
993        self.unsubscribe(vec![topic]).await
994    }
995
996    /// Subscribes to ticker updates for a specific instrument.
997    ///
998    /// # Errors
999    ///
1000    /// Returns an error if the subscription request fails.
1001    ///
1002    /// # References
1003    ///
1004    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/ticker>
1005    pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1006        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1007        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1008        self.subscribe(vec![topic]).await
1009    }
1010
1011    /// Unsubscribes from ticker updates for a specific instrument.
1012    pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1013        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1014        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1015
1016        // Clear funding rate cache to ensure fresh data on resubscribe
1017        let symbol = self.product_type.map_or_else(
1018            || instrument_id.symbol.inner(),
1019            |pt| make_bybit_symbol(raw_symbol, pt),
1020        );
1021        self.funding_cache.write().await.remove(&symbol);
1022
1023        self.unsubscribe(vec![topic]).await
1024    }
1025
1026    /// Subscribes to kline/candlestick updates for a specific instrument.
1027    ///
1028    /// # Errors
1029    ///
1030    /// Returns an error if the subscription request fails.
1031    ///
1032    /// # References
1033    ///
1034    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/kline>
1035    pub async fn subscribe_klines(
1036        &self,
1037        instrument_id: InstrumentId,
1038        interval: impl Into<String>,
1039    ) -> BybitWsResult<()> {
1040        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1041        let topic = format!(
1042            "{}.{}.{raw_symbol}",
1043            BybitWsPublicChannel::Kline.as_ref(),
1044            interval.into()
1045        );
1046        self.subscribe(vec![topic]).await
1047    }
1048
1049    /// Unsubscribes from kline/candlestick updates for a specific instrument.
1050    pub async fn unsubscribe_klines(
1051        &self,
1052        instrument_id: InstrumentId,
1053        interval: impl Into<String>,
1054    ) -> BybitWsResult<()> {
1055        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1056        let topic = format!(
1057            "{}.{}.{raw_symbol}",
1058            BybitWsPublicChannel::Kline.as_ref(),
1059            interval.into()
1060        );
1061        self.unsubscribe(vec![topic]).await
1062    }
1063
1064    /// Subscribes to order updates.
1065    ///
1066    /// # Errors
1067    ///
1068    /// Returns an error if the subscription request fails or if not authenticated.
1069    ///
1070    /// # References
1071    ///
1072    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/order>
1073    pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1074        if !self.requires_auth {
1075            return Err(BybitWsError::Authentication(
1076                "Order subscription requires authentication".to_string(),
1077            ));
1078        }
1079        self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1080            .await
1081    }
1082
1083    /// Unsubscribes from order updates.
1084    pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1085        self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1086            .await
1087    }
1088
1089    /// Subscribes to execution/fill updates.
1090    ///
1091    /// # Errors
1092    ///
1093    /// Returns an error if the subscription request fails or if not authenticated.
1094    ///
1095    /// # References
1096    ///
1097    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/execution>
1098    pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1099        if !self.requires_auth {
1100            return Err(BybitWsError::Authentication(
1101                "Execution subscription requires authentication".to_string(),
1102            ));
1103        }
1104        self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1105            .await
1106    }
1107
1108    /// Unsubscribes from execution/fill updates.
1109    pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1110        self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1111            .await
1112    }
1113
1114    /// Subscribes to position updates.
1115    ///
1116    /// # Errors
1117    ///
1118    /// Returns an error if the subscription request fails or if not authenticated.
1119    ///
1120    /// # References
1121    ///
1122    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/position>
1123    pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1124        if !self.requires_auth {
1125            return Err(BybitWsError::Authentication(
1126                "Position subscription requires authentication".to_string(),
1127            ));
1128        }
1129        self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1130            .await
1131    }
1132
1133    /// Unsubscribes from position updates.
1134    pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1135        self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1136            .await
1137    }
1138
1139    /// Subscribes to wallet/balance updates.
1140    ///
1141    /// # Errors
1142    ///
1143    /// Returns an error if the subscription request fails or if not authenticated.
1144    ///
1145    /// # References
1146    ///
1147    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/wallet>
1148    pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1149        if !self.requires_auth {
1150            return Err(BybitWsError::Authentication(
1151                "Wallet subscription requires authentication".to_string(),
1152            ));
1153        }
1154        self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1155            .await
1156    }
1157
1158    /// Unsubscribes from wallet/balance updates.
1159    pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1160        self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1161            .await
1162    }
1163
1164    /// Places an order via WebSocket.
1165    ///
1166    /// # Errors
1167    ///
1168    /// Returns an error if the order request fails or if not authenticated.
1169    ///
1170    /// # References
1171    ///
1172    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1173    pub async fn place_order(
1174        &self,
1175        params: BybitWsPlaceOrderParams,
1176        client_order_id: ClientOrderId,
1177        trader_id: TraderId,
1178        strategy_id: StrategyId,
1179        instrument_id: InstrumentId,
1180    ) -> BybitWsResult<()> {
1181        if !self.is_authenticated.load(Ordering::Relaxed) {
1182            return Err(BybitWsError::Authentication(
1183                "Must be authenticated to place orders".to_string(),
1184            ));
1185        }
1186
1187        let cmd = HandlerCommand::PlaceOrder {
1188            params,
1189            client_order_id,
1190            trader_id,
1191            strategy_id,
1192            instrument_id,
1193        };
1194
1195        self.send_cmd(cmd).await
1196    }
1197
1198    /// Amends an existing order via WebSocket.
1199    ///
1200    /// # Errors
1201    ///
1202    /// Returns an error if the amend request fails or if not authenticated.
1203    ///
1204    /// # References
1205    ///
1206    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1207    pub async fn amend_order(
1208        &self,
1209        params: BybitWsAmendOrderParams,
1210        client_order_id: ClientOrderId,
1211        trader_id: TraderId,
1212        strategy_id: StrategyId,
1213        instrument_id: InstrumentId,
1214        venue_order_id: Option<VenueOrderId>,
1215    ) -> BybitWsResult<()> {
1216        if !self.is_authenticated.load(Ordering::Relaxed) {
1217            return Err(BybitWsError::Authentication(
1218                "Must be authenticated to amend orders".to_string(),
1219            ));
1220        }
1221
1222        let cmd = HandlerCommand::AmendOrder {
1223            params,
1224            client_order_id,
1225            trader_id,
1226            strategy_id,
1227            instrument_id,
1228            venue_order_id,
1229        };
1230
1231        self.send_cmd(cmd).await
1232    }
1233
1234    /// Cancels an order via WebSocket.
1235    ///
1236    /// # Errors
1237    ///
1238    /// Returns an error if the cancel request fails or if not authenticated.
1239    ///
1240    /// # References
1241    ///
1242    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1243    pub async fn cancel_order(
1244        &self,
1245        params: BybitWsCancelOrderParams,
1246        client_order_id: ClientOrderId,
1247        trader_id: TraderId,
1248        strategy_id: StrategyId,
1249        instrument_id: InstrumentId,
1250        venue_order_id: Option<VenueOrderId>,
1251    ) -> BybitWsResult<()> {
1252        if !self.is_authenticated.load(Ordering::Relaxed) {
1253            return Err(BybitWsError::Authentication(
1254                "Must be authenticated to cancel orders".to_string(),
1255            ));
1256        }
1257
1258        let cmd = HandlerCommand::CancelOrder {
1259            params,
1260            client_order_id,
1261            trader_id,
1262            strategy_id,
1263            instrument_id,
1264            venue_order_id,
1265        };
1266
1267        self.send_cmd(cmd).await
1268    }
1269
1270    /// Batch creates multiple orders via WebSocket.
1271    ///
1272    /// # Errors
1273    ///
1274    /// Returns an error if the batch request fails or if not authenticated.
1275    ///
1276    /// # References
1277    ///
1278    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1279    pub async fn batch_place_orders(
1280        &self,
1281        trader_id: TraderId,
1282        strategy_id: StrategyId,
1283        orders: Vec<BybitWsPlaceOrderParams>,
1284    ) -> BybitWsResult<()> {
1285        if !self.is_authenticated.load(Ordering::Relaxed) {
1286            return Err(BybitWsError::Authentication(
1287                "Must be authenticated to place orders".to_string(),
1288            ));
1289        }
1290
1291        if orders.is_empty() {
1292            log::warn!("Batch place orders called with empty orders list");
1293            return Ok(());
1294        }
1295
1296        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1297            self.batch_place_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1298                .await?;
1299        }
1300
1301        Ok(())
1302    }
1303
1304    async fn batch_place_orders_chunk(
1305        &self,
1306        trader_id: TraderId,
1307        strategy_id: StrategyId,
1308        orders: Vec<BybitWsPlaceOrderParams>,
1309    ) -> BybitWsResult<()> {
1310        let category = orders[0].category;
1311        let batch_req_id = UUID4::new().to_string();
1312
1313        // Extract order tracking data before consuming orders to register with handler
1314        let mut batch_order_data = Vec::new();
1315        for order in &orders {
1316            if let Some(order_link_id_str) = &order.order_link_id {
1317                let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1318                let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1319                let instrument_id = self
1320                    .instruments_cache
1321                    .get(&cache_key)
1322                    .map(|inst| inst.id())
1323                    .ok_or_else(|| {
1324                        BybitWsError::ClientError(format!(
1325                            "Instrument {cache_key} not found in cache"
1326                        ))
1327                    })?;
1328                batch_order_data.push((
1329                    client_order_id,
1330                    (client_order_id, trader_id, strategy_id, instrument_id),
1331                ));
1332            }
1333        }
1334
1335        if !batch_order_data.is_empty() {
1336            let cmd = HandlerCommand::RegisterBatchPlace {
1337                req_id: batch_req_id.clone(),
1338                orders: batch_order_data,
1339            };
1340            let cmd_tx = self.cmd_tx.read().await;
1341            if let Err(e) = cmd_tx.send(cmd) {
1342                log::error!("Failed to send RegisterBatchPlace command: {e}");
1343            }
1344        }
1345
1346        let mm_level = self.mm_level.load(Ordering::Relaxed);
1347        let has_non_post_only = orders
1348            .iter()
1349            .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1350        let referer = if has_non_post_only || mm_level == 0 {
1351            Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1352        } else {
1353            None
1354        };
1355
1356        let request_items: Vec<BybitWsBatchPlaceItem> = orders
1357            .into_iter()
1358            .map(|order| BybitWsBatchPlaceItem {
1359                symbol: order.symbol,
1360                side: order.side,
1361                order_type: order.order_type,
1362                qty: order.qty,
1363                is_leverage: order.is_leverage,
1364                market_unit: order.market_unit,
1365                price: order.price,
1366                time_in_force: order.time_in_force,
1367                order_link_id: order.order_link_id,
1368                reduce_only: order.reduce_only,
1369                close_on_trigger: order.close_on_trigger,
1370                trigger_price: order.trigger_price,
1371                trigger_by: order.trigger_by,
1372                trigger_direction: order.trigger_direction,
1373                tpsl_mode: order.tpsl_mode,
1374                take_profit: order.take_profit,
1375                stop_loss: order.stop_loss,
1376                tp_trigger_by: order.tp_trigger_by,
1377                sl_trigger_by: order.sl_trigger_by,
1378                sl_trigger_price: order.sl_trigger_price,
1379                tp_trigger_price: order.tp_trigger_price,
1380                sl_order_type: order.sl_order_type,
1381                tp_order_type: order.tp_order_type,
1382                sl_limit_price: order.sl_limit_price,
1383                tp_limit_price: order.tp_limit_price,
1384            })
1385            .collect();
1386
1387        let args = BybitWsBatchPlaceOrderArgs {
1388            category,
1389            request: request_items,
1390        };
1391
1392        let request = BybitWsRequest {
1393            req_id: Some(batch_req_id),
1394            op: BybitWsOrderRequestOp::CreateBatch,
1395            header: BybitWsHeader::with_referer(referer),
1396            args: vec![args],
1397        };
1398
1399        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1400
1401        self.send_text(&payload).await
1402    }
1403
1404    /// Batch amends multiple orders via WebSocket.
1405    ///
1406    /// # Errors
1407    ///
1408    /// Returns an error if the batch request fails or if not authenticated.
1409    pub async fn batch_amend_orders(
1410        &self,
1411        #[allow(unused_variables)] trader_id: TraderId,
1412        #[allow(unused_variables)] strategy_id: StrategyId,
1413        orders: Vec<BybitWsAmendOrderParams>,
1414    ) -> BybitWsResult<()> {
1415        if !self.is_authenticated.load(Ordering::Relaxed) {
1416            return Err(BybitWsError::Authentication(
1417                "Must be authenticated to amend orders".to_string(),
1418            ));
1419        }
1420
1421        if orders.is_empty() {
1422            log::warn!("Batch amend orders called with empty orders list");
1423            return Ok(());
1424        }
1425
1426        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1427            self.batch_amend_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1428                .await?;
1429        }
1430
1431        Ok(())
1432    }
1433
1434    async fn batch_amend_orders_chunk(
1435        &self,
1436        #[allow(unused_variables)] trader_id: TraderId,
1437        #[allow(unused_variables)] strategy_id: StrategyId,
1438        orders: Vec<BybitWsAmendOrderParams>,
1439    ) -> BybitWsResult<()> {
1440        let request = BybitWsRequest {
1441            req_id: None,
1442            op: BybitWsOrderRequestOp::AmendBatch,
1443            header: BybitWsHeader::now(),
1444            args: orders,
1445        };
1446
1447        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1448
1449        self.send_text(&payload).await
1450    }
1451
1452    /// Batch cancels multiple orders via WebSocket.
1453    ///
1454    /// # Errors
1455    ///
1456    /// Returns an error if the batch request fails or if not authenticated.
1457    pub async fn batch_cancel_orders(
1458        &self,
1459        trader_id: TraderId,
1460        strategy_id: StrategyId,
1461        orders: Vec<BybitWsCancelOrderParams>,
1462    ) -> BybitWsResult<()> {
1463        if !self.is_authenticated.load(Ordering::Relaxed) {
1464            return Err(BybitWsError::Authentication(
1465                "Must be authenticated to cancel orders".to_string(),
1466            ));
1467        }
1468
1469        if orders.is_empty() {
1470            log::warn!("Batch cancel orders called with empty orders list");
1471            return Ok(());
1472        }
1473
1474        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1475            self.batch_cancel_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1476                .await?;
1477        }
1478
1479        Ok(())
1480    }
1481
1482    async fn batch_cancel_orders_chunk(
1483        &self,
1484        trader_id: TraderId,
1485        strategy_id: StrategyId,
1486        orders: Vec<BybitWsCancelOrderParams>,
1487    ) -> BybitWsResult<()> {
1488        if orders.is_empty() {
1489            return Ok(());
1490        }
1491
1492        let category = orders[0].category;
1493        let batch_req_id = UUID4::new().to_string();
1494
1495        let mut validated_data = Vec::new();
1496
1497        for order in &orders {
1498            if let Some(order_link_id_str) = &order.order_link_id {
1499                let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1500                let instrument_id = self
1501                    .instruments_cache
1502                    .get(&cache_key)
1503                    .map(|inst| inst.id())
1504                    .ok_or_else(|| {
1505                        BybitWsError::ClientError(format!(
1506                            "Instrument {cache_key} not found in cache"
1507                        ))
1508                    })?;
1509
1510                let venue_order_id = order
1511                    .order_id
1512                    .as_ref()
1513                    .map(|id| VenueOrderId::from(id.as_str()));
1514
1515                validated_data.push((order_link_id_str.clone(), instrument_id, venue_order_id));
1516            }
1517        }
1518
1519        let batch_cancel_data: Vec<_> = validated_data
1520            .iter()
1521            .map(|(order_link_id_str, instrument_id, venue_order_id)| {
1522                let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1523                (
1524                    client_order_id,
1525                    (
1526                        client_order_id,
1527                        trader_id,
1528                        strategy_id,
1529                        *instrument_id,
1530                        *venue_order_id,
1531                    ),
1532                )
1533            })
1534            .collect();
1535
1536        if !batch_cancel_data.is_empty() {
1537            let cmd = HandlerCommand::RegisterBatchCancel {
1538                req_id: batch_req_id.clone(),
1539                cancels: batch_cancel_data,
1540            };
1541            let cmd_tx = self.cmd_tx.read().await;
1542            if let Err(e) = cmd_tx.send(cmd) {
1543                log::error!("Failed to send RegisterBatchCancel command: {e}");
1544            }
1545        }
1546
1547        let request_items: Vec<BybitWsBatchCancelItem> = orders
1548            .into_iter()
1549            .map(|order| BybitWsBatchCancelItem {
1550                symbol: order.symbol,
1551                order_id: order.order_id,
1552                order_link_id: order.order_link_id,
1553            })
1554            .collect();
1555
1556        let args = BybitWsBatchCancelOrderArgs {
1557            category,
1558            request: request_items,
1559        };
1560
1561        let request = BybitWsRequest {
1562            req_id: Some(batch_req_id),
1563            op: BybitWsOrderRequestOp::CancelBatch,
1564            header: BybitWsHeader::now(),
1565            args: vec![args],
1566        };
1567
1568        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1569
1570        self.send_text(&payload).await
1571    }
1572
1573    /// Submits an order using Nautilus domain objects.
1574    ///
1575    /// # Errors
1576    ///
1577    /// Returns an error if order submission fails or if not authenticated.
1578    #[allow(clippy::too_many_arguments)]
1579    pub async fn submit_order(
1580        &self,
1581        product_type: BybitProductType,
1582        trader_id: TraderId,
1583        strategy_id: StrategyId,
1584        instrument_id: InstrumentId,
1585        client_order_id: ClientOrderId,
1586        order_side: OrderSide,
1587        order_type: OrderType,
1588        quantity: Quantity,
1589        is_quote_quantity: bool,
1590        time_in_force: Option<TimeInForce>,
1591        price: Option<Price>,
1592        trigger_price: Option<Price>,
1593        post_only: Option<bool>,
1594        reduce_only: Option<bool>,
1595        is_leverage: bool,
1596    ) -> BybitWsResult<()> {
1597        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1598            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1599        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1600
1601        let bybit_side = match order_side {
1602            OrderSide::Buy => BybitOrderSide::Buy,
1603            OrderSide::Sell => BybitOrderSide::Sell,
1604            _ => {
1605                return Err(BybitWsError::ClientError(format!(
1606                    "Invalid order side: {order_side:?}"
1607                )));
1608            }
1609        };
1610
1611        // For stop/conditional orders, Bybit uses Market/Limit with trigger parameters
1612        let (bybit_order_type, is_stop_order) = match order_type {
1613            OrderType::Market => (BybitOrderType::Market, false),
1614            OrderType::Limit => (BybitOrderType::Limit, false),
1615            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1616            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1617            _ => {
1618                return Err(BybitWsError::ClientError(format!(
1619                    "Unsupported order type: {order_type:?}"
1620                )));
1621            }
1622        };
1623
1624        let bybit_tif = if bybit_order_type == BybitOrderType::Market {
1625            None
1626        } else if post_only == Some(true) {
1627            Some(BybitTimeInForce::PostOnly)
1628        } else if let Some(tif) = time_in_force {
1629            Some(match tif {
1630                TimeInForce::Gtc => BybitTimeInForce::Gtc,
1631                TimeInForce::Ioc => BybitTimeInForce::Ioc,
1632                TimeInForce::Fok => BybitTimeInForce::Fok,
1633                _ => {
1634                    return Err(BybitWsError::ClientError(format!(
1635                        "Unsupported time in force: {tif:?}"
1636                    )));
1637                }
1638            })
1639        } else {
1640            None
1641        };
1642
1643        // For SPOT market orders, specify baseCoin to interpret qty as base currency.
1644        // This ensures Nautilus quantities (always in base currency) are interpreted correctly.
1645        let market_unit = if product_type == BybitProductType::Spot
1646            && bybit_order_type == BybitOrderType::Market
1647        {
1648            if is_quote_quantity {
1649                Some(BYBIT_QUOTE_COIN.to_string())
1650            } else {
1651                Some(BYBIT_BASE_COIN.to_string())
1652            }
1653        } else {
1654            None
1655        };
1656
1657        // Only SPOT products support is_leverage parameter
1658        let is_leverage_value = if product_type == BybitProductType::Spot {
1659            Some(i32::from(is_leverage))
1660        } else {
1661            None
1662        };
1663
1664        // Stop semantics: Buy stops trigger on rise (breakout), sell stops trigger on fall (breakdown)
1665        // MIT semantics: Buy MIT triggers on fall (pullback entry), sell MIT triggers on rise (rally entry)
1666        let trigger_direction = if is_stop_order {
1667            match (order_type, order_side) {
1668                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1669                    Some(BybitTriggerDirection::RisesTo as i32)
1670                }
1671                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1672                    Some(BybitTriggerDirection::FallsTo as i32)
1673                }
1674                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1675                    Some(BybitTriggerDirection::FallsTo as i32)
1676                }
1677                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1678                    Some(BybitTriggerDirection::RisesTo as i32)
1679                }
1680                _ => None,
1681            }
1682        } else {
1683            None
1684        };
1685
1686        let params = if is_stop_order {
1687            // For conditional orders, ALL types use triggerPrice field
1688            // sl_trigger_price/tp_trigger_price are only for TP/SL attached to regular orders
1689            BybitWsPlaceOrderParams {
1690                category: product_type,
1691                symbol: raw_symbol,
1692                side: bybit_side,
1693                order_type: bybit_order_type,
1694                qty: quantity.to_string(),
1695                is_leverage: is_leverage_value,
1696                market_unit: market_unit.clone(),
1697                price: price.map(|p| p.to_string()),
1698                time_in_force: bybit_tif,
1699                order_link_id: Some(client_order_id.to_string()),
1700                reduce_only: reduce_only.filter(|&r| r),
1701                close_on_trigger: None,
1702                trigger_price: trigger_price.map(|p| p.to_string()),
1703                trigger_by: Some(BybitTriggerType::LastPrice),
1704                trigger_direction,
1705                tpsl_mode: None, // Not needed for standalone conditional orders
1706                take_profit: None,
1707                stop_loss: None,
1708                tp_trigger_by: None,
1709                sl_trigger_by: None,
1710                sl_trigger_price: None, // Not used for standalone stop orders
1711                tp_trigger_price: None, // Not used for standalone stop orders
1712                sl_order_type: None,
1713                tp_order_type: None,
1714                sl_limit_price: None,
1715                tp_limit_price: None,
1716            }
1717        } else {
1718            // Regular market/limit orders
1719            BybitWsPlaceOrderParams {
1720                category: product_type,
1721                symbol: raw_symbol,
1722                side: bybit_side,
1723                order_type: bybit_order_type,
1724                qty: quantity.to_string(),
1725                is_leverage: is_leverage_value,
1726                market_unit,
1727                price: price.map(|p| p.to_string()),
1728                time_in_force: if bybit_order_type == BybitOrderType::Market {
1729                    None
1730                } else {
1731                    bybit_tif
1732                },
1733                order_link_id: Some(client_order_id.to_string()),
1734                reduce_only: reduce_only.filter(|&r| r),
1735                close_on_trigger: None,
1736                trigger_price: None,
1737                trigger_by: None,
1738                trigger_direction: None,
1739                tpsl_mode: None,
1740                take_profit: None,
1741                stop_loss: None,
1742                tp_trigger_by: None,
1743                sl_trigger_by: None,
1744                sl_trigger_price: None,
1745                tp_trigger_price: None,
1746                sl_order_type: None,
1747                tp_order_type: None,
1748                sl_limit_price: None,
1749                tp_limit_price: None,
1750            }
1751        };
1752
1753        self.place_order(
1754            params,
1755            client_order_id,
1756            trader_id,
1757            strategy_id,
1758            instrument_id,
1759        )
1760        .await
1761    }
1762
1763    /// Modifies an existing order using Nautilus domain objects.
1764    ///
1765    /// # Errors
1766    ///
1767    /// Returns an error if modification fails or if not authenticated.
1768    #[allow(clippy::too_many_arguments)]
1769    pub async fn modify_order(
1770        &self,
1771        product_type: BybitProductType,
1772        trader_id: TraderId,
1773        strategy_id: StrategyId,
1774        instrument_id: InstrumentId,
1775        client_order_id: ClientOrderId,
1776        venue_order_id: Option<VenueOrderId>,
1777        quantity: Option<Quantity>,
1778        price: Option<Price>,
1779    ) -> BybitWsResult<()> {
1780        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1781            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1782        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1783
1784        let params = BybitWsAmendOrderParams {
1785            category: product_type,
1786            symbol: raw_symbol,
1787            order_id: venue_order_id.map(|id| id.to_string()),
1788            order_link_id: Some(client_order_id.to_string()),
1789            qty: quantity.map(|q| q.to_string()),
1790            price: price.map(|p| p.to_string()),
1791            trigger_price: None,
1792            take_profit: None,
1793            stop_loss: None,
1794            tp_trigger_by: None,
1795            sl_trigger_by: None,
1796        };
1797
1798        self.amend_order(
1799            params,
1800            client_order_id,
1801            trader_id,
1802            strategy_id,
1803            instrument_id,
1804            venue_order_id,
1805        )
1806        .await
1807    }
1808
1809    /// Cancels an order using Nautilus domain objects.
1810    ///
1811    /// # Errors
1812    ///
1813    /// Returns an error if cancellation fails or if not authenticated.
1814    pub async fn cancel_order_by_id(
1815        &self,
1816        product_type: BybitProductType,
1817        trader_id: TraderId,
1818        strategy_id: StrategyId,
1819        instrument_id: InstrumentId,
1820        client_order_id: ClientOrderId,
1821        venue_order_id: Option<VenueOrderId>,
1822    ) -> BybitWsResult<()> {
1823        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1824            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1825        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1826
1827        let params = BybitWsCancelOrderParams {
1828            category: product_type,
1829            symbol: raw_symbol,
1830            order_id: venue_order_id.map(|id| id.to_string()),
1831            order_link_id: Some(client_order_id.to_string()),
1832        };
1833
1834        self.cancel_order(
1835            params,
1836            client_order_id,
1837            trader_id,
1838            strategy_id,
1839            instrument_id,
1840            venue_order_id,
1841        )
1842        .await
1843    }
1844
1845    /// Builds order params for placing an order.
1846    #[allow(clippy::too_many_arguments)]
1847    pub fn build_place_order_params(
1848        &self,
1849        product_type: BybitProductType,
1850        instrument_id: InstrumentId,
1851        client_order_id: ClientOrderId,
1852        order_side: OrderSide,
1853        order_type: OrderType,
1854        quantity: Quantity,
1855        is_quote_quantity: bool,
1856        time_in_force: Option<TimeInForce>,
1857        price: Option<Price>,
1858        trigger_price: Option<Price>,
1859        post_only: Option<bool>,
1860        reduce_only: Option<bool>,
1861        is_leverage: bool,
1862    ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1863        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1864            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1865        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1866
1867        let bybit_side = match order_side {
1868            OrderSide::Buy => BybitOrderSide::Buy,
1869            OrderSide::Sell => BybitOrderSide::Sell,
1870            _ => {
1871                return Err(BybitWsError::ClientError(format!(
1872                    "Invalid order side: {order_side:?}"
1873                )));
1874            }
1875        };
1876
1877        let (bybit_order_type, is_stop_order) = match order_type {
1878            OrderType::Market => (BybitOrderType::Market, false),
1879            OrderType::Limit => (BybitOrderType::Limit, false),
1880            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1881            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1882            _ => {
1883                return Err(BybitWsError::ClientError(format!(
1884                    "Unsupported order type: {order_type:?}"
1885                )));
1886            }
1887        };
1888
1889        let bybit_tif = if post_only == Some(true) {
1890            Some(BybitTimeInForce::PostOnly)
1891        } else if let Some(tif) = time_in_force {
1892            Some(match tif {
1893                TimeInForce::Gtc => BybitTimeInForce::Gtc,
1894                TimeInForce::Ioc => BybitTimeInForce::Ioc,
1895                TimeInForce::Fok => BybitTimeInForce::Fok,
1896                _ => {
1897                    return Err(BybitWsError::ClientError(format!(
1898                        "Unsupported time in force: {tif:?}"
1899                    )));
1900                }
1901            })
1902        } else {
1903            None
1904        };
1905
1906        let market_unit = if product_type == BybitProductType::Spot
1907            && bybit_order_type == BybitOrderType::Market
1908        {
1909            if is_quote_quantity {
1910                Some(BYBIT_QUOTE_COIN.to_string())
1911            } else {
1912                Some(BYBIT_BASE_COIN.to_string())
1913            }
1914        } else {
1915            None
1916        };
1917
1918        // Only SPOT products support is_leverage parameter
1919        let is_leverage_value = if product_type == BybitProductType::Spot {
1920            Some(i32::from(is_leverage))
1921        } else {
1922            None
1923        };
1924
1925        // Stop semantics: Buy stops trigger on rise (breakout), sell stops trigger on fall (breakdown)
1926        // MIT semantics: Buy MIT triggers on fall (pullback entry), sell MIT triggers on rise (rally entry)
1927        let trigger_direction = if is_stop_order {
1928            match (order_type, order_side) {
1929                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1930                    Some(BybitTriggerDirection::RisesTo as i32)
1931                }
1932                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1933                    Some(BybitTriggerDirection::FallsTo as i32)
1934                }
1935                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1936                    Some(BybitTriggerDirection::FallsTo as i32)
1937                }
1938                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1939                    Some(BybitTriggerDirection::RisesTo as i32)
1940                }
1941                _ => None,
1942            }
1943        } else {
1944            None
1945        };
1946
1947        let params = if is_stop_order {
1948            BybitWsPlaceOrderParams {
1949                category: product_type,
1950                symbol: raw_symbol,
1951                side: bybit_side,
1952                order_type: bybit_order_type,
1953                qty: quantity.to_string(),
1954                is_leverage: is_leverage_value,
1955                market_unit,
1956                price: price.map(|p| p.to_string()),
1957                time_in_force: if bybit_order_type == BybitOrderType::Market {
1958                    None
1959                } else {
1960                    bybit_tif
1961                },
1962                order_link_id: Some(client_order_id.to_string()),
1963                reduce_only: reduce_only.filter(|&r| r),
1964                close_on_trigger: None,
1965                trigger_price: trigger_price.map(|p| p.to_string()),
1966                trigger_by: Some(BybitTriggerType::LastPrice),
1967                trigger_direction,
1968                tpsl_mode: None,
1969                take_profit: None,
1970                stop_loss: None,
1971                tp_trigger_by: None,
1972                sl_trigger_by: None,
1973                sl_trigger_price: None,
1974                tp_trigger_price: None,
1975                sl_order_type: None,
1976                tp_order_type: None,
1977                sl_limit_price: None,
1978                tp_limit_price: None,
1979            }
1980        } else {
1981            BybitWsPlaceOrderParams {
1982                category: product_type,
1983                symbol: raw_symbol,
1984                side: bybit_side,
1985                order_type: bybit_order_type,
1986                qty: quantity.to_string(),
1987                is_leverage: is_leverage_value,
1988                market_unit,
1989                price: price.map(|p| p.to_string()),
1990                time_in_force: if bybit_order_type == BybitOrderType::Market {
1991                    None
1992                } else {
1993                    bybit_tif
1994                },
1995                order_link_id: Some(client_order_id.to_string()),
1996                reduce_only: reduce_only.filter(|&r| r),
1997                close_on_trigger: None,
1998                trigger_price: None,
1999                trigger_by: None,
2000                trigger_direction: None,
2001                tpsl_mode: None,
2002                take_profit: None,
2003                stop_loss: None,
2004                tp_trigger_by: None,
2005                sl_trigger_by: None,
2006                sl_trigger_price: None,
2007                tp_trigger_price: None,
2008                sl_order_type: None,
2009                tp_order_type: None,
2010                sl_limit_price: None,
2011                tp_limit_price: None,
2012            }
2013        };
2014
2015        Ok(params)
2016    }
2017
2018    /// Builds order params for amending an order.
2019    #[allow(clippy::too_many_arguments)]
2020    pub fn build_amend_order_params(
2021        &self,
2022        product_type: BybitProductType,
2023        instrument_id: InstrumentId,
2024        venue_order_id: Option<VenueOrderId>,
2025        client_order_id: Option<ClientOrderId>,
2026        quantity: Option<Quantity>,
2027        price: Option<Price>,
2028    ) -> BybitWsResult<BybitWsAmendOrderParams> {
2029        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2030            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2031        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2032
2033        Ok(BybitWsAmendOrderParams {
2034            category: product_type,
2035            symbol: raw_symbol,
2036            order_id: venue_order_id.map(|v| v.to_string()),
2037            order_link_id: client_order_id.map(|c| c.to_string()),
2038            qty: quantity.map(|q| q.to_string()),
2039            price: price.map(|p| p.to_string()),
2040            trigger_price: None,
2041            take_profit: None,
2042            stop_loss: None,
2043            tp_trigger_by: None,
2044            sl_trigger_by: None,
2045        })
2046    }
2047
2048    /// Builds order params for canceling an order via WebSocket.
2049    ///
2050    /// # Errors
2051    ///
2052    /// Returns an error if symbol parsing fails or if neither venue_order_id
2053    /// nor client_order_id is provided.
2054    pub fn build_cancel_order_params(
2055        &self,
2056        product_type: BybitProductType,
2057        instrument_id: InstrumentId,
2058        venue_order_id: Option<VenueOrderId>,
2059        client_order_id: Option<ClientOrderId>,
2060    ) -> BybitWsResult<BybitWsCancelOrderParams> {
2061        if venue_order_id.is_none() && client_order_id.is_none() {
2062            return Err(BybitWsError::ClientError(
2063                "Either venue_order_id or client_order_id must be provided".to_string(),
2064            ));
2065        }
2066
2067        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2068            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2069        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2070
2071        Ok(BybitWsCancelOrderParams {
2072            category: product_type,
2073            symbol: raw_symbol,
2074            order_id: venue_order_id.map(|v| v.to_string()),
2075            order_link_id: client_order_id.map(|c| c.to_string()),
2076        })
2077    }
2078
2079    fn default_headers() -> Vec<(String, String)> {
2080        vec![
2081            ("Content-Type".to_string(), "application/json".to_string()),
2082            ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
2083        ]
2084    }
2085
2086    async fn authenticate_if_required(&self) -> BybitWsResult<()> {
2087        if !self.requires_auth {
2088            return Ok(());
2089        }
2090
2091        let credential = self.credential.as_ref().ok_or_else(|| {
2092            BybitWsError::Authentication("Credentials required for authentication".to_string())
2093        })?;
2094
2095        let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
2096        let signature = credential.sign_websocket_auth(expires);
2097
2098        let auth_message = BybitAuthRequest {
2099            op: BybitWsOperation::Auth,
2100            args: vec![
2101                Value::String(credential.api_key().to_string()),
2102                Value::Number(expires.into()),
2103                Value::String(signature),
2104            ],
2105        };
2106
2107        let payload = serde_json::to_string(&auth_message)?;
2108
2109        self.cmd_tx
2110            .read()
2111            .await
2112            .send(HandlerCommand::Authenticate { payload })
2113            .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
2114
2115        // Authentication will be processed asynchronously by the handler
2116        // The handler will emit NautilusWsMessage::Authenticated when successful
2117        Ok(())
2118    }
2119
2120    async fn send_text(&self, text: &str) -> BybitWsResult<()> {
2121        let cmd = HandlerCommand::SendText {
2122            payload: text.to_string(),
2123        };
2124
2125        self.send_cmd(cmd).await
2126    }
2127
2128    async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
2129        self.cmd_tx
2130            .read()
2131            .await
2132            .send(cmd)
2133            .map_err(|e| BybitWsError::Send(e.to_string()))
2134    }
2135}
2136
2137#[cfg(test)]
2138mod tests {
2139    use rstest::rstest;
2140
2141    use super::*;
2142    use crate::{
2143        common::testing::load_test_json,
2144        websocket::{classify_bybit_message, messages::BybitWsMessage},
2145    };
2146
2147    #[rstest]
2148    fn classify_orderbook_snapshot() {
2149        let json: Value = serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json"))
2150            .expect("invalid fixture");
2151        let message = classify_bybit_message(json);
2152        assert!(matches!(message, BybitWsMessage::Orderbook(_)));
2153    }
2154
2155    #[rstest]
2156    fn classify_trade_snapshot() {
2157        let json: Value =
2158            serde_json::from_str(&load_test_json("ws_public_trade.json")).expect("invalid fixture");
2159        let message = classify_bybit_message(json);
2160        assert!(matches!(message, BybitWsMessage::Trade(_)));
2161    }
2162
2163    #[rstest]
2164    fn classify_ticker_linear_snapshot() {
2165        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_linear.json"))
2166            .expect("invalid fixture");
2167        let message = classify_bybit_message(json);
2168        assert!(matches!(message, BybitWsMessage::TickerLinear(_)));
2169    }
2170
2171    #[rstest]
2172    fn classify_ticker_option_snapshot() {
2173        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_option.json"))
2174            .expect("invalid fixture");
2175        let message = classify_bybit_message(json);
2176        assert!(matches!(message, BybitWsMessage::TickerOption(_)));
2177    }
2178
2179    #[rstest]
2180    fn test_race_unsubscribe_failure_recovery() {
2181        // Simulates the race condition where venue rejects an unsubscribe request.
2182        // The adapter must perform the 3-step recovery:
2183        // 1. confirm_unsubscribe() - clear pending_unsubscribe
2184        // 2. mark_subscribe() - mark as subscribing again
2185        // 3. confirm_subscribe() - restore to confirmed state
2186        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2187
2188        let topic = "publicTrade.BTCUSDT";
2189
2190        // Initial subscribe flow
2191        subscriptions.mark_subscribe(topic);
2192        subscriptions.confirm_subscribe(topic);
2193        assert_eq!(subscriptions.len(), 1);
2194
2195        // User unsubscribes
2196        subscriptions.mark_unsubscribe(topic);
2197        assert_eq!(subscriptions.len(), 0);
2198        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2199
2200        // Venue REJECTS the unsubscribe (error message)
2201        // Adapter must perform 3-step recovery (from lines 2181-2183)
2202        subscriptions.confirm_unsubscribe(topic); // Step 1: clear pending_unsubscribe
2203        subscriptions.mark_subscribe(topic); // Step 2: mark as subscribing
2204        subscriptions.confirm_subscribe(topic); // Step 3: confirm subscription
2205
2206        // Verify recovery: topic should be back in confirmed state
2207        assert_eq!(subscriptions.len(), 1);
2208        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2209        assert!(subscriptions.pending_subscribe_topics().is_empty());
2210
2211        // Verify topic is in all_topics() for reconnect
2212        let all = subscriptions.all_topics();
2213        assert_eq!(all.len(), 1);
2214        assert!(all.contains(&topic.to_string()));
2215    }
2216
2217    #[rstest]
2218    fn test_race_resubscribe_before_unsubscribe_ack() {
2219        // Simulates: User unsubscribes, then immediately resubscribes before
2220        // the unsubscribe ACK arrives from the venue.
2221        // This is the race condition fixed in the subscription tracker.
2222        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2223
2224        let topic = "orderbook.50.BTCUSDT";
2225
2226        // Initial subscribe
2227        subscriptions.mark_subscribe(topic);
2228        subscriptions.confirm_subscribe(topic);
2229        assert_eq!(subscriptions.len(), 1);
2230
2231        // User unsubscribes
2232        subscriptions.mark_unsubscribe(topic);
2233        assert_eq!(subscriptions.len(), 0);
2234        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2235
2236        // User immediately changes mind and resubscribes (before unsubscribe ACK)
2237        subscriptions.mark_subscribe(topic);
2238        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2239
2240        // NOW the unsubscribe ACK arrives - should NOT clear pending_subscribe
2241        subscriptions.confirm_unsubscribe(topic);
2242        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2243        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2244
2245        // Subscribe ACK arrives
2246        subscriptions.confirm_subscribe(topic);
2247        assert_eq!(subscriptions.len(), 1);
2248        assert!(subscriptions.pending_subscribe_topics().is_empty());
2249
2250        // Verify final state is correct
2251        let all = subscriptions.all_topics();
2252        assert_eq!(all.len(), 1);
2253        assert!(all.contains(&topic.to_string()));
2254    }
2255
2256    #[rstest]
2257    fn test_race_late_subscribe_confirmation_after_unsubscribe() {
2258        // Simulates: User subscribes, then unsubscribes before subscribe ACK arrives.
2259        // The late subscribe ACK should be ignored.
2260        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2261
2262        let topic = "tickers.ETHUSDT";
2263
2264        // User subscribes
2265        subscriptions.mark_subscribe(topic);
2266        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2267
2268        // User immediately unsubscribes (before subscribe ACK)
2269        subscriptions.mark_unsubscribe(topic);
2270        assert!(subscriptions.pending_subscribe_topics().is_empty()); // Cleared
2271        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2272
2273        // Late subscribe confirmation arrives - should be IGNORED
2274        subscriptions.confirm_subscribe(topic);
2275        assert_eq!(subscriptions.len(), 0); // Not added to confirmed
2276        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2277
2278        // Unsubscribe ACK arrives
2279        subscriptions.confirm_unsubscribe(topic);
2280
2281        // Final state: completely empty
2282        assert!(subscriptions.is_empty());
2283        assert!(subscriptions.all_topics().is_empty());
2284    }
2285
2286    #[rstest]
2287    fn test_race_reconnection_with_pending_states() {
2288        // Simulates reconnection with mixed pending states.
2289        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2290
2291        // Set up mixed state before reconnection
2292        // Confirmed: publicTrade.BTCUSDT
2293        let trade_btc = "publicTrade.BTCUSDT";
2294        subscriptions.mark_subscribe(trade_btc);
2295        subscriptions.confirm_subscribe(trade_btc);
2296
2297        // Pending subscribe: publicTrade.ETHUSDT
2298        let trade_eth = "publicTrade.ETHUSDT";
2299        subscriptions.mark_subscribe(trade_eth);
2300
2301        // Pending unsubscribe: orderbook.50.BTCUSDT (user cancelled)
2302        let book_btc = "orderbook.50.BTCUSDT";
2303        subscriptions.mark_subscribe(book_btc);
2304        subscriptions.confirm_subscribe(book_btc);
2305        subscriptions.mark_unsubscribe(book_btc);
2306
2307        // Get topics for reconnection
2308        let topics_to_restore = subscriptions.all_topics();
2309
2310        // Should include: confirmed + pending_subscribe (NOT pending_unsubscribe)
2311        assert_eq!(topics_to_restore.len(), 2);
2312        assert!(topics_to_restore.contains(&trade_btc.to_string()));
2313        assert!(topics_to_restore.contains(&trade_eth.to_string()));
2314        assert!(!topics_to_restore.contains(&book_btc.to_string())); // Excluded
2315    }
2316
2317    #[rstest]
2318    fn test_race_duplicate_subscribe_messages_idempotent() {
2319        // Simulates duplicate subscribe requests (e.g., from reconnection logic).
2320        // The subscription tracker should be idempotent and not create duplicate state.
2321        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2322
2323        let topic = "publicTrade.BTCUSDT";
2324
2325        // Subscribe and confirm
2326        subscriptions.mark_subscribe(topic);
2327        subscriptions.confirm_subscribe(topic);
2328        assert_eq!(subscriptions.len(), 1);
2329
2330        // Duplicate mark_subscribe on already-confirmed topic (should be no-op)
2331        subscriptions.mark_subscribe(topic);
2332        assert!(subscriptions.pending_subscribe_topics().is_empty()); // Not re-added
2333        assert_eq!(subscriptions.len(), 1); // Still just 1
2334
2335        // Duplicate confirm_subscribe (should be idempotent)
2336        subscriptions.confirm_subscribe(topic);
2337        assert_eq!(subscriptions.len(), 1);
2338
2339        // Verify final state
2340        let all = subscriptions.all_topics();
2341        assert_eq!(all.len(), 1);
2342        assert_eq!(all[0], topic);
2343    }
2344
2345    #[rstest]
2346    #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
2347    #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
2348    #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
2349    #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
2350    #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
2351    #[case::option_with_leverage(BybitProductType::Option, true, None)]
2352    fn test_is_leverage_parameter(
2353        #[case] product_type: BybitProductType,
2354        #[case] is_leverage: bool,
2355        #[case] expected: Option<i32>,
2356    ) {
2357        let symbol = match product_type {
2358            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2359            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2360            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2361            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2362        };
2363
2364        let instrument_id = InstrumentId::from(symbol);
2365        let client_order_id = ClientOrderId::from("test-order-1");
2366        let quantity = Quantity::from("1.0");
2367
2368        let client = BybitWebSocketClient::new_trade(
2369            BybitEnvironment::Testnet,
2370            Some("test-key".to_string()),
2371            Some("test-secret".to_string()),
2372            None,
2373            Some(20),
2374        );
2375
2376        let params = client
2377            .build_place_order_params(
2378                product_type,
2379                instrument_id,
2380                client_order_id,
2381                OrderSide::Buy,
2382                OrderType::Limit,
2383                quantity,
2384                false, // is_quote_quantity
2385                Some(TimeInForce::Gtc),
2386                Some(Price::from("50000.0")),
2387                None,
2388                None,
2389                None,
2390                is_leverage,
2391            )
2392            .expect("Failed to build params");
2393
2394        assert_eq!(params.is_leverage, expected);
2395    }
2396
2397    #[rstest]
2398    #[case::spot_market_quote_quantity(BybitProductType::Spot, OrderType::Market, true, Some(BYBIT_QUOTE_COIN.to_string()))]
2399    #[case::spot_market_base_quantity(BybitProductType::Spot, OrderType::Market, false, Some(BYBIT_BASE_COIN.to_string()))]
2400    #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
2401    #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
2402    #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
2403    #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
2404    fn test_is_quote_quantity_parameter(
2405        #[case] product_type: BybitProductType,
2406        #[case] order_type: OrderType,
2407        #[case] is_quote_quantity: bool,
2408        #[case] expected: Option<String>,
2409    ) {
2410        let symbol = match product_type {
2411            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2412            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2413            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2414            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2415        };
2416
2417        let instrument_id = InstrumentId::from(symbol);
2418        let client_order_id = ClientOrderId::from("test-order-1");
2419        let quantity = Quantity::from("1.0");
2420
2421        let client = BybitWebSocketClient::new_trade(
2422            BybitEnvironment::Testnet,
2423            Some("test-key".to_string()),
2424            Some("test-secret".to_string()),
2425            None,
2426            Some(20),
2427        );
2428
2429        let params = client
2430            .build_place_order_params(
2431                product_type,
2432                instrument_id,
2433                client_order_id,
2434                OrderSide::Buy,
2435                order_type,
2436                quantity,
2437                is_quote_quantity,
2438                Some(TimeInForce::Gtc),
2439                if order_type == OrderType::Market {
2440                    None
2441                } else {
2442                    Some(Price::from("50000.0"))
2443                },
2444                None,
2445                None,
2446                None,
2447                false,
2448            )
2449            .expect("Failed to build params");
2450
2451        assert_eq!(params.market_unit, expected);
2452    }
2453}