nautilus_bybit/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Bybit WebSocket client for public market data streams, private (authenticated) streams,
//! and trade order operations.
17//!
18//! Bybit API reference <https://bybit-exchange.github.io/docs/>.
19
20use std::{
21    fmt::Debug,
22    sync::{
23        Arc,
24        atomic::{AtomicBool, AtomicU8, Ordering},
25    },
26    time::Duration,
27};
28
29use ahash::AHashMap;
30use arc_swap::ArcSwap;
31use dashmap::DashMap;
32use nautilus_common::live::get_runtime;
33use nautilus_core::{UUID4, consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt};
34use nautilus_model::{
35    enums::{OrderSide, OrderType, TimeInForce},
36    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
37    instruments::{Instrument, InstrumentAny},
38    types::{Price, Quantity},
39};
40use nautilus_network::{
41    backoff::ExponentialBackoff,
42    mode::ConnectionMode,
43    websocket::{
44        AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
45        channel_message_handler,
46    },
47};
48use serde_json::Value;
49use tokio_util::sync::CancellationToken;
50use ustr::Ustr;
51
52use crate::{
53    common::{
54        consts::{
55            BYBIT_BASE_COIN, BYBIT_NAUTILUS_BROKER_ID, BYBIT_QUOTE_COIN, BYBIT_WS_TOPIC_DELIMITER,
56        },
57        credential::Credential,
58        enums::{
59            BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
60            BybitTriggerDirection, BybitTriggerType, BybitWsOrderRequestOp,
61        },
62        parse::{extract_raw_symbol, make_bybit_symbol},
63        symbol::BybitSymbol,
64        urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
65    },
66    websocket::{
67        enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
68        error::{BybitWsError, BybitWsResult},
69        handler::{FeedHandler, HandlerCommand},
70        messages::{
71            BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
72            BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
73            BybitWsCancelOrderParams, BybitWsHeader, BybitWsPlaceOrderParams, BybitWsRequest,
74            NautilusWsMessage,
75        },
76    },
77};
78
/// Heartbeat interval, in seconds, used when a constructor is called without a `heartbeat`.
const DEFAULT_HEARTBEAT_SECS: u64 = 20;
/// Offset, in milliseconds, added to the current UTC timestamp to produce the `expires` value
/// that is signed and sent when re-authenticating after a reconnect.
const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
/// NOTE(review): not referenced in the visible part of this file; presumably the maximum number
/// of items per batch order request — confirm against its use sites.
const BATCH_PROCESSING_LIMIT: usize = 20;

/// Type alias for the funding rate cache.
///
/// A shared, async-locked map keyed by `Ustr` that stores a pair of optional strings per key.
/// NOTE(review): the meaning of the key and of each tuple element is not visible here —
/// presumably symbol -> (funding rate, next funding time); confirm in the handler.
type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
85
86/// Resolves credentials from provided values or environment variables.
87///
88/// Priority for environment variables based on environment:
89/// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
90/// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
91/// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
92fn resolve_credential(
93    environment: BybitEnvironment,
94    api_key: Option<String>,
95    api_secret: Option<String>,
96) -> Option<Credential> {
97    let (api_key_env, api_secret_env) = match environment {
98        BybitEnvironment::Demo => ("BYBIT_DEMO_API_KEY", "BYBIT_DEMO_API_SECRET"),
99        BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
100        BybitEnvironment::Mainnet => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
101    };
102
103    let key = get_or_env_var_opt(api_key, api_key_env);
104    let secret = get_or_env_var_opt(api_secret, api_secret_env);
105
106    match (key, secret) {
107        (Some(k), Some(s)) => Some(Credential::new(k, s)),
108        _ => None,
109    }
110}
111
/// WebSocket client for Bybit.
///
/// Depending on the constructor used, an instance targets the public market data endpoint
/// (`new_public`, `new_public_with`), the private endpoint (`new_private`) or the trade
/// endpoint for order operations (`new_trade`).
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct BybitWebSocketClient {
    /// WebSocket endpoint URL used by `connect`.
    url: String,
    /// Bybit environment the client was created for.
    environment: BybitEnvironment,
    /// Product type; set by the public constructors, `None` for private and trade clients.
    product_type: Option<BybitProductType>,
    /// API credential, when one could be resolved by the private/trade constructors.
    credential: Option<Credential>,
    /// Whether the session must authenticate (`true` for private and trade clients).
    requires_auth: bool,
    /// Authentication tracker; a clone is handed to the handler in `connect`.
    auth_tracker: AuthTracker,
    /// Heartbeat interval forwarded to the WebSocket config; constructors default it to
    /// `DEFAULT_HEARTBEAT_SECS`.
    heartbeat: Option<u64>,
    /// Connection mode atomic; starts as `Closed` and is replaced with the underlying client's
    /// atomic in `connect`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Sender for handler commands; a placeholder until `connect` installs the real channel.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for messages forwarded by the background task; set in `connect`, `None` on clones.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Stop flag: cleared at the start of `connect`, set in `close`.
    signal: Arc<AtomicBool>,
    /// Handle of the background task spawned in `connect`; `None` on clones.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Subscription tracking state used by `subscribe` and the reconnection logic.
    subscriptions: SubscriptionState,
    /// Set when the background task sees an `Authenticated` message; cleared on reconnect and
    /// in `close`.
    is_authenticated: Arc<AtomicBool>,
    /// Account identifier forwarded to the handler; `None` after construction.
    account_id: Option<AccountId>,
    /// NOTE(review): meaning not visible in this part of the file; forwarded to the handler.
    mm_level: Arc<AtomicU8>,
    /// Flag forwarded to the handler; constructors set it to `true`.
    bars_timestamp_on_close: bool,
    /// Cached instruments, replayed to the handler in `connect`.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Funding rate cache shared with the handler; cleared on reconnect.
    funding_cache: FundingCache,
    /// NOTE(review): not used in the visible part of this file.
    cancellation_token: CancellationToken,
}
136
137impl Debug for BybitWebSocketClient {
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        f.debug_struct("BybitWebSocketClient")
140            .field("url", &self.url)
141            .field("environment", &self.environment)
142            .field("product_type", &self.product_type)
143            .field("requires_auth", &self.requires_auth)
144            .field("heartbeat", &self.heartbeat)
145            .field("confirmed_subscriptions", &self.subscriptions.len())
146            .finish()
147    }
148}
149
150impl Clone for BybitWebSocketClient {
151    fn clone(&self) -> Self {
152        Self {
153            url: self.url.clone(),
154            environment: self.environment,
155            product_type: self.product_type,
156            credential: self.credential.clone(),
157            requires_auth: self.requires_auth,
158            auth_tracker: self.auth_tracker.clone(),
159            heartbeat: self.heartbeat,
160            connection_mode: Arc::clone(&self.connection_mode),
161            cmd_tx: Arc::clone(&self.cmd_tx),
162            out_rx: None, // Each clone gets its own receiver
163            signal: Arc::clone(&self.signal),
164            task_handle: None, // Each clone gets its own task handle
165            subscriptions: self.subscriptions.clone(),
166            is_authenticated: Arc::clone(&self.is_authenticated),
167            account_id: self.account_id,
168            mm_level: Arc::clone(&self.mm_level),
169            bars_timestamp_on_close: self.bars_timestamp_on_close,
170            instruments_cache: Arc::clone(&self.instruments_cache),
171            funding_cache: Arc::clone(&self.funding_cache),
172            cancellation_token: self.cancellation_token.clone(),
173        }
174    }
175}
176
177impl BybitWebSocketClient {
178    /// Creates a new Bybit public WebSocket client.
179    #[must_use]
180    pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
181        Self::new_public_with(
182            BybitProductType::Linear,
183            BybitEnvironment::Mainnet,
184            url,
185            heartbeat,
186        )
187    }
188
189    /// Creates a new Bybit public WebSocket client targeting the specified product/environment.
190    #[must_use]
191    pub fn new_public_with(
192        product_type: BybitProductType,
193        environment: BybitEnvironment,
194        url: Option<String>,
195        heartbeat: Option<u64>,
196    ) -> Self {
197        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
198        // connect() swaps in the real channel and replays any queued instruments so the
199        // handler sees them once it starts.
200        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
201
202        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
203        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
204
205        Self {
206            url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
207            environment,
208            product_type: Some(product_type),
209            credential: None,
210            requires_auth: false,
211            auth_tracker: AuthTracker::new(),
212            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
213            connection_mode,
214            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
215            out_rx: None,
216            signal: Arc::new(AtomicBool::new(false)),
217            task_handle: None,
218            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
219            is_authenticated: Arc::new(AtomicBool::new(false)),
220            instruments_cache: Arc::new(DashMap::new()),
221            account_id: None,
222            mm_level: Arc::new(AtomicU8::new(0)),
223            bars_timestamp_on_close: true,
224            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
225            cancellation_token: CancellationToken::new(),
226        }
227    }
228
229    /// Creates a new Bybit private WebSocket client.
230    ///
231    /// If `api_key` or `api_secret` are not provided, they will be loaded from
232    /// environment variables based on the environment:
233    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
234    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
235    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
236    #[must_use]
237    pub fn new_private(
238        environment: BybitEnvironment,
239        api_key: Option<String>,
240        api_secret: Option<String>,
241        url: Option<String>,
242        heartbeat: Option<u64>,
243    ) -> Self {
244        let credential = resolve_credential(environment, api_key, api_secret);
245
246        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
247        // connect() swaps in the real channel and replays any queued instruments so the
248        // handler sees them once it starts.
249        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
250
251        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
252        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
253
254        Self {
255            url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
256            environment,
257            product_type: None,
258            credential,
259            requires_auth: true,
260            auth_tracker: AuthTracker::new(),
261            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
262            connection_mode,
263            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
264            out_rx: None,
265            signal: Arc::new(AtomicBool::new(false)),
266            task_handle: None,
267            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
268            is_authenticated: Arc::new(AtomicBool::new(false)),
269            instruments_cache: Arc::new(DashMap::new()),
270            account_id: None,
271            mm_level: Arc::new(AtomicU8::new(0)),
272            bars_timestamp_on_close: true,
273            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
274            cancellation_token: CancellationToken::new(),
275        }
276    }
277
278    /// Creates a new Bybit trade WebSocket client for order operations.
279    ///
280    /// If `api_key` or `api_secret` are not provided, they will be loaded from
281    /// environment variables based on the environment:
282    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
283    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
284    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
285    #[must_use]
286    pub fn new_trade(
287        environment: BybitEnvironment,
288        api_key: Option<String>,
289        api_secret: Option<String>,
290        url: Option<String>,
291        heartbeat: Option<u64>,
292    ) -> Self {
293        let credential = resolve_credential(environment, api_key, api_secret);
294
295        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
296        // connect() swaps in the real channel and replays any queued instruments so the
297        // handler sees them once it starts.
298        let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
299
300        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
301        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
302
303        Self {
304            url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
305            environment,
306            product_type: None,
307            credential,
308            requires_auth: true,
309            auth_tracker: AuthTracker::new(),
310            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
311            connection_mode,
312            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
313            out_rx: None,
314            signal: Arc::new(AtomicBool::new(false)),
315            task_handle: None,
316            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
317            is_authenticated: Arc::new(AtomicBool::new(false)),
318            instruments_cache: Arc::new(DashMap::new()),
319            account_id: None,
320            mm_level: Arc::new(AtomicU8::new(0)),
321            bars_timestamp_on_close: true,
322            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
323            cancellation_token: CancellationToken::new(),
324        }
325    }
326
327    /// Establishes the WebSocket connection.
328    ///
329    /// # Errors
330    ///
331    /// Returns an error if the underlying WebSocket connection cannot be established,
332    /// after retrying multiple times with exponential backoff.
333    pub async fn connect(&mut self) -> BybitWsResult<()> {
334        self.signal.store(false, Ordering::Relaxed);
335
336        let (raw_handler, raw_rx) = channel_message_handler();
337
338        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
339        // in the message loop for minimal latency (see handler.rs pong response)
340        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
341            // Handler responds to pings internally via select! loop
342        });
343
344        let ping_msg = serde_json::to_string(&BybitSubscription {
345            op: BybitWsOperation::Ping,
346            args: vec![],
347        })?;
348
349        let config = WebSocketConfig {
350            url: self.url.clone(),
351            headers: Self::default_headers(),
352            heartbeat: self.heartbeat,
353            heartbeat_msg: Some(ping_msg),
354            reconnect_timeout_ms: Some(5_000),
355            reconnect_delay_initial_ms: Some(500),
356            reconnect_delay_max_ms: Some(5_000),
357            reconnect_backoff_factor: Some(1.5),
358            reconnect_jitter_ms: Some(250),
359            reconnect_max_attempts: None,
360        };
361
362        // Retry initial connection with exponential backoff to handle transient DNS/network issues
363        // TODO: Eventually expose client config options for this
364        const MAX_RETRIES: u32 = 5;
365        const CONNECTION_TIMEOUT_SECS: u64 = 10;
366
367        let mut backoff = ExponentialBackoff::new(
368            Duration::from_millis(500),
369            Duration::from_millis(5000),
370            2.0,
371            250,
372            false,
373        )
374        .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
375
376        #[allow(unused_assignments)]
377        let mut last_error = String::new();
378        let mut attempt = 0;
379        let client = loop {
380            attempt += 1;
381
382            match tokio::time::timeout(
383                Duration::from_secs(CONNECTION_TIMEOUT_SECS),
384                WebSocketClient::connect(
385                    config.clone(),
386                    Some(raw_handler.clone()),
387                    Some(ping_handler.clone()),
388                    None,
389                    vec![],
390                    None,
391                ),
392            )
393            .await
394            {
395                Ok(Ok(client)) => {
396                    if attempt > 1 {
397                        tracing::info!("WebSocket connection established after {attempt} attempts");
398                    }
399                    break client;
400                }
401                Ok(Err(e)) => {
402                    last_error = e.to_string();
403                    tracing::warn!(
404                        attempt,
405                        max_retries = MAX_RETRIES,
406                        url = %self.url,
407                        error = %last_error,
408                        "WebSocket connection attempt failed"
409                    );
410                }
411                Err(_) => {
412                    last_error = format!(
413                        "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
414                    );
415                    tracing::warn!(
416                        attempt,
417                        max_retries = MAX_RETRIES,
418                        url = %self.url,
419                        "WebSocket connection attempt timed out"
420                    );
421                }
422            }
423
424            if attempt >= MAX_RETRIES {
425                return Err(BybitWsError::Transport(format!(
426                    "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
427                    If this is a DNS error, check your network configuration and DNS settings.",
428                    self.url,
429                    if last_error.is_empty() {
430                        "unknown error"
431                    } else {
432                        &last_error
433                    }
434                )));
435            }
436
437            let delay = backoff.next_duration();
438            tracing::debug!(
439                "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
440                attempt + 1
441            );
442            tokio::time::sleep(delay).await;
443        };
444
445        self.connection_mode.store(client.connection_mode_atomic());
446
447        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
448        self.out_rx = Some(Arc::new(out_rx));
449
450        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
451        *self.cmd_tx.write().await = cmd_tx.clone();
452
453        let cmd = HandlerCommand::SetClient(client);
454
455        self.send_cmd(cmd).await?;
456
457        // Replay cached instruments to the new handler via the new channel
458        if !self.instruments_cache.is_empty() {
459            let cached_instruments: Vec<InstrumentAny> = self
460                .instruments_cache
461                .iter()
462                .map(|entry| entry.value().clone())
463                .collect();
464            let cmd = HandlerCommand::InitializeInstruments(cached_instruments);
465            self.send_cmd(cmd).await?;
466        }
467
468        let signal = Arc::clone(&self.signal);
469        let subscriptions = self.subscriptions.clone();
470        let credential = self.credential.clone();
471        let requires_auth = self.requires_auth;
472        let funding_cache = Arc::clone(&self.funding_cache);
473        let account_id = self.account_id;
474        let product_type = self.product_type;
475        let bars_timestamp_on_close = self.bars_timestamp_on_close;
476        let mm_level = Arc::clone(&self.mm_level);
477        let cmd_tx_for_reconnect = cmd_tx.clone();
478        let auth_tracker = self.auth_tracker.clone();
479        let is_authenticated = Arc::clone(&self.is_authenticated);
480
481        let stream_handle = get_runtime().spawn(async move {
482            let mut handler = FeedHandler::new(
483                signal.clone(),
484                cmd_rx,
485                raw_rx,
486                out_tx.clone(),
487                account_id,
488                product_type,
489                bars_timestamp_on_close,
490                mm_level.clone(),
491                auth_tracker,
492                subscriptions.clone(),
493                funding_cache.clone(),
494            );
495
496            // Helper closure to resubscribe all tracked subscriptions after reconnection
497            let resubscribe_all = || async {
498                let topics = subscriptions.all_topics();
499
500                if topics.is_empty() {
501                    return;
502                }
503
504                tracing::debug!(count = topics.len(), "Resubscribing to confirmed subscriptions");
505
506                for topic in &topics {
507                    subscriptions.mark_subscribe(topic.as_str());
508                }
509
510                let mut payloads = Vec::with_capacity(topics.len());
511                for topic in &topics {
512                    let message = BybitSubscription {
513                        op: BybitWsOperation::Subscribe,
514                        args: vec![topic.clone()],
515                    };
516                    if let Ok(payload) = serde_json::to_string(&message) {
517                        payloads.push(payload);
518                    }
519                }
520
521                let cmd = HandlerCommand::Subscribe { topics: payloads };
522
523                if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
524                    tracing::error!("Failed to send resubscribe command: {e}");
525                }
526            };
527
528            // Run message processing with reconnection handling
529            loop {
530                match handler.next().await {
531                    Some(NautilusWsMessage::Reconnected) => {
532                        if signal.load(Ordering::Relaxed) {
533                            continue;
534                        }
535
536                        tracing::info!("WebSocket reconnected");
537
538                        // Mark all confirmed subscriptions as failed so they transition to pending state
539                        let confirmed_topics: Vec<String> = {
540                            let confirmed = subscriptions.confirmed();
541                            let mut topics = Vec::new();
542                            for entry in confirmed.iter() {
543                                let (channel, symbols) = entry.pair();
544                                for symbol in symbols {
545                                    if symbol.is_empty() {
546                                        topics.push(channel.to_string());
547                                    } else {
548                                        topics.push(format!("{channel}.{symbol}"));
549                                    }
550                                }
551                            }
552                            topics
553                        };
554
555                        if !confirmed_topics.is_empty() {
556                            tracing::debug!(count = confirmed_topics.len(), "Marking confirmed subscriptions as pending for replay");
557                            for topic in confirmed_topics {
558                                subscriptions.mark_failure(&topic);
559                            }
560                        }
561
562                        // Clear caches to prevent stale data after reconnection
563                        funding_cache.write().await.clear();
564
565                        if requires_auth {
566                            is_authenticated.store(false, Ordering::Relaxed);
567                            tracing::debug!("Re-authenticating after reconnection");
568
569                            if let Some(cred) = &credential {
570                                let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
571                                let signature = cred.sign_websocket_auth(expires);
572
573                                let auth_message = BybitAuthRequest {
574                                    op: BybitWsOperation::Auth,
575                                    args: vec![
576                                        Value::String(cred.api_key().to_string()),
577                                        Value::Number(expires.into()),
578                                        Value::String(signature),
579                                    ],
580                                };
581
582                                if let Ok(payload) = serde_json::to_string(&auth_message) {
583                                    let cmd = HandlerCommand::Authenticate { payload };
584                                    if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
585                                        tracing::error!(error = %e, "Failed to send reconnection auth command");
586                                    }
587                                } else {
588                                    tracing::error!("Failed to serialize reconnection auth message");
589                                }
590                            }
591                        }
592
593                        // Unauthenticated sessions resubscribe immediately after reconnection,
594                        // authenticated sessions wait for Authenticated message
595                        if !requires_auth {
596                            tracing::debug!("No authentication required, resubscribing immediately");
597                            resubscribe_all().await;
598                        }
599
600                        // Forward to out_tx so caller sees the Reconnected message
601                        if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
602                            tracing::debug!("Receiver dropped, stopping");
603                            break;
604                        }
605                        continue;
606                    }
607                    Some(NautilusWsMessage::Authenticated) => {
608                        tracing::debug!("Authenticated, resubscribing");
609                        is_authenticated.store(true, Ordering::Relaxed);
610                        resubscribe_all().await;
611                        continue;
612                    }
613                    Some(msg) => {
614                        if out_tx.send(msg).is_err() {
615                            tracing::error!("Failed to send message (receiver dropped)");
616                            break;
617                        }
618                    }
619                    None => {
620                        // Stream ended - check if it's a stop signal
621                        if handler.is_stopped() {
622                            tracing::debug!("Stop signal received, ending message processing");
623                            break;
624                        }
625                        // Otherwise it's an unexpected stream end
626                        tracing::warn!("WebSocket stream ended unexpectedly");
627                        break;
628                    }
629                }
630            }
631
632            tracing::debug!("Handler task exiting");
633        });
634
635        self.task_handle = Some(Arc::new(stream_handle));
636
637        if requires_auth && let Err(e) = self.authenticate_if_required().await {
638            return Err(e);
639        }
640
641        Ok(())
642    }
643
644    /// Disconnects the WebSocket client and stops the background task.
645    pub async fn close(&mut self) -> BybitWsResult<()> {
646        tracing::debug!("Starting close process");
647
648        self.signal.store(true, Ordering::Relaxed);
649
650        let cmd = HandlerCommand::Disconnect;
651        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
652            tracing::debug!(
653                "Failed to send disconnect command (handler may already be shut down): {e}"
654            );
655        }
656
657        if let Some(task_handle) = self.task_handle.take() {
658            match Arc::try_unwrap(task_handle) {
659                Ok(handle) => {
660                    tracing::debug!("Waiting for task handle to complete");
661                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
662                        Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
663                        Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
664                        Err(_) => {
665                            tracing::warn!(
666                                "Timeout waiting for task handle, task may still be running"
667                            );
668                            // The task will be dropped and should clean up automatically
669                        }
670                    }
671                }
672                Err(arc_handle) => {
673                    tracing::debug!(
674                        "Cannot take ownership of task handle - other references exist, aborting task"
675                    );
676                    arc_handle.abort();
677                }
678            }
679        } else {
680            tracing::debug!("No task handle to await");
681        }
682
683        self.is_authenticated.store(false, Ordering::Relaxed);
684
685        tracing::debug!("Closed");
686
687        Ok(())
688    }
689
690    /// Returns a value indicating whether the client is active.
691    #[must_use]
692    pub fn is_active(&self) -> bool {
693        let connection_mode_arc = self.connection_mode.load();
694        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
695            && !self.signal.load(Ordering::Relaxed)
696    }
697
698    /// Returns a value indicating whether the client is closed.
699    pub fn is_closed(&self) -> bool {
700        let connection_mode_arc = self.connection_mode.load();
701        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
702            || self.signal.load(Ordering::Relaxed)
703    }
704
705    /// Waits until the WebSocket client becomes active or times out.
706    ///
707    /// # Errors
708    ///
709    /// Returns an error if the timeout is exceeded before the client becomes active.
710    pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
711        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
712
713        tokio::time::timeout(timeout, async {
714            while !self.is_active() {
715                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
716            }
717        })
718        .await
719        .map_err(|_| {
720            BybitWsError::ClientError(format!(
721                "WebSocket connection timeout after {timeout_secs} seconds"
722            ))
723        })?;
724
725        Ok(())
726    }
727
728    /// Subscribe to the provided topic strings.
729    pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
730        if topics.is_empty() {
731            return Ok(());
732        }
733
734        tracing::debug!("Subscribing to topics: {topics:?}");
735
736        // Use reference counting to deduplicate subscriptions
737        let mut topics_to_send = Vec::new();
738
739        for topic in topics {
740            // Returns true if this is the first subscription (ref count 0 -> 1)
741            if self.subscriptions.add_reference(&topic) {
742                self.subscriptions.mark_subscribe(&topic);
743                topics_to_send.push(topic.clone());
744            } else {
745                tracing::debug!("Already subscribed to {topic}, skipping duplicate subscription");
746            }
747        }
748
749        if topics_to_send.is_empty() {
750            return Ok(());
751        }
752
753        // Serialize subscription messages
754        let mut payloads = Vec::with_capacity(topics_to_send.len());
755        for topic in &topics_to_send {
756            let message = BybitSubscription {
757                op: BybitWsOperation::Subscribe,
758                args: vec![topic.clone()],
759            };
760            let payload = serde_json::to_string(&message).map_err(|e| {
761                BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
762            })?;
763            payloads.push(payload);
764        }
765
766        let cmd = HandlerCommand::Subscribe { topics: payloads };
767        self.cmd_tx
768            .read()
769            .await
770            .send(cmd)
771            .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
772
773        Ok(())
774    }
775
776    /// Unsubscribe from the provided topics.
777    pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
778        if topics.is_empty() {
779            return Ok(());
780        }
781
782        tracing::debug!("Attempting to unsubscribe from topics: {topics:?}");
783
784        if self.signal.load(Ordering::Relaxed) {
785            tracing::debug!("Shutdown signal detected, skipping unsubscribe");
786            return Ok(());
787        }
788
789        // Use reference counting to avoid unsubscribing while other consumers still need the topic
790        let mut topics_to_send = Vec::new();
791
792        for topic in topics {
793            // Returns true if this was the last subscription (ref count 1 -> 0)
794            if self.subscriptions.remove_reference(&topic) {
795                self.subscriptions.mark_unsubscribe(&topic);
796                topics_to_send.push(topic.clone());
797            } else {
798                tracing::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
799            }
800        }
801
802        if topics_to_send.is_empty() {
803            return Ok(());
804        }
805
806        // Serialize unsubscription messages
807        let mut payloads = Vec::with_capacity(topics_to_send.len());
808        for topic in &topics_to_send {
809            let message = BybitSubscription {
810                op: BybitWsOperation::Unsubscribe,
811                args: vec![topic.clone()],
812            };
813            if let Ok(payload) = serde_json::to_string(&message) {
814                payloads.push(payload);
815            }
816        }
817
818        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
819        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
820            tracing::debug!(error = %e, "Failed to send unsubscribe command");
821        }
822
823        Ok(())
824    }
825
826    /// Returns a stream of parsed [`NautilusWsMessage`] items.
827    ///
828    /// # Panics
829    ///
830    /// Panics if called before [`Self::connect`] or if the stream has already been taken.
831    pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
832        let rx = self
833            .out_rx
834            .take()
835            .expect("Stream receiver already taken or client not connected");
836        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
837        async_stream::stream! {
838            while let Some(msg) = rx.recv().await {
839                yield msg;
840            }
841        }
842    }
843
    /// Returns the number of currently registered subscriptions.
    ///
    /// This is the length reported by the client's subscription state.
    #[must_use]
    pub fn subscription_count(&self) -> usize {
        self.subscriptions.len()
    }
849
    /// Returns the credential associated with this client, if any.
    ///
    /// The credential is borrowed from the client; `None` is returned when no credential
    /// is stored.
    #[must_use]
    pub fn credential(&self) -> Option<&Credential> {
        self.credential.as_ref()
    }
855
856    /// Caches a single instrument.
857    ///
858    /// Any existing instrument with the same ID will be replaced.
859    pub fn cache_instrument(&self, instrument: InstrumentAny) {
860        self.instruments_cache
861            .insert(instrument.symbol().inner(), instrument.clone());
862
863        // Before connect() the handler isn't running; this send will fail and that's expected
864        // because connect() replays the instruments via InitializeInstruments
865        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
866            let cmd = HandlerCommand::UpdateInstrument(instrument);
867            if let Err(e) = cmd_tx.send(cmd) {
868                tracing::debug!("Failed to send instrument update to handler: {e}");
869            }
870        }
871    }
872
873    /// Caches multiple instruments.
874    ///
875    /// Clears the existing cache first, then adds all provided instruments.
876    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
877        self.instruments_cache.clear();
878        let mut count = 0;
879
880        tracing::debug!("Initializing Bybit instrument cache");
881
882        for inst in instruments {
883            let symbol = inst.symbol().inner();
884            self.instruments_cache.insert(symbol, inst.clone());
885            tracing::debug!("Cached instrument: {symbol}");
886            count += 1;
887        }
888
889        tracing::info!("Bybit instrument cache initialized with {count} instruments");
890    }
891
    /// Sets the account ID for account message parsing.
    ///
    /// Any previously stored account ID is overwritten.
    pub fn set_account_id(&mut self, account_id: AccountId) {
        self.account_id = Some(account_id);
    }
896
    /// Sets the account market maker level.
    ///
    /// The value is stored atomically with relaxed ordering, which is why this setter
    /// only needs a shared reference.
    pub fn set_mm_level(&self, mm_level: u8) {
        self.mm_level.store(mm_level, Ordering::Relaxed);
    }
901
    /// Sets whether bar timestamps should use the close time.
    ///
    /// When `true` (default), bar `ts_event` is set to the bar's close time.
    /// When `false`, bar `ts_event` is set to the bar's open time.
    ///
    /// This only updates the flag stored on the client.
    // NOTE(review): whether an already running handler observes a later change is not
    // visible from this method - confirm against `connect`.
    pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
        self.bars_timestamp_on_close = value;
    }
909
    /// Returns a reference to the instruments cache.
    ///
    /// The cache is a shared concurrent map from symbol to instrument; callers may clone
    /// the returned `Arc` to keep their own handle to the same map.
    #[must_use]
    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
        &self.instruments_cache
    }
915
    /// Returns the account ID if set.
    ///
    /// The value is returned by copy; `None` means no account ID has been stored.
    #[must_use]
    pub fn account_id(&self) -> Option<AccountId> {
        self.account_id
    }
921
    /// Returns the product type for public connections.
    ///
    /// The value is returned by copy; `None` means no product type is stored on this
    /// client.
    #[must_use]
    pub fn product_type(&self) -> Option<BybitProductType> {
        self.product_type
    }
927
928    /// Subscribes to orderbook updates for a specific instrument.
929    ///
930    /// # Errors
931    ///
932    /// Returns an error if the subscription request fails.
933    ///
934    /// # References
935    ///
936    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook>
937    pub async fn subscribe_orderbook(
938        &self,
939        instrument_id: InstrumentId,
940        depth: u32,
941    ) -> BybitWsResult<()> {
942        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
943        let topic = format!(
944            "{}.{depth}.{raw_symbol}",
945            BybitWsPublicChannel::OrderBook.as_ref()
946        );
947        self.subscribe(vec![topic]).await
948    }
949
950    /// Unsubscribes from orderbook updates for a specific instrument.
951    pub async fn unsubscribe_orderbook(
952        &self,
953        instrument_id: InstrumentId,
954        depth: u32,
955    ) -> BybitWsResult<()> {
956        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
957        let topic = format!(
958            "{}.{depth}.{raw_symbol}",
959            BybitWsPublicChannel::OrderBook.as_ref()
960        );
961        self.unsubscribe(vec![topic]).await
962    }
963
964    /// Subscribes to public trade updates for a specific instrument.
965    ///
966    /// # Errors
967    ///
968    /// Returns an error if the subscription request fails.
969    ///
970    /// # References
971    ///
972    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/trade>
973    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
974        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
975        let topic = format!(
976            "{}.{raw_symbol}",
977            BybitWsPublicChannel::PublicTrade.as_ref()
978        );
979        self.subscribe(vec![topic]).await
980    }
981
982    /// Unsubscribes from public trade updates for a specific instrument.
983    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
984        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
985        let topic = format!(
986            "{}.{raw_symbol}",
987            BybitWsPublicChannel::PublicTrade.as_ref()
988        );
989        self.unsubscribe(vec![topic]).await
990    }
991
992    /// Subscribes to ticker updates for a specific instrument.
993    ///
994    /// # Errors
995    ///
996    /// Returns an error if the subscription request fails.
997    ///
998    /// # References
999    ///
1000    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/ticker>
1001    pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1002        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1003        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1004        self.subscribe(vec![topic]).await
1005    }
1006
1007    /// Unsubscribes from ticker updates for a specific instrument.
1008    pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1009        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1010        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1011
1012        // Clear funding rate cache to ensure fresh data on resubscribe
1013        let symbol = self.product_type.map_or_else(
1014            || instrument_id.symbol.inner(),
1015            |pt| make_bybit_symbol(raw_symbol, pt),
1016        );
1017        self.funding_cache.write().await.remove(&symbol);
1018
1019        self.unsubscribe(vec![topic]).await
1020    }
1021
1022    /// Subscribes to kline/candlestick updates for a specific instrument.
1023    ///
1024    /// # Errors
1025    ///
1026    /// Returns an error if the subscription request fails.
1027    ///
1028    /// # References
1029    ///
1030    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/kline>
1031    pub async fn subscribe_klines(
1032        &self,
1033        instrument_id: InstrumentId,
1034        interval: impl Into<String>,
1035    ) -> BybitWsResult<()> {
1036        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1037        let topic = format!(
1038            "{}.{}.{raw_symbol}",
1039            BybitWsPublicChannel::Kline.as_ref(),
1040            interval.into()
1041        );
1042        self.subscribe(vec![topic]).await
1043    }
1044
1045    /// Unsubscribes from kline/candlestick updates for a specific instrument.
1046    pub async fn unsubscribe_klines(
1047        &self,
1048        instrument_id: InstrumentId,
1049        interval: impl Into<String>,
1050    ) -> BybitWsResult<()> {
1051        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1052        let topic = format!(
1053            "{}.{}.{raw_symbol}",
1054            BybitWsPublicChannel::Kline.as_ref(),
1055            interval.into()
1056        );
1057        self.unsubscribe(vec![topic]).await
1058    }
1059
1060    /// Subscribes to order updates.
1061    ///
1062    /// # Errors
1063    ///
1064    /// Returns an error if the subscription request fails or if not authenticated.
1065    ///
1066    /// # References
1067    ///
1068    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/order>
1069    pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1070        if !self.requires_auth {
1071            return Err(BybitWsError::Authentication(
1072                "Order subscription requires authentication".to_string(),
1073            ));
1074        }
1075        self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1076            .await
1077    }
1078
1079    /// Unsubscribes from order updates.
1080    pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1081        self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1082            .await
1083    }
1084
1085    /// Subscribes to execution/fill updates.
1086    ///
1087    /// # Errors
1088    ///
1089    /// Returns an error if the subscription request fails or if not authenticated.
1090    ///
1091    /// # References
1092    ///
1093    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/execution>
1094    pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1095        if !self.requires_auth {
1096            return Err(BybitWsError::Authentication(
1097                "Execution subscription requires authentication".to_string(),
1098            ));
1099        }
1100        self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1101            .await
1102    }
1103
1104    /// Unsubscribes from execution/fill updates.
1105    pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1106        self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1107            .await
1108    }
1109
1110    /// Subscribes to position updates.
1111    ///
1112    /// # Errors
1113    ///
1114    /// Returns an error if the subscription request fails or if not authenticated.
1115    ///
1116    /// # References
1117    ///
1118    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/position>
1119    pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1120        if !self.requires_auth {
1121            return Err(BybitWsError::Authentication(
1122                "Position subscription requires authentication".to_string(),
1123            ));
1124        }
1125        self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1126            .await
1127    }
1128
1129    /// Unsubscribes from position updates.
1130    pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1131        self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1132            .await
1133    }
1134
1135    /// Subscribes to wallet/balance updates.
1136    ///
1137    /// # Errors
1138    ///
1139    /// Returns an error if the subscription request fails or if not authenticated.
1140    ///
1141    /// # References
1142    ///
1143    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/wallet>
1144    pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1145        if !self.requires_auth {
1146            return Err(BybitWsError::Authentication(
1147                "Wallet subscription requires authentication".to_string(),
1148            ));
1149        }
1150        self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1151            .await
1152    }
1153
1154    /// Unsubscribes from wallet/balance updates.
1155    pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1156        self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1157            .await
1158    }
1159
1160    /// Places an order via WebSocket.
1161    ///
1162    /// # Errors
1163    ///
1164    /// Returns an error if the order request fails or if not authenticated.
1165    ///
1166    /// # References
1167    ///
1168    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1169    pub async fn place_order(
1170        &self,
1171        params: BybitWsPlaceOrderParams,
1172        client_order_id: ClientOrderId,
1173        trader_id: TraderId,
1174        strategy_id: StrategyId,
1175        instrument_id: InstrumentId,
1176    ) -> BybitWsResult<()> {
1177        if !self.is_authenticated.load(Ordering::Relaxed) {
1178            return Err(BybitWsError::Authentication(
1179                "Must be authenticated to place orders".to_string(),
1180            ));
1181        }
1182
1183        let cmd = HandlerCommand::PlaceOrder {
1184            params,
1185            client_order_id,
1186            trader_id,
1187            strategy_id,
1188            instrument_id,
1189        };
1190
1191        self.send_cmd(cmd).await
1192    }
1193
1194    /// Amends an existing order via WebSocket.
1195    ///
1196    /// # Errors
1197    ///
1198    /// Returns an error if the amend request fails or if not authenticated.
1199    ///
1200    /// # References
1201    ///
1202    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1203    pub async fn amend_order(
1204        &self,
1205        params: BybitWsAmendOrderParams,
1206        client_order_id: ClientOrderId,
1207        trader_id: TraderId,
1208        strategy_id: StrategyId,
1209        instrument_id: InstrumentId,
1210        venue_order_id: Option<VenueOrderId>,
1211    ) -> BybitWsResult<()> {
1212        if !self.is_authenticated.load(Ordering::Relaxed) {
1213            return Err(BybitWsError::Authentication(
1214                "Must be authenticated to amend orders".to_string(),
1215            ));
1216        }
1217
1218        let cmd = HandlerCommand::AmendOrder {
1219            params,
1220            client_order_id,
1221            trader_id,
1222            strategy_id,
1223            instrument_id,
1224            venue_order_id,
1225        };
1226
1227        self.send_cmd(cmd).await
1228    }
1229
1230    /// Cancels an order via WebSocket.
1231    ///
1232    /// # Errors
1233    ///
1234    /// Returns an error if the cancel request fails or if not authenticated.
1235    ///
1236    /// # References
1237    ///
1238    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1239    pub async fn cancel_order(
1240        &self,
1241        params: BybitWsCancelOrderParams,
1242        client_order_id: ClientOrderId,
1243        trader_id: TraderId,
1244        strategy_id: StrategyId,
1245        instrument_id: InstrumentId,
1246        venue_order_id: Option<VenueOrderId>,
1247    ) -> BybitWsResult<()> {
1248        if !self.is_authenticated.load(Ordering::Relaxed) {
1249            return Err(BybitWsError::Authentication(
1250                "Must be authenticated to cancel orders".to_string(),
1251            ));
1252        }
1253
1254        let cmd = HandlerCommand::CancelOrder {
1255            params,
1256            client_order_id,
1257            trader_id,
1258            strategy_id,
1259            instrument_id,
1260            venue_order_id,
1261        };
1262
1263        self.send_cmd(cmd).await
1264    }
1265
1266    /// Batch creates multiple orders via WebSocket.
1267    ///
1268    /// # Errors
1269    ///
1270    /// Returns an error if the batch request fails or if not authenticated.
1271    ///
1272    /// # References
1273    ///
1274    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1275    pub async fn batch_place_orders(
1276        &self,
1277        trader_id: TraderId,
1278        strategy_id: StrategyId,
1279        orders: Vec<BybitWsPlaceOrderParams>,
1280    ) -> BybitWsResult<()> {
1281        if !self.is_authenticated.load(Ordering::Relaxed) {
1282            return Err(BybitWsError::Authentication(
1283                "Must be authenticated to place orders".to_string(),
1284            ));
1285        }
1286
1287        if orders.is_empty() {
1288            tracing::warn!("Batch place orders called with empty orders list");
1289            return Ok(());
1290        }
1291
1292        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1293            self.batch_place_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1294                .await?;
1295        }
1296
1297        Ok(())
1298    }
1299
1300    async fn batch_place_orders_chunk(
1301        &self,
1302        trader_id: TraderId,
1303        strategy_id: StrategyId,
1304        orders: Vec<BybitWsPlaceOrderParams>,
1305    ) -> BybitWsResult<()> {
1306        let category = orders[0].category;
1307        let batch_req_id = UUID4::new().to_string();
1308
1309        // Extract order tracking data before consuming orders to register with handler
1310        let mut batch_order_data = Vec::new();
1311        for order in &orders {
1312            if let Some(order_link_id_str) = &order.order_link_id {
1313                let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1314                let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1315                let instrument_id = self
1316                    .instruments_cache
1317                    .get(&cache_key)
1318                    .map(|inst| inst.id())
1319                    .ok_or_else(|| {
1320                        BybitWsError::ClientError(format!(
1321                            "Instrument {cache_key} not found in cache"
1322                        ))
1323                    })?;
1324                batch_order_data.push((
1325                    client_order_id,
1326                    (client_order_id, trader_id, strategy_id, instrument_id),
1327                ));
1328            }
1329        }
1330
1331        if !batch_order_data.is_empty() {
1332            let cmd = HandlerCommand::RegisterBatchPlace {
1333                req_id: batch_req_id.clone(),
1334                orders: batch_order_data,
1335            };
1336            let cmd_tx = self.cmd_tx.read().await;
1337            if let Err(e) = cmd_tx.send(cmd) {
1338                tracing::error!("Failed to send RegisterBatchPlace command: {e}");
1339            }
1340        }
1341
1342        let mm_level = self.mm_level.load(Ordering::Relaxed);
1343        let has_non_post_only = orders
1344            .iter()
1345            .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1346        let referer = if has_non_post_only || mm_level == 0 {
1347            Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1348        } else {
1349            None
1350        };
1351
1352        let request_items: Vec<BybitWsBatchPlaceItem> = orders
1353            .into_iter()
1354            .map(|order| BybitWsBatchPlaceItem {
1355                symbol: order.symbol,
1356                side: order.side,
1357                order_type: order.order_type,
1358                qty: order.qty,
1359                is_leverage: order.is_leverage,
1360                market_unit: order.market_unit,
1361                price: order.price,
1362                time_in_force: order.time_in_force,
1363                order_link_id: order.order_link_id,
1364                reduce_only: order.reduce_only,
1365                close_on_trigger: order.close_on_trigger,
1366                trigger_price: order.trigger_price,
1367                trigger_by: order.trigger_by,
1368                trigger_direction: order.trigger_direction,
1369                tpsl_mode: order.tpsl_mode,
1370                take_profit: order.take_profit,
1371                stop_loss: order.stop_loss,
1372                tp_trigger_by: order.tp_trigger_by,
1373                sl_trigger_by: order.sl_trigger_by,
1374                sl_trigger_price: order.sl_trigger_price,
1375                tp_trigger_price: order.tp_trigger_price,
1376                sl_order_type: order.sl_order_type,
1377                tp_order_type: order.tp_order_type,
1378                sl_limit_price: order.sl_limit_price,
1379                tp_limit_price: order.tp_limit_price,
1380            })
1381            .collect();
1382
1383        let args = BybitWsBatchPlaceOrderArgs {
1384            category,
1385            request: request_items,
1386        };
1387
1388        let request = BybitWsRequest {
1389            req_id: Some(batch_req_id),
1390            op: BybitWsOrderRequestOp::CreateBatch,
1391            header: BybitWsHeader::with_referer(referer),
1392            args: vec![args],
1393        };
1394
1395        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1396
1397        self.send_text(&payload).await
1398    }
1399
1400    /// Batch amends multiple orders via WebSocket.
1401    ///
1402    /// # Errors
1403    ///
1404    /// Returns an error if the batch request fails or if not authenticated.
1405    pub async fn batch_amend_orders(
1406        &self,
1407        #[allow(unused_variables)] trader_id: TraderId,
1408        #[allow(unused_variables)] strategy_id: StrategyId,
1409        orders: Vec<BybitWsAmendOrderParams>,
1410    ) -> BybitWsResult<()> {
1411        if !self.is_authenticated.load(Ordering::Relaxed) {
1412            return Err(BybitWsError::Authentication(
1413                "Must be authenticated to amend orders".to_string(),
1414            ));
1415        }
1416
1417        if orders.is_empty() {
1418            tracing::warn!("Batch amend orders called with empty orders list");
1419            return Ok(());
1420        }
1421
1422        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1423            self.batch_amend_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1424                .await?;
1425        }
1426
1427        Ok(())
1428    }
1429
1430    async fn batch_amend_orders_chunk(
1431        &self,
1432        #[allow(unused_variables)] trader_id: TraderId,
1433        #[allow(unused_variables)] strategy_id: StrategyId,
1434        orders: Vec<BybitWsAmendOrderParams>,
1435    ) -> BybitWsResult<()> {
1436        let request = BybitWsRequest {
1437            req_id: None,
1438            op: BybitWsOrderRequestOp::AmendBatch,
1439            header: BybitWsHeader::now(),
1440            args: orders,
1441        };
1442
1443        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1444
1445        self.send_text(&payload).await
1446    }
1447
1448    /// Batch cancels multiple orders via WebSocket.
1449    ///
1450    /// # Errors
1451    ///
1452    /// Returns an error if the batch request fails or if not authenticated.
1453    pub async fn batch_cancel_orders(
1454        &self,
1455        trader_id: TraderId,
1456        strategy_id: StrategyId,
1457        orders: Vec<BybitWsCancelOrderParams>,
1458    ) -> BybitWsResult<()> {
1459        if !self.is_authenticated.load(Ordering::Relaxed) {
1460            return Err(BybitWsError::Authentication(
1461                "Must be authenticated to cancel orders".to_string(),
1462            ));
1463        }
1464
1465        if orders.is_empty() {
1466            tracing::warn!("Batch cancel orders called with empty orders list");
1467            return Ok(());
1468        }
1469
1470        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1471            self.batch_cancel_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1472                .await?;
1473        }
1474
1475        Ok(())
1476    }
1477
1478    async fn batch_cancel_orders_chunk(
1479        &self,
1480        trader_id: TraderId,
1481        strategy_id: StrategyId,
1482        orders: Vec<BybitWsCancelOrderParams>,
1483    ) -> BybitWsResult<()> {
1484        if orders.is_empty() {
1485            return Ok(());
1486        }
1487
1488        let category = orders[0].category;
1489        let batch_req_id = UUID4::new().to_string();
1490
1491        let mut validated_data = Vec::new();
1492
1493        for order in &orders {
1494            if let Some(order_link_id_str) = &order.order_link_id {
1495                let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1496                let instrument_id = self
1497                    .instruments_cache
1498                    .get(&cache_key)
1499                    .map(|inst| inst.id())
1500                    .ok_or_else(|| {
1501                        BybitWsError::ClientError(format!(
1502                            "Instrument {cache_key} not found in cache"
1503                        ))
1504                    })?;
1505
1506                let venue_order_id = order
1507                    .order_id
1508                    .as_ref()
1509                    .map(|id| VenueOrderId::from(id.as_str()));
1510
1511                validated_data.push((order_link_id_str.clone(), instrument_id, venue_order_id));
1512            }
1513        }
1514
1515        let batch_cancel_data: Vec<_> = validated_data
1516            .iter()
1517            .map(|(order_link_id_str, instrument_id, venue_order_id)| {
1518                let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1519                (
1520                    client_order_id,
1521                    (
1522                        client_order_id,
1523                        trader_id,
1524                        strategy_id,
1525                        *instrument_id,
1526                        *venue_order_id,
1527                    ),
1528                )
1529            })
1530            .collect();
1531
1532        if !batch_cancel_data.is_empty() {
1533            let cmd = HandlerCommand::RegisterBatchCancel {
1534                req_id: batch_req_id.clone(),
1535                cancels: batch_cancel_data,
1536            };
1537            let cmd_tx = self.cmd_tx.read().await;
1538            if let Err(e) = cmd_tx.send(cmd) {
1539                tracing::error!("Failed to send RegisterBatchCancel command: {e}");
1540            }
1541        }
1542
1543        let request_items: Vec<BybitWsBatchCancelItem> = orders
1544            .into_iter()
1545            .map(|order| BybitWsBatchCancelItem {
1546                symbol: order.symbol,
1547                order_id: order.order_id,
1548                order_link_id: order.order_link_id,
1549            })
1550            .collect();
1551
1552        let args = BybitWsBatchCancelOrderArgs {
1553            category,
1554            request: request_items,
1555        };
1556
1557        let request = BybitWsRequest {
1558            req_id: Some(batch_req_id),
1559            op: BybitWsOrderRequestOp::CancelBatch,
1560            header: BybitWsHeader::now(),
1561            args: vec![args],
1562        };
1563
1564        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1565
1566        self.send_text(&payload).await
1567    }
1568
1569    /// Submits an order using Nautilus domain objects.
1570    ///
1571    /// # Errors
1572    ///
1573    /// Returns an error if order submission fails or if not authenticated.
1574    #[allow(clippy::too_many_arguments)]
1575    pub async fn submit_order(
1576        &self,
1577        product_type: BybitProductType,
1578        trader_id: TraderId,
1579        strategy_id: StrategyId,
1580        instrument_id: InstrumentId,
1581        client_order_id: ClientOrderId,
1582        order_side: OrderSide,
1583        order_type: OrderType,
1584        quantity: Quantity,
1585        is_quote_quantity: bool,
1586        time_in_force: Option<TimeInForce>,
1587        price: Option<Price>,
1588        trigger_price: Option<Price>,
1589        post_only: Option<bool>,
1590        reduce_only: Option<bool>,
1591        is_leverage: bool,
1592    ) -> BybitWsResult<()> {
1593        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1594            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1595        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1596
1597        let bybit_side = match order_side {
1598            OrderSide::Buy => BybitOrderSide::Buy,
1599            OrderSide::Sell => BybitOrderSide::Sell,
1600            _ => {
1601                return Err(BybitWsError::ClientError(format!(
1602                    "Invalid order side: {order_side:?}"
1603                )));
1604            }
1605        };
1606
1607        // For stop/conditional orders, Bybit uses Market/Limit with trigger parameters
1608        let (bybit_order_type, is_stop_order) = match order_type {
1609            OrderType::Market => (BybitOrderType::Market, false),
1610            OrderType::Limit => (BybitOrderType::Limit, false),
1611            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1612            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1613            _ => {
1614                return Err(BybitWsError::ClientError(format!(
1615                    "Unsupported order type: {order_type:?}"
1616                )));
1617            }
1618        };
1619
1620        let bybit_tif = if bybit_order_type == BybitOrderType::Market {
1621            None
1622        } else if post_only == Some(true) {
1623            Some(BybitTimeInForce::PostOnly)
1624        } else if let Some(tif) = time_in_force {
1625            Some(match tif {
1626                TimeInForce::Gtc => BybitTimeInForce::Gtc,
1627                TimeInForce::Ioc => BybitTimeInForce::Ioc,
1628                TimeInForce::Fok => BybitTimeInForce::Fok,
1629                _ => {
1630                    return Err(BybitWsError::ClientError(format!(
1631                        "Unsupported time in force: {tif:?}"
1632                    )));
1633                }
1634            })
1635        } else {
1636            None
1637        };
1638
1639        // For SPOT market orders, specify baseCoin to interpret qty as base currency.
1640        // This ensures Nautilus quantities (always in base currency) are interpreted correctly.
1641        let market_unit = if product_type == BybitProductType::Spot
1642            && bybit_order_type == BybitOrderType::Market
1643        {
1644            if is_quote_quantity {
1645                Some(BYBIT_QUOTE_COIN.to_string())
1646            } else {
1647                Some(BYBIT_BASE_COIN.to_string())
1648            }
1649        } else {
1650            None
1651        };
1652
1653        // Only SPOT products support is_leverage parameter
1654        let is_leverage_value = if product_type == BybitProductType::Spot {
1655            Some(i32::from(is_leverage))
1656        } else {
1657            None
1658        };
1659
1660        // Stop semantics: Buy stops trigger on rise (breakout), sell stops trigger on fall (breakdown)
1661        // MIT semantics: Buy MIT triggers on fall (pullback entry), sell MIT triggers on rise (rally entry)
1662        let trigger_direction = if is_stop_order {
1663            match (order_type, order_side) {
1664                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1665                    Some(BybitTriggerDirection::RisesTo as i32)
1666                }
1667                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1668                    Some(BybitTriggerDirection::FallsTo as i32)
1669                }
1670                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1671                    Some(BybitTriggerDirection::FallsTo as i32)
1672                }
1673                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1674                    Some(BybitTriggerDirection::RisesTo as i32)
1675                }
1676                _ => None,
1677            }
1678        } else {
1679            None
1680        };
1681
1682        let params = if is_stop_order {
1683            // For conditional orders, ALL types use triggerPrice field
1684            // sl_trigger_price/tp_trigger_price are only for TP/SL attached to regular orders
1685            BybitWsPlaceOrderParams {
1686                category: product_type,
1687                symbol: raw_symbol,
1688                side: bybit_side,
1689                order_type: bybit_order_type,
1690                qty: quantity.to_string(),
1691                is_leverage: is_leverage_value,
1692                market_unit: market_unit.clone(),
1693                price: price.map(|p| p.to_string()),
1694                time_in_force: bybit_tif,
1695                order_link_id: Some(client_order_id.to_string()),
1696                reduce_only: reduce_only.filter(|&r| r),
1697                close_on_trigger: None,
1698                trigger_price: trigger_price.map(|p| p.to_string()),
1699                trigger_by: Some(BybitTriggerType::LastPrice),
1700                trigger_direction,
1701                tpsl_mode: None, // Not needed for standalone conditional orders
1702                take_profit: None,
1703                stop_loss: None,
1704                tp_trigger_by: None,
1705                sl_trigger_by: None,
1706                sl_trigger_price: None, // Not used for standalone stop orders
1707                tp_trigger_price: None, // Not used for standalone stop orders
1708                sl_order_type: None,
1709                tp_order_type: None,
1710                sl_limit_price: None,
1711                tp_limit_price: None,
1712            }
1713        } else {
1714            // Regular market/limit orders
1715            BybitWsPlaceOrderParams {
1716                category: product_type,
1717                symbol: raw_symbol,
1718                side: bybit_side,
1719                order_type: bybit_order_type,
1720                qty: quantity.to_string(),
1721                is_leverage: is_leverage_value,
1722                market_unit,
1723                price: price.map(|p| p.to_string()),
1724                time_in_force: if bybit_order_type == BybitOrderType::Market {
1725                    None
1726                } else {
1727                    bybit_tif
1728                },
1729                order_link_id: Some(client_order_id.to_string()),
1730                reduce_only: reduce_only.filter(|&r| r),
1731                close_on_trigger: None,
1732                trigger_price: None,
1733                trigger_by: None,
1734                trigger_direction: None,
1735                tpsl_mode: None,
1736                take_profit: None,
1737                stop_loss: None,
1738                tp_trigger_by: None,
1739                sl_trigger_by: None,
1740                sl_trigger_price: None,
1741                tp_trigger_price: None,
1742                sl_order_type: None,
1743                tp_order_type: None,
1744                sl_limit_price: None,
1745                tp_limit_price: None,
1746            }
1747        };
1748
1749        self.place_order(
1750            params,
1751            client_order_id,
1752            trader_id,
1753            strategy_id,
1754            instrument_id,
1755        )
1756        .await
1757    }
1758
1759    /// Modifies an existing order using Nautilus domain objects.
1760    ///
1761    /// # Errors
1762    ///
1763    /// Returns an error if modification fails or if not authenticated.
1764    #[allow(clippy::too_many_arguments)]
1765    pub async fn modify_order(
1766        &self,
1767        product_type: BybitProductType,
1768        trader_id: TraderId,
1769        strategy_id: StrategyId,
1770        instrument_id: InstrumentId,
1771        client_order_id: ClientOrderId,
1772        venue_order_id: Option<VenueOrderId>,
1773        quantity: Option<Quantity>,
1774        price: Option<Price>,
1775    ) -> BybitWsResult<()> {
1776        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1777            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1778        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1779
1780        let params = BybitWsAmendOrderParams {
1781            category: product_type,
1782            symbol: raw_symbol,
1783            order_id: venue_order_id.map(|id| id.to_string()),
1784            order_link_id: Some(client_order_id.to_string()),
1785            qty: quantity.map(|q| q.to_string()),
1786            price: price.map(|p| p.to_string()),
1787            trigger_price: None,
1788            take_profit: None,
1789            stop_loss: None,
1790            tp_trigger_by: None,
1791            sl_trigger_by: None,
1792        };
1793
1794        self.amend_order(
1795            params,
1796            client_order_id,
1797            trader_id,
1798            strategy_id,
1799            instrument_id,
1800            venue_order_id,
1801        )
1802        .await
1803    }
1804
1805    /// Cancels an order using Nautilus domain objects.
1806    ///
1807    /// # Errors
1808    ///
1809    /// Returns an error if cancellation fails or if not authenticated.
1810    pub async fn cancel_order_by_id(
1811        &self,
1812        product_type: BybitProductType,
1813        trader_id: TraderId,
1814        strategy_id: StrategyId,
1815        instrument_id: InstrumentId,
1816        client_order_id: ClientOrderId,
1817        venue_order_id: Option<VenueOrderId>,
1818    ) -> BybitWsResult<()> {
1819        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1820            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1821        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1822
1823        let params = BybitWsCancelOrderParams {
1824            category: product_type,
1825            symbol: raw_symbol,
1826            order_id: venue_order_id.map(|id| id.to_string()),
1827            order_link_id: Some(client_order_id.to_string()),
1828        };
1829
1830        self.cancel_order(
1831            params,
1832            client_order_id,
1833            trader_id,
1834            strategy_id,
1835            instrument_id,
1836            venue_order_id,
1837        )
1838        .await
1839    }
1840
1841    /// Builds order params for placing an order.
1842    #[allow(clippy::too_many_arguments)]
1843    pub fn build_place_order_params(
1844        &self,
1845        product_type: BybitProductType,
1846        instrument_id: InstrumentId,
1847        client_order_id: ClientOrderId,
1848        order_side: OrderSide,
1849        order_type: OrderType,
1850        quantity: Quantity,
1851        is_quote_quantity: bool,
1852        time_in_force: Option<TimeInForce>,
1853        price: Option<Price>,
1854        trigger_price: Option<Price>,
1855        post_only: Option<bool>,
1856        reduce_only: Option<bool>,
1857        is_leverage: bool,
1858    ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1859        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1860            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1861        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1862
1863        let bybit_side = match order_side {
1864            OrderSide::Buy => BybitOrderSide::Buy,
1865            OrderSide::Sell => BybitOrderSide::Sell,
1866            _ => {
1867                return Err(BybitWsError::ClientError(format!(
1868                    "Invalid order side: {order_side:?}"
1869                )));
1870            }
1871        };
1872
1873        let (bybit_order_type, is_stop_order) = match order_type {
1874            OrderType::Market => (BybitOrderType::Market, false),
1875            OrderType::Limit => (BybitOrderType::Limit, false),
1876            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1877            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1878            _ => {
1879                return Err(BybitWsError::ClientError(format!(
1880                    "Unsupported order type: {order_type:?}"
1881                )));
1882            }
1883        };
1884
1885        let bybit_tif = if post_only == Some(true) {
1886            Some(BybitTimeInForce::PostOnly)
1887        } else if let Some(tif) = time_in_force {
1888            Some(match tif {
1889                TimeInForce::Gtc => BybitTimeInForce::Gtc,
1890                TimeInForce::Ioc => BybitTimeInForce::Ioc,
1891                TimeInForce::Fok => BybitTimeInForce::Fok,
1892                _ => {
1893                    return Err(BybitWsError::ClientError(format!(
1894                        "Unsupported time in force: {tif:?}"
1895                    )));
1896                }
1897            })
1898        } else {
1899            None
1900        };
1901
1902        let market_unit = if product_type == BybitProductType::Spot
1903            && bybit_order_type == BybitOrderType::Market
1904        {
1905            if is_quote_quantity {
1906                Some(BYBIT_QUOTE_COIN.to_string())
1907            } else {
1908                Some(BYBIT_BASE_COIN.to_string())
1909            }
1910        } else {
1911            None
1912        };
1913
1914        // Only SPOT products support is_leverage parameter
1915        let is_leverage_value = if product_type == BybitProductType::Spot {
1916            Some(i32::from(is_leverage))
1917        } else {
1918            None
1919        };
1920
1921        // Stop semantics: Buy stops trigger on rise (breakout), sell stops trigger on fall (breakdown)
1922        // MIT semantics: Buy MIT triggers on fall (pullback entry), sell MIT triggers on rise (rally entry)
1923        let trigger_direction = if is_stop_order {
1924            match (order_type, order_side) {
1925                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1926                    Some(BybitTriggerDirection::RisesTo as i32)
1927                }
1928                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1929                    Some(BybitTriggerDirection::FallsTo as i32)
1930                }
1931                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1932                    Some(BybitTriggerDirection::FallsTo as i32)
1933                }
1934                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1935                    Some(BybitTriggerDirection::RisesTo as i32)
1936                }
1937                _ => None,
1938            }
1939        } else {
1940            None
1941        };
1942
1943        let params = if is_stop_order {
1944            BybitWsPlaceOrderParams {
1945                category: product_type,
1946                symbol: raw_symbol,
1947                side: bybit_side,
1948                order_type: bybit_order_type,
1949                qty: quantity.to_string(),
1950                is_leverage: is_leverage_value,
1951                market_unit,
1952                price: price.map(|p| p.to_string()),
1953                time_in_force: if bybit_order_type == BybitOrderType::Market {
1954                    None
1955                } else {
1956                    bybit_tif
1957                },
1958                order_link_id: Some(client_order_id.to_string()),
1959                reduce_only: reduce_only.filter(|&r| r),
1960                close_on_trigger: None,
1961                trigger_price: trigger_price.map(|p| p.to_string()),
1962                trigger_by: Some(BybitTriggerType::LastPrice),
1963                trigger_direction,
1964                tpsl_mode: None,
1965                take_profit: None,
1966                stop_loss: None,
1967                tp_trigger_by: None,
1968                sl_trigger_by: None,
1969                sl_trigger_price: None,
1970                tp_trigger_price: None,
1971                sl_order_type: None,
1972                tp_order_type: None,
1973                sl_limit_price: None,
1974                tp_limit_price: None,
1975            }
1976        } else {
1977            BybitWsPlaceOrderParams {
1978                category: product_type,
1979                symbol: raw_symbol,
1980                side: bybit_side,
1981                order_type: bybit_order_type,
1982                qty: quantity.to_string(),
1983                is_leverage: is_leverage_value,
1984                market_unit,
1985                price: price.map(|p| p.to_string()),
1986                time_in_force: if bybit_order_type == BybitOrderType::Market {
1987                    None
1988                } else {
1989                    bybit_tif
1990                },
1991                order_link_id: Some(client_order_id.to_string()),
1992                reduce_only: reduce_only.filter(|&r| r),
1993                close_on_trigger: None,
1994                trigger_price: None,
1995                trigger_by: None,
1996                trigger_direction: None,
1997                tpsl_mode: None,
1998                take_profit: None,
1999                stop_loss: None,
2000                tp_trigger_by: None,
2001                sl_trigger_by: None,
2002                sl_trigger_price: None,
2003                tp_trigger_price: None,
2004                sl_order_type: None,
2005                tp_order_type: None,
2006                sl_limit_price: None,
2007                tp_limit_price: None,
2008            }
2009        };
2010
2011        Ok(params)
2012    }
2013
2014    /// Builds order params for amending an order.
2015    #[allow(clippy::too_many_arguments)]
2016    pub fn build_amend_order_params(
2017        &self,
2018        product_type: BybitProductType,
2019        instrument_id: InstrumentId,
2020        venue_order_id: Option<VenueOrderId>,
2021        client_order_id: Option<ClientOrderId>,
2022        quantity: Option<Quantity>,
2023        price: Option<Price>,
2024    ) -> BybitWsResult<BybitWsAmendOrderParams> {
2025        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2026            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2027        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2028
2029        Ok(BybitWsAmendOrderParams {
2030            category: product_type,
2031            symbol: raw_symbol,
2032            order_id: venue_order_id.map(|v| v.to_string()),
2033            order_link_id: client_order_id.map(|c| c.to_string()),
2034            qty: quantity.map(|q| q.to_string()),
2035            price: price.map(|p| p.to_string()),
2036            trigger_price: None,
2037            take_profit: None,
2038            stop_loss: None,
2039            tp_trigger_by: None,
2040            sl_trigger_by: None,
2041        })
2042    }
2043
2044    /// Builds order params for canceling an order via WebSocket.
2045    ///
2046    /// # Errors
2047    ///
2048    /// Returns an error if symbol parsing fails or if neither venue_order_id
2049    /// nor client_order_id is provided.
2050    pub fn build_cancel_order_params(
2051        &self,
2052        product_type: BybitProductType,
2053        instrument_id: InstrumentId,
2054        venue_order_id: Option<VenueOrderId>,
2055        client_order_id: Option<ClientOrderId>,
2056    ) -> BybitWsResult<BybitWsCancelOrderParams> {
2057        if venue_order_id.is_none() && client_order_id.is_none() {
2058            return Err(BybitWsError::ClientError(
2059                "Either venue_order_id or client_order_id must be provided".to_string(),
2060            ));
2061        }
2062
2063        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2064            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2065        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2066
2067        Ok(BybitWsCancelOrderParams {
2068            category: product_type,
2069            symbol: raw_symbol,
2070            order_id: venue_order_id.map(|v| v.to_string()),
2071            order_link_id: client_order_id.map(|c| c.to_string()),
2072        })
2073    }
2074
2075    fn default_headers() -> Vec<(String, String)> {
2076        vec![
2077            ("Content-Type".to_string(), "application/json".to_string()),
2078            ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
2079        ]
2080    }
2081
2082    async fn authenticate_if_required(&self) -> BybitWsResult<()> {
2083        if !self.requires_auth {
2084            return Ok(());
2085        }
2086
2087        let credential = self.credential.as_ref().ok_or_else(|| {
2088            BybitWsError::Authentication("Credentials required for authentication".to_string())
2089        })?;
2090
2091        let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
2092        let signature = credential.sign_websocket_auth(expires);
2093
2094        let auth_message = BybitAuthRequest {
2095            op: BybitWsOperation::Auth,
2096            args: vec![
2097                Value::String(credential.api_key().to_string()),
2098                Value::Number(expires.into()),
2099                Value::String(signature),
2100            ],
2101        };
2102
2103        let payload = serde_json::to_string(&auth_message)?;
2104
2105        self.cmd_tx
2106            .read()
2107            .await
2108            .send(HandlerCommand::Authenticate { payload })
2109            .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
2110
2111        // Authentication will be processed asynchronously by the handler
2112        // The handler will emit NautilusWsMessage::Authenticated when successful
2113        Ok(())
2114    }
2115
2116    async fn send_text(&self, text: &str) -> BybitWsResult<()> {
2117        let cmd = HandlerCommand::SendText {
2118            payload: text.to_string(),
2119        };
2120
2121        self.send_cmd(cmd).await
2122    }
2123
2124    async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
2125        self.cmd_tx
2126            .read()
2127            .await
2128            .send(cmd)
2129            .map_err(|e| BybitWsError::Send(e.to_string()))
2130    }
2131}
2132
2133#[cfg(test)]
2134mod tests {
2135    use rstest::rstest;
2136
2137    use super::*;
2138    use crate::{
2139        common::testing::load_test_json,
2140        websocket::{classify_bybit_message, messages::BybitWsMessage},
2141    };
2142
2143    #[rstest]
2144    fn classify_orderbook_snapshot() {
2145        let json: Value = serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json"))
2146            .expect("invalid fixture");
2147        let message = classify_bybit_message(json);
2148        assert!(matches!(message, BybitWsMessage::Orderbook(_)));
2149    }
2150
2151    #[rstest]
2152    fn classify_trade_snapshot() {
2153        let json: Value =
2154            serde_json::from_str(&load_test_json("ws_public_trade.json")).expect("invalid fixture");
2155        let message = classify_bybit_message(json);
2156        assert!(matches!(message, BybitWsMessage::Trade(_)));
2157    }
2158
2159    #[rstest]
2160    fn classify_ticker_linear_snapshot() {
2161        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_linear.json"))
2162            .expect("invalid fixture");
2163        let message = classify_bybit_message(json);
2164        assert!(matches!(message, BybitWsMessage::TickerLinear(_)));
2165    }
2166
2167    #[rstest]
2168    fn classify_ticker_option_snapshot() {
2169        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_option.json"))
2170            .expect("invalid fixture");
2171        let message = classify_bybit_message(json);
2172        assert!(matches!(message, BybitWsMessage::TickerOption(_)));
2173    }
2174
2175    #[rstest]
2176    fn test_race_unsubscribe_failure_recovery() {
2177        // Simulates the race condition where venue rejects an unsubscribe request.
2178        // The adapter must perform the 3-step recovery:
2179        // 1. confirm_unsubscribe() - clear pending_unsubscribe
2180        // 2. mark_subscribe() - mark as subscribing again
2181        // 3. confirm_subscribe() - restore to confirmed state
2182        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2183
2184        let topic = "publicTrade.BTCUSDT";
2185
2186        // Initial subscribe flow
2187        subscriptions.mark_subscribe(topic);
2188        subscriptions.confirm_subscribe(topic);
2189        assert_eq!(subscriptions.len(), 1);
2190
2191        // User unsubscribes
2192        subscriptions.mark_unsubscribe(topic);
2193        assert_eq!(subscriptions.len(), 0);
2194        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2195
2196        // Venue REJECTS the unsubscribe (error message)
2197        // Adapter must perform 3-step recovery (from lines 2181-2183)
2198        subscriptions.confirm_unsubscribe(topic); // Step 1: clear pending_unsubscribe
2199        subscriptions.mark_subscribe(topic); // Step 2: mark as subscribing
2200        subscriptions.confirm_subscribe(topic); // Step 3: confirm subscription
2201
2202        // Verify recovery: topic should be back in confirmed state
2203        assert_eq!(subscriptions.len(), 1);
2204        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2205        assert!(subscriptions.pending_subscribe_topics().is_empty());
2206
2207        // Verify topic is in all_topics() for reconnect
2208        let all = subscriptions.all_topics();
2209        assert_eq!(all.len(), 1);
2210        assert!(all.contains(&topic.to_string()));
2211    }
2212
2213    #[rstest]
2214    fn test_race_resubscribe_before_unsubscribe_ack() {
2215        // Simulates: User unsubscribes, then immediately resubscribes before
2216        // the unsubscribe ACK arrives from the venue.
2217        // This is the race condition fixed in the subscription tracker.
2218        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2219
2220        let topic = "orderbook.50.BTCUSDT";
2221
2222        // Initial subscribe
2223        subscriptions.mark_subscribe(topic);
2224        subscriptions.confirm_subscribe(topic);
2225        assert_eq!(subscriptions.len(), 1);
2226
2227        // User unsubscribes
2228        subscriptions.mark_unsubscribe(topic);
2229        assert_eq!(subscriptions.len(), 0);
2230        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2231
2232        // User immediately changes mind and resubscribes (before unsubscribe ACK)
2233        subscriptions.mark_subscribe(topic);
2234        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2235
2236        // NOW the unsubscribe ACK arrives - should NOT clear pending_subscribe
2237        subscriptions.confirm_unsubscribe(topic);
2238        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2239        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2240
2241        // Subscribe ACK arrives
2242        subscriptions.confirm_subscribe(topic);
2243        assert_eq!(subscriptions.len(), 1);
2244        assert!(subscriptions.pending_subscribe_topics().is_empty());
2245
2246        // Verify final state is correct
2247        let all = subscriptions.all_topics();
2248        assert_eq!(all.len(), 1);
2249        assert!(all.contains(&topic.to_string()));
2250    }
2251
2252    #[rstest]
2253    fn test_race_late_subscribe_confirmation_after_unsubscribe() {
2254        // Simulates: User subscribes, then unsubscribes before subscribe ACK arrives.
2255        // The late subscribe ACK should be ignored.
2256        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2257
2258        let topic = "tickers.ETHUSDT";
2259
2260        // User subscribes
2261        subscriptions.mark_subscribe(topic);
2262        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2263
2264        // User immediately unsubscribes (before subscribe ACK)
2265        subscriptions.mark_unsubscribe(topic);
2266        assert!(subscriptions.pending_subscribe_topics().is_empty()); // Cleared
2267        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2268
2269        // Late subscribe confirmation arrives - should be IGNORED
2270        subscriptions.confirm_subscribe(topic);
2271        assert_eq!(subscriptions.len(), 0); // Not added to confirmed
2272        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2273
2274        // Unsubscribe ACK arrives
2275        subscriptions.confirm_unsubscribe(topic);
2276
2277        // Final state: completely empty
2278        assert!(subscriptions.is_empty());
2279        assert!(subscriptions.all_topics().is_empty());
2280    }
2281
2282    #[rstest]
2283    fn test_race_reconnection_with_pending_states() {
2284        // Simulates reconnection with mixed pending states.
2285        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2286
2287        // Set up mixed state before reconnection
2288        // Confirmed: publicTrade.BTCUSDT
2289        let trade_btc = "publicTrade.BTCUSDT";
2290        subscriptions.mark_subscribe(trade_btc);
2291        subscriptions.confirm_subscribe(trade_btc);
2292
2293        // Pending subscribe: publicTrade.ETHUSDT
2294        let trade_eth = "publicTrade.ETHUSDT";
2295        subscriptions.mark_subscribe(trade_eth);
2296
2297        // Pending unsubscribe: orderbook.50.BTCUSDT (user cancelled)
2298        let book_btc = "orderbook.50.BTCUSDT";
2299        subscriptions.mark_subscribe(book_btc);
2300        subscriptions.confirm_subscribe(book_btc);
2301        subscriptions.mark_unsubscribe(book_btc);
2302
2303        // Get topics for reconnection
2304        let topics_to_restore = subscriptions.all_topics();
2305
2306        // Should include: confirmed + pending_subscribe (NOT pending_unsubscribe)
2307        assert_eq!(topics_to_restore.len(), 2);
2308        assert!(topics_to_restore.contains(&trade_btc.to_string()));
2309        assert!(topics_to_restore.contains(&trade_eth.to_string()));
2310        assert!(!topics_to_restore.contains(&book_btc.to_string())); // Excluded
2311    }
2312
2313    #[rstest]
2314    fn test_race_duplicate_subscribe_messages_idempotent() {
2315        // Simulates duplicate subscribe requests (e.g., from reconnection logic).
2316        // The subscription tracker should be idempotent and not create duplicate state.
2317        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2318
2319        let topic = "publicTrade.BTCUSDT";
2320
2321        // Subscribe and confirm
2322        subscriptions.mark_subscribe(topic);
2323        subscriptions.confirm_subscribe(topic);
2324        assert_eq!(subscriptions.len(), 1);
2325
2326        // Duplicate mark_subscribe on already-confirmed topic (should be no-op)
2327        subscriptions.mark_subscribe(topic);
2328        assert!(subscriptions.pending_subscribe_topics().is_empty()); // Not re-added
2329        assert_eq!(subscriptions.len(), 1); // Still just 1
2330
2331        // Duplicate confirm_subscribe (should be idempotent)
2332        subscriptions.confirm_subscribe(topic);
2333        assert_eq!(subscriptions.len(), 1);
2334
2335        // Verify final state
2336        let all = subscriptions.all_topics();
2337        assert_eq!(all.len(), 1);
2338        assert_eq!(all[0], topic);
2339    }
2340
2341    #[rstest]
2342    #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
2343    #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
2344    #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
2345    #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
2346    #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
2347    #[case::option_with_leverage(BybitProductType::Option, true, None)]
2348    fn test_is_leverage_parameter(
2349        #[case] product_type: BybitProductType,
2350        #[case] is_leverage: bool,
2351        #[case] expected: Option<i32>,
2352    ) {
2353        let symbol = match product_type {
2354            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2355            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2356            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2357            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2358        };
2359
2360        let instrument_id = InstrumentId::from(symbol);
2361        let client_order_id = ClientOrderId::from("test-order-1");
2362        let quantity = Quantity::from("1.0");
2363
2364        let client = BybitWebSocketClient::new_trade(
2365            BybitEnvironment::Testnet,
2366            Some("test-key".to_string()),
2367            Some("test-secret".to_string()),
2368            None,
2369            Some(20),
2370        );
2371
2372        let params = client
2373            .build_place_order_params(
2374                product_type,
2375                instrument_id,
2376                client_order_id,
2377                OrderSide::Buy,
2378                OrderType::Limit,
2379                quantity,
2380                false, // is_quote_quantity
2381                Some(TimeInForce::Gtc),
2382                Some(Price::from("50000.0")),
2383                None,
2384                None,
2385                None,
2386                is_leverage,
2387            )
2388            .expect("Failed to build params");
2389
2390        assert_eq!(params.is_leverage, expected);
2391    }
2392
2393    #[rstest]
2394    #[case::spot_market_quote_quantity(BybitProductType::Spot, OrderType::Market, true, Some(BYBIT_QUOTE_COIN.to_string()))]
2395    #[case::spot_market_base_quantity(BybitProductType::Spot, OrderType::Market, false, Some(BYBIT_BASE_COIN.to_string()))]
2396    #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
2397    #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
2398    #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
2399    #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
2400    fn test_is_quote_quantity_parameter(
2401        #[case] product_type: BybitProductType,
2402        #[case] order_type: OrderType,
2403        #[case] is_quote_quantity: bool,
2404        #[case] expected: Option<String>,
2405    ) {
2406        let symbol = match product_type {
2407            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2408            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2409            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2410            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2411        };
2412
2413        let instrument_id = InstrumentId::from(symbol);
2414        let client_order_id = ClientOrderId::from("test-order-1");
2415        let quantity = Quantity::from("1.0");
2416
2417        let client = BybitWebSocketClient::new_trade(
2418            BybitEnvironment::Testnet,
2419            Some("test-key".to_string()),
2420            Some("test-secret".to_string()),
2421            None,
2422            Some(20),
2423        );
2424
2425        let params = client
2426            .build_place_order_params(
2427                product_type,
2428                instrument_id,
2429                client_order_id,
2430                OrderSide::Buy,
2431                order_type,
2432                quantity,
2433                is_quote_quantity,
2434                Some(TimeInForce::Gtc),
2435                if order_type == OrderType::Market {
2436                    None
2437                } else {
2438                    Some(Price::from("50000.0"))
2439                },
2440                None,
2441                None,
2442                None,
2443                false,
2444            )
2445            .expect("Failed to build params");
2446
2447        assert_eq!(params.market_unit, expected);
2448    }
2449}