Skip to main content

nautilus_bybit/websocket/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Bybit WebSocket client providing public market data streaming.
17//!
18//! Bybit API reference <https://bybit-exchange.github.io/docs/>.
19
20use std::{
21    fmt::Debug,
22    sync::{
23        Arc,
24        atomic::{AtomicBool, AtomicU8, Ordering},
25    },
26    time::Duration,
27};
28
29use ahash::AHashMap;
30use arc_swap::ArcSwap;
31use dashmap::DashMap;
32use nautilus_common::live::get_runtime;
33use nautilus_core::{UUID4, consts::NAUTILUS_USER_AGENT, env::get_or_env_var_opt};
34use nautilus_model::{
35    data::BarType,
36    enums::{AggregationSource, BarAggregation, OrderSide, OrderType, PriceType, TimeInForce},
37    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
38    instruments::{Instrument, InstrumentAny},
39    types::{Price, Quantity},
40};
41use nautilus_network::{
42    backoff::ExponentialBackoff,
43    mode::ConnectionMode,
44    websocket::{
45        AuthTracker, PingHandler, SubscriptionState, WebSocketClient, WebSocketConfig,
46        channel_message_handler,
47    },
48};
49use serde_json::Value;
50use tokio_util::sync::CancellationToken;
51use ustr::Ustr;
52
53use crate::{
54    common::{
55        consts::{
56            BYBIT_BASE_COIN, BYBIT_NAUTILUS_BROKER_ID, BYBIT_QUOTE_COIN, BYBIT_WS_TOPIC_DELIMITER,
57        },
58        credential::Credential,
59        enums::{
60            BybitEnvironment, BybitOrderSide, BybitOrderType, BybitProductType, BybitTimeInForce,
61            BybitTriggerDirection, BybitTriggerType, BybitWsOrderRequestOp,
62        },
63        parse::{bar_spec_to_bybit_interval, extract_raw_symbol, make_bybit_symbol},
64        symbol::BybitSymbol,
65        urls::{bybit_ws_private_url, bybit_ws_public_url, bybit_ws_trade_url},
66    },
67    websocket::{
68        enums::{BybitWsOperation, BybitWsPrivateChannel, BybitWsPublicChannel},
69        error::{BybitWsError, BybitWsResult},
70        handler::{FeedHandler, HandlerCommand},
71        messages::{
72            BybitAuthRequest, BybitSubscription, BybitWsAmendOrderParams, BybitWsBatchCancelItem,
73            BybitWsBatchCancelOrderArgs, BybitWsBatchPlaceItem, BybitWsBatchPlaceOrderArgs,
74            BybitWsCancelOrderParams, BybitWsHeader, BybitWsPlaceOrderParams, BybitWsRequest,
75            NautilusWsMessage,
76        },
77    },
78};
79
/// Heartbeat interval in seconds used when a constructor is not given an explicit value.
const DEFAULT_HEARTBEAT_SECS: u64 = 20;
/// Milliseconds added to the current UTC timestamp to form the `expires` argument of a
/// WebSocket authentication request.
const WEBSOCKET_AUTH_WINDOW_MS: i64 = 5_000;
// NOTE(review): not referenced in the visible part of this file; presumably caps the number
// of orders per batch request - confirm against the batch order methods.
const BATCH_PROCESSING_LIMIT: usize = 20;

/// Type alias for the funding rate cache.
///
/// A shared, async-locked map keyed by `Ustr` whose values are a pair of optional strings.
/// NOTE(review): the meaning of the key and of each tuple element is defined by the feed
/// handler, not here - confirm before relying on it.
type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
86
87/// Resolves credentials from provided values or environment variables.
88///
89/// Priority for environment variables based on environment:
90/// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
91/// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
92/// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
93fn resolve_credential(
94    environment: BybitEnvironment,
95    api_key: Option<String>,
96    api_secret: Option<String>,
97) -> Option<Credential> {
98    let (api_key_env, api_secret_env) = match environment {
99        BybitEnvironment::Demo => ("BYBIT_DEMO_API_KEY", "BYBIT_DEMO_API_SECRET"),
100        BybitEnvironment::Testnet => ("BYBIT_TESTNET_API_KEY", "BYBIT_TESTNET_API_SECRET"),
101        BybitEnvironment::Mainnet => ("BYBIT_API_KEY", "BYBIT_API_SECRET"),
102    };
103
104    let key = get_or_env_var_opt(api_key, api_key_env);
105    let secret = get_or_env_var_opt(api_secret, api_secret_env);
106
107    match (key, secret) {
108        (Some(k), Some(s)) => Some(Credential::new(k, s)),
109        _ => None,
110    }
111}
112
/// WebSocket client for Bybit.
///
/// The same type backs the public (market data), private and trade streams; which one an
/// instance talks to is decided by the constructor used (`new_public*`, `new_private`,
/// `new_trade`). Clones share the command channel, caches and connection state but do not
/// carry the output receiver or the background task handle.
#[cfg_attr(feature = "python", pyo3::pyclass(from_py_object))]
pub struct BybitWebSocketClient {
    /// Endpoint the client connects to (explicit override or the default for the stream kind).
    url: String,
    /// Bybit environment the client was created for.
    environment: BybitEnvironment,
    /// Product type for public clients; `None` for private and trade clients.
    product_type: Option<BybitProductType>,
    /// API credentials, when they could be resolved; always `None` for public clients.
    credential: Option<Credential>,
    /// Whether the stream must authenticate (set for private and trade clients).
    requires_auth: bool,
    /// Authentication state shared with the feed handler; invalidated on `close`.
    auth_tracker: AuthTracker,
    /// Heartbeat interval in seconds handed to the underlying WebSocket configuration.
    heartbeat: Option<u64>,
    /// Connection mode; replaced with the underlying client's atomic once connected.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Command channel to the feed handler; a placeholder sender until `connect` installs
    /// the real one.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiving end of the output message channel, created by `connect`.
    out_rx: Option<Arc<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>>,
    /// Stop flag: cleared by `connect`, set by `close`.
    signal: Arc<AtomicBool>,
    /// Handle of the background task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Subscription bookkeeping (reference counts and pending/confirmed topics).
    subscriptions: SubscriptionState,
    /// Account identifier passed to the feed handler, if set.
    account_id: Option<AccountId>,
    /// Value shared with the feed handler.
    // NOTE(review): the meaning of the individual levels is not visible in this part of the file.
    mm_level: Arc<AtomicU8>,
    /// Passed to the feed handler; defaults to `true` in every constructor.
    bars_timestamp_on_close: bool,
    /// Cached instruments, replayed to the feed handler on `connect`.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Bar types shared with the feed handler, keyed by string.
    bar_types_cache: Arc<DashMap<String, BarType>>,
    /// Funding cache shared with the feed handler; cleared after a reconnection.
    funding_cache: FundingCache,
    // NOTE(review): not used in the visible part of this file - confirm where it is cancelled.
    cancellation_token: CancellationToken,
}
137
138impl Debug for BybitWebSocketClient {
139    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
140        f.debug_struct(stringify!(BybitWebSocketClient))
141            .field("url", &self.url)
142            .field("environment", &self.environment)
143            .field("product_type", &self.product_type)
144            .field("requires_auth", &self.requires_auth)
145            .field("heartbeat", &self.heartbeat)
146            .field("confirmed_subscriptions", &self.subscriptions.len())
147            .finish()
148    }
149}
150
151impl Clone for BybitWebSocketClient {
152    fn clone(&self) -> Self {
153        Self {
154            url: self.url.clone(),
155            environment: self.environment,
156            product_type: self.product_type,
157            credential: self.credential.clone(),
158            requires_auth: self.requires_auth,
159            auth_tracker: self.auth_tracker.clone(),
160            heartbeat: self.heartbeat,
161            connection_mode: Arc::clone(&self.connection_mode),
162            cmd_tx: Arc::clone(&self.cmd_tx),
163            out_rx: None, // Each clone gets its own receiver
164            signal: Arc::clone(&self.signal),
165            task_handle: None, // Each clone gets its own task handle
166            subscriptions: self.subscriptions.clone(),
167            account_id: self.account_id,
168            mm_level: Arc::clone(&self.mm_level),
169            bars_timestamp_on_close: self.bars_timestamp_on_close,
170            instruments_cache: Arc::clone(&self.instruments_cache),
171            bar_types_cache: Arc::clone(&self.bar_types_cache),
172            funding_cache: Arc::clone(&self.funding_cache),
173            cancellation_token: self.cancellation_token.clone(),
174        }
175    }
176}
177
178impl BybitWebSocketClient {
179    /// Creates a new Bybit public WebSocket client.
180    #[must_use]
181    pub fn new_public(url: Option<String>, heartbeat: Option<u64>) -> Self {
182        Self::new_public_with(
183            BybitProductType::Linear,
184            BybitEnvironment::Mainnet,
185            url,
186            heartbeat,
187        )
188    }
189
190    /// Creates a new Bybit public WebSocket client targeting the specified product/environment.
191    #[must_use]
192    pub fn new_public_with(
193        product_type: BybitProductType,
194        environment: BybitEnvironment,
195        url: Option<String>,
196        heartbeat: Option<u64>,
197    ) -> Self {
198        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
199        // connect() swaps in the real channel and replays any queued instruments so the
200        // handler sees them once it starts.
201        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
202
203        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
204        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
205
206        Self {
207            url: url.unwrap_or_else(|| bybit_ws_public_url(product_type, environment)),
208            environment,
209            product_type: Some(product_type),
210            credential: None,
211            requires_auth: false,
212            auth_tracker: AuthTracker::new(),
213            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
214            connection_mode,
215            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
216            out_rx: None,
217            signal: Arc::new(AtomicBool::new(false)),
218            task_handle: None,
219            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
220            instruments_cache: Arc::new(DashMap::new()),
221            bar_types_cache: Arc::new(DashMap::new()),
222            account_id: None,
223            mm_level: Arc::new(AtomicU8::new(0)),
224            bars_timestamp_on_close: true,
225            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
226            cancellation_token: CancellationToken::new(),
227        }
228    }
229
230    /// Creates a new Bybit private WebSocket client.
231    ///
232    /// If `api_key` or `api_secret` are not provided, they will be loaded from
233    /// environment variables based on the environment:
234    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
235    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
236    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
237    #[must_use]
238    pub fn new_private(
239        environment: BybitEnvironment,
240        api_key: Option<String>,
241        api_secret: Option<String>,
242        url: Option<String>,
243        heartbeat: Option<u64>,
244    ) -> Self {
245        let credential = resolve_credential(environment, api_key, api_secret);
246
247        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
248        // connect() swaps in the real channel and replays any queued instruments so the
249        // handler sees them once it starts.
250        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
251
252        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
253        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
254
255        Self {
256            url: url.unwrap_or_else(|| bybit_ws_private_url(environment).to_string()),
257            environment,
258            product_type: None,
259            credential,
260            requires_auth: true,
261            auth_tracker: AuthTracker::new(),
262            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
263            connection_mode,
264            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
265            out_rx: None,
266            signal: Arc::new(AtomicBool::new(false)),
267            task_handle: None,
268            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
269            instruments_cache: Arc::new(DashMap::new()),
270            bar_types_cache: Arc::new(DashMap::new()),
271            account_id: None,
272            mm_level: Arc::new(AtomicU8::new(0)),
273            bars_timestamp_on_close: true,
274            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
275            cancellation_token: CancellationToken::new(),
276        }
277    }
278
279    /// Creates a new Bybit trade WebSocket client for order operations.
280    ///
281    /// If `api_key` or `api_secret` are not provided, they will be loaded from
282    /// environment variables based on the environment:
283    /// - Demo: `BYBIT_DEMO_API_KEY`, `BYBIT_DEMO_API_SECRET`
284    /// - Testnet: `BYBIT_TESTNET_API_KEY`, `BYBIT_TESTNET_API_SECRET`
285    /// - Mainnet: `BYBIT_API_KEY`, `BYBIT_API_SECRET`
286    #[must_use]
287    pub fn new_trade(
288        environment: BybitEnvironment,
289        api_key: Option<String>,
290        api_secret: Option<String>,
291        url: Option<String>,
292        heartbeat: Option<u64>,
293    ) -> Self {
294        let credential = resolve_credential(environment, api_key, api_secret);
295
296        // We don't have a handler yet; this placeholder keeps cache_instrument() working.
297        // connect() swaps in the real channel and replays any queued instruments so the
298        // handler sees them once it starts.
299        let (cmd_tx, _) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
300
301        let initial_mode = AtomicU8::new(ConnectionMode::Closed.as_u8());
302        let connection_mode = Arc::new(ArcSwap::from_pointee(initial_mode));
303
304        Self {
305            url: url.unwrap_or_else(|| bybit_ws_trade_url(environment).to_string()),
306            environment,
307            product_type: None,
308            credential,
309            requires_auth: true,
310            auth_tracker: AuthTracker::new(),
311            heartbeat: heartbeat.or(Some(DEFAULT_HEARTBEAT_SECS)),
312            connection_mode,
313            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
314            out_rx: None,
315            signal: Arc::new(AtomicBool::new(false)),
316            task_handle: None,
317            subscriptions: SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
318            instruments_cache: Arc::new(DashMap::new()),
319            bar_types_cache: Arc::new(DashMap::new()),
320            account_id: None,
321            mm_level: Arc::new(AtomicU8::new(0)),
322            bars_timestamp_on_close: true,
323            funding_cache: Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
324            cancellation_token: CancellationToken::new(),
325        }
326    }
327
328    /// Establishes the WebSocket connection.
329    ///
330    /// # Errors
331    ///
332    /// Returns an error if the underlying WebSocket connection cannot be established,
333    /// after retrying multiple times with exponential backoff.
334    pub async fn connect(&mut self) -> BybitWsResult<()> {
335        const MAX_RETRIES: u32 = 5;
336        const CONNECTION_TIMEOUT_SECS: u64 = 10;
337
338        self.signal.store(false, Ordering::Relaxed);
339
340        let (raw_handler, raw_rx) = channel_message_handler();
341
342        // No-op ping handler: handler owns the WebSocketClient and responds to pings directly
343        // in the message loop for minimal latency (see handler.rs pong response)
344        let ping_handler: PingHandler = Arc::new(move |_payload: Vec<u8>| {
345            // Handler responds to pings internally via select! loop
346        });
347
348        let ping_msg = serde_json::to_string(&BybitSubscription {
349            op: BybitWsOperation::Ping,
350            args: vec![],
351        })?;
352
353        let config = WebSocketConfig {
354            url: self.url.clone(),
355            headers: Self::default_headers(),
356            heartbeat: self.heartbeat,
357            heartbeat_msg: Some(ping_msg),
358            reconnect_timeout_ms: Some(5_000),
359            reconnect_delay_initial_ms: Some(500),
360            reconnect_delay_max_ms: Some(5_000),
361            reconnect_backoff_factor: Some(1.5),
362            reconnect_jitter_ms: Some(250),
363            reconnect_max_attempts: None,
364        };
365
366        // Retry initial connection with exponential backoff to handle transient DNS/network issues
367        // TODO: Eventually expose client config options for this
368        let mut backoff = ExponentialBackoff::new(
369            Duration::from_millis(500),
370            Duration::from_millis(5000),
371            2.0,
372            250,
373            false,
374        )
375        .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
376
377        #[allow(unused_assignments)]
378        let mut last_error = String::new();
379        let mut attempt = 0;
380        let client = loop {
381            attempt += 1;
382
383            match tokio::time::timeout(
384                Duration::from_secs(CONNECTION_TIMEOUT_SECS),
385                WebSocketClient::connect(
386                    config.clone(),
387                    Some(raw_handler.clone()),
388                    Some(ping_handler.clone()),
389                    None,
390                    vec![],
391                    None,
392                ),
393            )
394            .await
395            {
396                Ok(Ok(client)) => {
397                    if attempt > 1 {
398                        log::info!("WebSocket connection established after {attempt} attempts");
399                    }
400                    break client;
401                }
402                Ok(Err(e)) => {
403                    last_error = e.to_string();
404                    log::warn!(
405                        "WebSocket connection attempt failed: attempt={attempt}, max_retries={MAX_RETRIES}, url={}, error={last_error}",
406                        self.url
407                    );
408                }
409                Err(_) => {
410                    last_error = format!(
411                        "Connection timeout after {CONNECTION_TIMEOUT_SECS}s (possible DNS resolution failure)"
412                    );
413                    log::warn!(
414                        "WebSocket connection attempt timed out: attempt={attempt}, max_retries={MAX_RETRIES}, url={}",
415                        self.url
416                    );
417                }
418            }
419
420            if attempt >= MAX_RETRIES {
421                return Err(BybitWsError::Transport(format!(
422                    "Failed to connect to {} after {MAX_RETRIES} attempts: {}. \
423                    If this is a DNS error, check your network configuration and DNS settings.",
424                    self.url,
425                    if last_error.is_empty() {
426                        "unknown error"
427                    } else {
428                        &last_error
429                    }
430                )));
431            }
432
433            let delay = backoff.next_duration();
434            log::debug!(
435                "Retrying in {delay:?} (attempt {}/{MAX_RETRIES})",
436                attempt + 1
437            );
438            tokio::time::sleep(delay).await;
439        };
440
441        self.connection_mode.store(client.connection_mode_atomic());
442
443        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
444        self.out_rx = Some(Arc::new(out_rx));
445
446        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
447        *self.cmd_tx.write().await = cmd_tx.clone();
448
449        let cmd = HandlerCommand::SetClient(client);
450
451        self.send_cmd(cmd).await?;
452
453        // Replay cached instruments to the new handler via the new channel
454        if !self.instruments_cache.is_empty() {
455            let cached_instruments: Vec<InstrumentAny> = self
456                .instruments_cache
457                .iter()
458                .map(|entry| entry.value().clone())
459                .collect();
460            let cmd = HandlerCommand::InitializeInstruments(cached_instruments);
461            self.send_cmd(cmd).await?;
462        }
463
464        let signal = Arc::clone(&self.signal);
465        let subscriptions = self.subscriptions.clone();
466        let credential = self.credential.clone();
467        let requires_auth = self.requires_auth;
468        let funding_cache = Arc::clone(&self.funding_cache);
469        let bar_types_cache = Arc::clone(&self.bar_types_cache);
470        let account_id = self.account_id;
471        let product_type = self.product_type;
472        let bars_timestamp_on_close = self.bars_timestamp_on_close;
473        let mm_level = Arc::clone(&self.mm_level);
474        let cmd_tx_for_reconnect = cmd_tx.clone();
475        let auth_tracker = self.auth_tracker.clone();
476        let auth_tracker_for_handler = auth_tracker.clone();
477
478        let stream_handle = get_runtime().spawn(async move {
479            let mut handler = FeedHandler::new(
480                signal.clone(),
481                cmd_rx,
482                raw_rx,
483                out_tx.clone(),
484                account_id,
485                product_type,
486                bars_timestamp_on_close,
487                mm_level.clone(),
488                auth_tracker_for_handler,
489                subscriptions.clone(),
490                funding_cache.clone(),
491                bar_types_cache.clone(),
492            );
493
494            // Helper closure to resubscribe all tracked subscriptions after reconnection
495            let resubscribe_all = || async {
496                let topics = subscriptions.all_topics();
497
498                if topics.is_empty() {
499                    return;
500                }
501
502                log::debug!(
503                    "Resubscribing to confirmed subscriptions: count={}",
504                    topics.len()
505                );
506
507                for topic in &topics {
508                    subscriptions.mark_subscribe(topic.as_str());
509                }
510
511                let mut payloads = Vec::with_capacity(topics.len());
512                for topic in &topics {
513                    let message = BybitSubscription {
514                        op: BybitWsOperation::Subscribe,
515                        args: vec![topic.clone()],
516                    };
517                    if let Ok(payload) = serde_json::to_string(&message) {
518                        payloads.push(payload);
519                    }
520                }
521
522                let cmd = HandlerCommand::Subscribe { topics: payloads };
523
524                if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
525                    log::error!("Failed to send resubscribe command: {e}");
526                }
527            };
528
529            // Run message processing with reconnection handling
530            loop {
531                match handler.next().await {
532                    Some(NautilusWsMessage::Reconnected) => {
533                        if signal.load(Ordering::Relaxed) {
534                            continue;
535                        }
536
537                        log::info!("WebSocket reconnected");
538
539                        // Mark all confirmed subscriptions as failed so they transition to pending state
540                        let confirmed_topics: Vec<String> = {
541                            let confirmed = subscriptions.confirmed();
542                            let mut topics = Vec::new();
543                            for entry in confirmed.iter() {
544                                let (channel, symbols) = entry.pair();
545                                for symbol in symbols {
546                                    if symbol.is_empty() {
547                                        topics.push(channel.to_string());
548                                    } else {
549                                        topics.push(format!("{channel}.{symbol}"));
550                                    }
551                                }
552                            }
553                            topics
554                        };
555
556                        if !confirmed_topics.is_empty() {
557                            log::debug!(
558                                "Marking confirmed subscriptions as pending for replay: count={}",
559                                confirmed_topics.len()
560                            );
561                            for topic in confirmed_topics {
562                                subscriptions.mark_failure(&topic);
563                            }
564                        }
565
566                        // Clear caches to prevent stale data after reconnection
567                        funding_cache.write().await.clear();
568
569                        if requires_auth {
570                            log::debug!("Re-authenticating after reconnection");
571
572                            if let Some(cred) = &credential {
573                                // Begin auth attempt so succeed() will update state
574                                let _rx = auth_tracker.begin();
575
576                                let expires = chrono::Utc::now().timestamp_millis()
577                                    + WEBSOCKET_AUTH_WINDOW_MS;
578                                let signature = cred.sign_websocket_auth(expires);
579
580                                let auth_message = BybitAuthRequest {
581                                    op: BybitWsOperation::Auth,
582                                    args: vec![
583                                        Value::String(cred.api_key().to_string()),
584                                        Value::Number(expires.into()),
585                                        Value::String(signature),
586                                    ],
587                                };
588
589                                if let Ok(payload) = serde_json::to_string(&auth_message) {
590                                    let cmd = HandlerCommand::Authenticate { payload };
591                                    if let Err(e) = cmd_tx_for_reconnect.send(cmd) {
592                                        log::error!(
593                                            "Failed to send reconnection auth command: error={e}"
594                                        );
595                                    }
596                                } else {
597                                    log::error!("Failed to serialize reconnection auth message");
598                                }
599                            }
600                        }
601
602                        // Unauthenticated sessions resubscribe immediately after reconnection,
603                        // authenticated sessions wait for Authenticated message
604                        if !requires_auth {
605                            log::debug!("No authentication required, resubscribing immediately");
606                            resubscribe_all().await;
607                        }
608
609                        // Forward to out_tx so caller sees the Reconnected message
610                        if out_tx.send(NautilusWsMessage::Reconnected).is_err() {
611                            log::debug!("Receiver dropped, stopping");
612                            break;
613                        }
614                        continue;
615                    }
616                    Some(NautilusWsMessage::Authenticated) => {
617                        log::debug!("Authenticated, resubscribing");
618                        resubscribe_all().await;
619                        continue;
620                    }
621                    Some(msg) => {
622                        if out_tx.send(msg).is_err() {
623                            log::error!("Failed to send message (receiver dropped)");
624                            break;
625                        }
626                    }
627                    None => {
628                        // Stream ended - check if it's a stop signal
629                        if handler.is_stopped() {
630                            log::debug!("Stop signal received, ending message processing");
631                            break;
632                        }
633                        // Otherwise it's an unexpected stream end
634                        log::warn!("WebSocket stream ended unexpectedly");
635                        break;
636                    }
637                }
638            }
639
640            log::debug!("Handler task exiting");
641        });
642
643        self.task_handle = Some(Arc::new(stream_handle));
644
645        if requires_auth && let Err(e) = self.authenticate_if_required().await {
646            return Err(e);
647        }
648
649        Ok(())
650    }
651
652    /// Disconnects the WebSocket client and stops the background task.
653    pub async fn close(&mut self) -> BybitWsResult<()> {
654        log::debug!("Starting close process");
655
656        self.signal.store(true, Ordering::Relaxed);
657
658        let cmd = HandlerCommand::Disconnect;
659        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
660            log::debug!(
661                "Failed to send disconnect command (handler may already be shut down): {e}"
662            );
663        }
664
665        if let Some(task_handle) = self.task_handle.take() {
666            match Arc::try_unwrap(task_handle) {
667                Ok(handle) => {
668                    log::debug!("Waiting for task handle to complete");
669                    match tokio::time::timeout(Duration::from_secs(2), handle).await {
670                        Ok(Ok(())) => log::debug!("Task handle completed successfully"),
671                        Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
672                        Err(_) => {
673                            log::warn!(
674                                "Timeout waiting for task handle, task may still be running"
675                            );
676                            // The task will be dropped and should clean up automatically
677                        }
678                    }
679                }
680                Err(arc_handle) => {
681                    log::debug!(
682                        "Cannot take ownership of task handle - other references exist, aborting task"
683                    );
684                    arc_handle.abort();
685                }
686            }
687        } else {
688            log::debug!("No task handle to await");
689        }
690
691        self.auth_tracker.invalidate();
692
693        log::debug!("Closed");
694
695        Ok(())
696    }
697
698    /// Returns a value indicating whether the client is active.
699    #[must_use]
700    pub fn is_active(&self) -> bool {
701        let connection_mode_arc = self.connection_mode.load();
702        ConnectionMode::from_atomic(&connection_mode_arc).is_active()
703            && !self.signal.load(Ordering::Relaxed)
704    }
705
706    /// Returns a value indicating whether the client is closed.
707    pub fn is_closed(&self) -> bool {
708        let connection_mode_arc = self.connection_mode.load();
709        ConnectionMode::from_atomic(&connection_mode_arc).is_closed()
710            || self.signal.load(Ordering::Relaxed)
711    }
712
713    /// Waits until the WebSocket client becomes active or times out.
714    ///
715    /// # Errors
716    ///
717    /// Returns an error if the timeout is exceeded before the client becomes active.
718    pub async fn wait_until_active(&self, timeout_secs: f64) -> BybitWsResult<()> {
719        let timeout = tokio::time::Duration::from_secs_f64(timeout_secs);
720
721        tokio::time::timeout(timeout, async {
722            while !self.is_active() {
723                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
724            }
725        })
726        .await
727        .map_err(|_| {
728            BybitWsError::ClientError(format!(
729                "WebSocket connection timeout after {timeout_secs} seconds"
730            ))
731        })?;
732
733        Ok(())
734    }
735
736    /// Subscribe to the provided topic strings.
737    pub async fn subscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
738        if topics.is_empty() {
739            return Ok(());
740        }
741
742        log::debug!("Subscribing to topics: {topics:?}");
743
744        // Use reference counting to deduplicate subscriptions
745        let mut topics_to_send = Vec::new();
746
747        for topic in topics {
748            // Returns true if this is the first subscription (ref count 0 -> 1)
749            if self.subscriptions.add_reference(&topic) {
750                self.subscriptions.mark_subscribe(&topic);
751                topics_to_send.push(topic.clone());
752            } else {
753                log::debug!("Already subscribed to {topic}, skipping duplicate subscription");
754            }
755        }
756
757        if topics_to_send.is_empty() {
758            return Ok(());
759        }
760
761        // Serialize subscription messages
762        let mut payloads = Vec::with_capacity(topics_to_send.len());
763        for topic in &topics_to_send {
764            let message = BybitSubscription {
765                op: BybitWsOperation::Subscribe,
766                args: vec![topic.clone()],
767            };
768            let payload = serde_json::to_string(&message).map_err(|e| {
769                BybitWsError::Json(format!("Failed to serialize subscription: {e}"))
770            })?;
771            payloads.push(payload);
772        }
773
774        let cmd = HandlerCommand::Subscribe { topics: payloads };
775        self.cmd_tx
776            .read()
777            .await
778            .send(cmd)
779            .map_err(|e| BybitWsError::Send(format!("Failed to send subscribe command: {e}")))?;
780
781        Ok(())
782    }
783
784    /// Unsubscribe from the provided topics.
785    pub async fn unsubscribe(&self, topics: Vec<String>) -> BybitWsResult<()> {
786        if topics.is_empty() {
787            return Ok(());
788        }
789
790        log::debug!("Attempting to unsubscribe from topics: {topics:?}");
791
792        if self.signal.load(Ordering::Relaxed) {
793            log::debug!("Shutdown signal detected, skipping unsubscribe");
794            return Ok(());
795        }
796
797        // Use reference counting to avoid unsubscribing while other consumers still need the topic
798        let mut topics_to_send = Vec::new();
799
800        for topic in topics {
801            // Returns true if this was the last subscription (ref count 1 -> 0)
802            if self.subscriptions.remove_reference(&topic) {
803                self.subscriptions.mark_unsubscribe(&topic);
804                topics_to_send.push(topic.clone());
805            } else {
806                log::debug!("Topic {topic} still has active subscriptions, not unsubscribing");
807            }
808        }
809
810        if topics_to_send.is_empty() {
811            return Ok(());
812        }
813
814        // Serialize unsubscription messages
815        let mut payloads = Vec::with_capacity(topics_to_send.len());
816        for topic in &topics_to_send {
817            let message = BybitSubscription {
818                op: BybitWsOperation::Unsubscribe,
819                args: vec![topic.clone()],
820            };
821            if let Ok(payload) = serde_json::to_string(&message) {
822                payloads.push(payload);
823            }
824        }
825
826        let cmd = HandlerCommand::Unsubscribe { topics: payloads };
827        if let Err(e) = self.cmd_tx.read().await.send(cmd) {
828            log::debug!("Failed to send unsubscribe command: error={e}");
829        }
830
831        Ok(())
832    }
833
834    /// Returns a stream of parsed [`NautilusWsMessage`] items.
835    ///
836    /// # Panics
837    ///
838    /// Panics if called before [`Self::connect`] or if the stream has already been taken.
839    pub fn stream(&mut self) -> impl futures_util::Stream<Item = NautilusWsMessage> + use<> {
840        let rx = self
841            .out_rx
842            .take()
843            .expect("Stream receiver already taken or client not connected");
844        let mut rx = Arc::try_unwrap(rx).expect("Cannot take ownership - other references exist");
845        async_stream::stream! {
846            while let Some(msg) = rx.recv().await {
847                yield msg;
848            }
849        }
850    }
851
852    /// Returns the number of currently registered subscriptions.
853    #[must_use]
854    pub fn subscription_count(&self) -> usize {
855        self.subscriptions.len()
856    }
857
858    /// Returns the credential associated with this client, if any.
859    #[must_use]
860    pub fn credential(&self) -> Option<&Credential> {
861        self.credential.as_ref()
862    }
863
864    /// Caches a single instrument.
865    ///
866    /// Any existing instrument with the same ID will be replaced.
867    pub fn cache_instrument(&self, instrument: InstrumentAny) {
868        self.instruments_cache
869            .insert(instrument.symbol().inner(), instrument.clone());
870
871        // Before connect() the handler isn't running; this send will fail and that's expected
872        // because connect() replays the instruments via InitializeInstruments
873        if let Ok(cmd_tx) = self.cmd_tx.try_read() {
874            let cmd = HandlerCommand::UpdateInstrument(instrument);
875            if let Err(e) = cmd_tx.send(cmd) {
876                log::debug!("Failed to send instrument update to handler: {e}");
877            }
878        }
879    }
880
881    /// Caches multiple instruments.
882    ///
883    /// Clears the existing cache first, then adds all provided instruments.
884    pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
885        self.instruments_cache.clear();
886        let mut count = 0;
887
888        log::debug!("Initializing Bybit instrument cache");
889
890        for inst in instruments {
891            let symbol = inst.symbol().inner();
892            self.instruments_cache.insert(symbol, inst.clone());
893            log::debug!("Cached instrument: {symbol}");
894            count += 1;
895        }
896
897        log::info!("Bybit instrument cache initialized with {count} instruments");
898    }
899
900    /// Sets the account ID for account message parsing.
901    pub fn set_account_id(&mut self, account_id: AccountId) {
902        self.account_id = Some(account_id);
903    }
904
905    /// Sets the account market maker level.
906    pub fn set_mm_level(&self, mm_level: u8) {
907        self.mm_level.store(mm_level, Ordering::Relaxed);
908    }
909
910    /// Sets whether bar timestamps should use the close time.
911    ///
912    /// When `true` (default), bar `ts_event` is set to the bar's close time.
913    /// When `false`, bar `ts_event` is set to the bar's open time.
914    pub fn set_bars_timestamp_on_close(&mut self, value: bool) {
915        self.bars_timestamp_on_close = value;
916    }
917
918    /// Returns a reference to the instruments cache.
919    #[must_use]
920    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
921        &self.instruments_cache
922    }
923
924    /// Returns the account ID if set.
925    #[must_use]
926    pub fn account_id(&self) -> Option<AccountId> {
927        self.account_id
928    }
929
930    /// Returns the product type for public connections.
931    #[must_use]
932    pub fn product_type(&self) -> Option<BybitProductType> {
933        self.product_type
934    }
935
936    /// Subscribes to orderbook updates for a specific instrument.
937    ///
938    /// # Errors
939    ///
940    /// Returns an error if the subscription request fails.
941    ///
942    /// # References
943    ///
944    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/orderbook>
945    pub async fn subscribe_orderbook(
946        &self,
947        instrument_id: InstrumentId,
948        depth: u32,
949    ) -> BybitWsResult<()> {
950        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
951        let topic = format!(
952            "{}.{depth}.{raw_symbol}",
953            BybitWsPublicChannel::OrderBook.as_ref()
954        );
955        self.subscribe(vec![topic]).await
956    }
957
958    /// Unsubscribes from orderbook updates for a specific instrument.
959    pub async fn unsubscribe_orderbook(
960        &self,
961        instrument_id: InstrumentId,
962        depth: u32,
963    ) -> BybitWsResult<()> {
964        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
965        let topic = format!(
966            "{}.{depth}.{raw_symbol}",
967            BybitWsPublicChannel::OrderBook.as_ref()
968        );
969        self.unsubscribe(vec![topic]).await
970    }
971
972    /// Subscribes to public trade updates for a specific instrument.
973    ///
974    /// # Errors
975    ///
976    /// Returns an error if the subscription request fails.
977    ///
978    /// # References
979    ///
980    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/trade>
981    pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
982        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
983        let topic = format!(
984            "{}.{raw_symbol}",
985            BybitWsPublicChannel::PublicTrade.as_ref()
986        );
987        self.subscribe(vec![topic]).await
988    }
989
990    /// Unsubscribes from public trade updates for a specific instrument.
991    pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
992        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
993        let topic = format!(
994            "{}.{raw_symbol}",
995            BybitWsPublicChannel::PublicTrade.as_ref()
996        );
997        self.unsubscribe(vec![topic]).await
998    }
999
1000    /// Subscribes to ticker updates for a specific instrument.
1001    ///
1002    /// # Errors
1003    ///
1004    /// Returns an error if the subscription request fails.
1005    ///
1006    /// # References
1007    ///
1008    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/ticker>
1009    pub async fn subscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1010        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1011        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1012        self.subscribe(vec![topic]).await
1013    }
1014
1015    /// Unsubscribes from ticker updates for a specific instrument.
1016    pub async fn unsubscribe_ticker(&self, instrument_id: InstrumentId) -> BybitWsResult<()> {
1017        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1018        let topic = format!("{}.{raw_symbol}", BybitWsPublicChannel::Tickers.as_ref());
1019
1020        // Clear funding rate cache to ensure fresh data on resubscribe
1021        let symbol = self.product_type.map_or_else(
1022            || instrument_id.symbol.inner(),
1023            |pt| make_bybit_symbol(raw_symbol, pt),
1024        );
1025        self.funding_cache.write().await.remove(&symbol);
1026
1027        self.unsubscribe(vec![topic]).await
1028    }
1029
1030    /// Subscribes to kline/candlestick updates for a specific instrument.
1031    ///
1032    /// # Errors
1033    ///
1034    /// Returns an error if the subscription request fails.
1035    ///
1036    /// # References
1037    ///
1038    /// <https://bybit-exchange.github.io/docs/v5/websocket/public/kline>
1039    pub async fn subscribe_bars(&self, bar_type: BarType) -> BybitWsResult<()> {
1040        let spec = bar_type.spec();
1041
1042        // Bybit klines are always last-trade OHLC from the exchange
1043        if spec.price_type != PriceType::Last {
1044            return Err(BybitWsError::ClientError(format!(
1045                "Bybit bars only support LAST price type, received {}",
1046                spec.price_type
1047            )));
1048        }
1049        if bar_type.aggregation_source() != AggregationSource::External {
1050            return Err(BybitWsError::ClientError(format!(
1051                "Bybit bars only support EXTERNAL aggregation source, received {}",
1052                bar_type.aggregation_source()
1053            )));
1054        }
1055
1056        // Reject MINUTE with step >= 60, require HOUR aggregation instead
1057        let step = spec.step.get();
1058        if spec.aggregation == BarAggregation::Minute && step >= 60 {
1059            let hours = step / 60;
1060            return Err(BybitWsError::ClientError(format!(
1061                "Invalid bar type: {step}-MINUTE not supported, use {hours}-HOUR instead"
1062            )));
1063        }
1064
1065        let interval = bar_spec_to_bybit_interval(spec.aggregation, spec.step.get() as u64)
1066            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1067
1068        let instrument_id = bar_type.instrument_id();
1069        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1070        let topic = format!(
1071            "{}.{}.{raw_symbol}",
1072            BybitWsPublicChannel::Kline.as_ref(),
1073            interval
1074        );
1075
1076        // Coordinate with reference counting to avoid duplicate cache entries
1077        if self.subscriptions.get_reference_count(&topic) == 0 {
1078            self.bar_types_cache.insert(topic.clone(), bar_type);
1079        }
1080
1081        self.subscribe(vec![topic]).await
1082    }
1083
1084    /// Unsubscribes from kline/candlestick updates for a specific instrument.
1085    pub async fn unsubscribe_bars(&self, bar_type: BarType) -> BybitWsResult<()> {
1086        let spec = bar_type.spec();
1087        let interval = bar_spec_to_bybit_interval(spec.aggregation, spec.step.get() as u64)
1088            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1089
1090        let instrument_id = bar_type.instrument_id();
1091        let raw_symbol = extract_raw_symbol(instrument_id.symbol.as_str());
1092        let topic = format!(
1093            "{}.{}.{raw_symbol}",
1094            BybitWsPublicChannel::Kline.as_ref(),
1095            interval
1096        );
1097
1098        // Coordinate with reference counting to preserve cache for other subscribers
1099        if self.subscriptions.get_reference_count(&topic) == 1 {
1100            self.bar_types_cache.remove(&topic);
1101        }
1102
1103        self.unsubscribe(vec![topic]).await
1104    }
1105
1106    /// Subscribes to order updates.
1107    ///
1108    /// # Errors
1109    ///
1110    /// Returns an error if the subscription request fails or if not authenticated.
1111    ///
1112    /// # References
1113    ///
1114    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/order>
1115    pub async fn subscribe_orders(&self) -> BybitWsResult<()> {
1116        if !self.requires_auth {
1117            return Err(BybitWsError::Authentication(
1118                "Order subscription requires authentication".to_string(),
1119            ));
1120        }
1121        self.subscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1122            .await
1123    }
1124
1125    /// Unsubscribes from order updates.
1126    pub async fn unsubscribe_orders(&self) -> BybitWsResult<()> {
1127        self.unsubscribe(vec![BybitWsPrivateChannel::Order.as_ref().to_string()])
1128            .await
1129    }
1130
1131    /// Subscribes to execution/fill updates.
1132    ///
1133    /// # Errors
1134    ///
1135    /// Returns an error if the subscription request fails or if not authenticated.
1136    ///
1137    /// # References
1138    ///
1139    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/execution>
1140    pub async fn subscribe_executions(&self) -> BybitWsResult<()> {
1141        if !self.requires_auth {
1142            return Err(BybitWsError::Authentication(
1143                "Execution subscription requires authentication".to_string(),
1144            ));
1145        }
1146        self.subscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1147            .await
1148    }
1149
1150    /// Unsubscribes from execution/fill updates.
1151    pub async fn unsubscribe_executions(&self) -> BybitWsResult<()> {
1152        self.unsubscribe(vec![BybitWsPrivateChannel::Execution.as_ref().to_string()])
1153            .await
1154    }
1155
1156    /// Subscribes to position updates.
1157    ///
1158    /// # Errors
1159    ///
1160    /// Returns an error if the subscription request fails or if not authenticated.
1161    ///
1162    /// # References
1163    ///
1164    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/position>
1165    pub async fn subscribe_positions(&self) -> BybitWsResult<()> {
1166        if !self.requires_auth {
1167            return Err(BybitWsError::Authentication(
1168                "Position subscription requires authentication".to_string(),
1169            ));
1170        }
1171        self.subscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1172            .await
1173    }
1174
1175    /// Unsubscribes from position updates.
1176    pub async fn unsubscribe_positions(&self) -> BybitWsResult<()> {
1177        self.unsubscribe(vec![BybitWsPrivateChannel::Position.as_ref().to_string()])
1178            .await
1179    }
1180
1181    /// Subscribes to wallet/balance updates.
1182    ///
1183    /// # Errors
1184    ///
1185    /// Returns an error if the subscription request fails or if not authenticated.
1186    ///
1187    /// # References
1188    ///
1189    /// <https://bybit-exchange.github.io/docs/v5/websocket/private/wallet>
1190    pub async fn subscribe_wallet(&self) -> BybitWsResult<()> {
1191        if !self.requires_auth {
1192            return Err(BybitWsError::Authentication(
1193                "Wallet subscription requires authentication".to_string(),
1194            ));
1195        }
1196        self.subscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1197            .await
1198    }
1199
1200    /// Unsubscribes from wallet/balance updates.
1201    pub async fn unsubscribe_wallet(&self) -> BybitWsResult<()> {
1202        self.unsubscribe(vec![BybitWsPrivateChannel::Wallet.as_ref().to_string()])
1203            .await
1204    }
1205
1206    /// Places an order via WebSocket.
1207    ///
1208    /// # Errors
1209    ///
1210    /// Returns an error if the order request fails or if not authenticated.
1211    ///
1212    /// # References
1213    ///
1214    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1215    pub async fn place_order(
1216        &self,
1217        params: BybitWsPlaceOrderParams,
1218        client_order_id: ClientOrderId,
1219        trader_id: TraderId,
1220        strategy_id: StrategyId,
1221        instrument_id: InstrumentId,
1222    ) -> BybitWsResult<()> {
1223        if !self.auth_tracker.is_authenticated() {
1224            return Err(BybitWsError::Authentication(
1225                "Must be authenticated to place orders".to_string(),
1226            ));
1227        }
1228
1229        let cmd = HandlerCommand::PlaceOrder {
1230            params,
1231            client_order_id,
1232            trader_id,
1233            strategy_id,
1234            instrument_id,
1235        };
1236
1237        self.send_cmd(cmd).await
1238    }
1239
1240    /// Amends an existing order via WebSocket.
1241    ///
1242    /// # Errors
1243    ///
1244    /// Returns an error if the amend request fails or if not authenticated.
1245    ///
1246    /// # References
1247    ///
1248    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1249    pub async fn amend_order(
1250        &self,
1251        params: BybitWsAmendOrderParams,
1252        client_order_id: ClientOrderId,
1253        trader_id: TraderId,
1254        strategy_id: StrategyId,
1255        instrument_id: InstrumentId,
1256        venue_order_id: Option<VenueOrderId>,
1257    ) -> BybitWsResult<()> {
1258        if !self.auth_tracker.is_authenticated() {
1259            return Err(BybitWsError::Authentication(
1260                "Must be authenticated to amend orders".to_string(),
1261            ));
1262        }
1263
1264        let cmd = HandlerCommand::AmendOrder {
1265            params,
1266            client_order_id,
1267            trader_id,
1268            strategy_id,
1269            instrument_id,
1270            venue_order_id,
1271        };
1272
1273        self.send_cmd(cmd).await
1274    }
1275
1276    /// Cancels an order via WebSocket.
1277    ///
1278    /// # Errors
1279    ///
1280    /// Returns an error if the cancel request fails or if not authenticated.
1281    ///
1282    /// # References
1283    ///
1284    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1285    pub async fn cancel_order(
1286        &self,
1287        params: BybitWsCancelOrderParams,
1288        client_order_id: ClientOrderId,
1289        trader_id: TraderId,
1290        strategy_id: StrategyId,
1291        instrument_id: InstrumentId,
1292        venue_order_id: Option<VenueOrderId>,
1293    ) -> BybitWsResult<()> {
1294        if !self.auth_tracker.is_authenticated() {
1295            return Err(BybitWsError::Authentication(
1296                "Must be authenticated to cancel orders".to_string(),
1297            ));
1298        }
1299
1300        let cmd = HandlerCommand::CancelOrder {
1301            params,
1302            client_order_id,
1303            trader_id,
1304            strategy_id,
1305            instrument_id,
1306            venue_order_id,
1307        };
1308
1309        self.send_cmd(cmd).await
1310    }
1311
1312    /// Batch creates multiple orders via WebSocket.
1313    ///
1314    /// # Errors
1315    ///
1316    /// Returns an error if the batch request fails or if not authenticated.
1317    ///
1318    /// # References
1319    ///
1320    /// <https://bybit-exchange.github.io/docs/v5/websocket/trade/guideline>
1321    pub async fn batch_place_orders(
1322        &self,
1323        trader_id: TraderId,
1324        strategy_id: StrategyId,
1325        orders: Vec<BybitWsPlaceOrderParams>,
1326    ) -> BybitWsResult<()> {
1327        if !self.auth_tracker.is_authenticated() {
1328            return Err(BybitWsError::Authentication(
1329                "Must be authenticated to place orders".to_string(),
1330            ));
1331        }
1332
1333        if orders.is_empty() {
1334            log::warn!("Batch place orders called with empty orders list");
1335            return Ok(());
1336        }
1337
1338        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1339            self.batch_place_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1340                .await?;
1341        }
1342
1343        Ok(())
1344    }
1345
1346    async fn batch_place_orders_chunk(
1347        &self,
1348        trader_id: TraderId,
1349        strategy_id: StrategyId,
1350        orders: Vec<BybitWsPlaceOrderParams>,
1351    ) -> BybitWsResult<()> {
1352        let category = orders[0].category;
1353        let batch_req_id = UUID4::new().to_string();
1354
1355        // Extract order tracking data before consuming orders to register with handler
1356        let mut batch_order_data = Vec::new();
1357        for order in &orders {
1358            if let Some(order_link_id_str) = &order.order_link_id {
1359                let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1360                let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1361                let instrument_id = self
1362                    .instruments_cache
1363                    .get(&cache_key)
1364                    .map(|inst| inst.id())
1365                    .ok_or_else(|| {
1366                        BybitWsError::ClientError(format!(
1367                            "Instrument {cache_key} not found in cache"
1368                        ))
1369                    })?;
1370                batch_order_data.push((
1371                    client_order_id,
1372                    (client_order_id, trader_id, strategy_id, instrument_id),
1373                ));
1374            }
1375        }
1376
1377        if !batch_order_data.is_empty() {
1378            let cmd = HandlerCommand::RegisterBatchPlace {
1379                req_id: batch_req_id.clone(),
1380                orders: batch_order_data,
1381            };
1382            let cmd_tx = self.cmd_tx.read().await;
1383            if let Err(e) = cmd_tx.send(cmd) {
1384                log::error!("Failed to send RegisterBatchPlace command: {e}");
1385            }
1386        }
1387
1388        let mm_level = self.mm_level.load(Ordering::Relaxed);
1389        let has_non_post_only = orders
1390            .iter()
1391            .any(|o| !matches!(o.time_in_force, Some(BybitTimeInForce::PostOnly)));
1392        let referer = if has_non_post_only || mm_level == 0 {
1393            Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
1394        } else {
1395            None
1396        };
1397
1398        let request_items: Vec<BybitWsBatchPlaceItem> = orders
1399            .into_iter()
1400            .map(|order| BybitWsBatchPlaceItem {
1401                symbol: order.symbol,
1402                side: order.side,
1403                order_type: order.order_type,
1404                qty: order.qty,
1405                is_leverage: order.is_leverage,
1406                market_unit: order.market_unit,
1407                price: order.price,
1408                time_in_force: order.time_in_force,
1409                order_link_id: order.order_link_id,
1410                reduce_only: order.reduce_only,
1411                close_on_trigger: order.close_on_trigger,
1412                trigger_price: order.trigger_price,
1413                trigger_by: order.trigger_by,
1414                trigger_direction: order.trigger_direction,
1415                tpsl_mode: order.tpsl_mode,
1416                take_profit: order.take_profit,
1417                stop_loss: order.stop_loss,
1418                tp_trigger_by: order.tp_trigger_by,
1419                sl_trigger_by: order.sl_trigger_by,
1420                sl_trigger_price: order.sl_trigger_price,
1421                tp_trigger_price: order.tp_trigger_price,
1422                sl_order_type: order.sl_order_type,
1423                tp_order_type: order.tp_order_type,
1424                sl_limit_price: order.sl_limit_price,
1425                tp_limit_price: order.tp_limit_price,
1426            })
1427            .collect();
1428
1429        let args = BybitWsBatchPlaceOrderArgs {
1430            category,
1431            request: request_items,
1432        };
1433
1434        let request = BybitWsRequest {
1435            req_id: Some(batch_req_id),
1436            op: BybitWsOrderRequestOp::CreateBatch,
1437            header: BybitWsHeader::with_referer(referer),
1438            args: vec![args],
1439        };
1440
1441        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1442
1443        self.send_text(&payload).await
1444    }
1445
1446    /// Batch amends multiple orders via WebSocket.
1447    ///
1448    /// # Errors
1449    ///
1450    /// Returns an error if the batch request fails or if not authenticated.
1451    pub async fn batch_amend_orders(
1452        &self,
1453        #[allow(unused_variables)] trader_id: TraderId,
1454        #[allow(unused_variables)] strategy_id: StrategyId,
1455        orders: Vec<BybitWsAmendOrderParams>,
1456    ) -> BybitWsResult<()> {
1457        if !self.auth_tracker.is_authenticated() {
1458            return Err(BybitWsError::Authentication(
1459                "Must be authenticated to amend orders".to_string(),
1460            ));
1461        }
1462
1463        if orders.is_empty() {
1464            log::warn!("Batch amend orders called with empty orders list");
1465            return Ok(());
1466        }
1467
1468        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1469            self.batch_amend_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1470                .await?;
1471        }
1472
1473        Ok(())
1474    }
1475
1476    async fn batch_amend_orders_chunk(
1477        &self,
1478        #[allow(unused_variables)] trader_id: TraderId,
1479        #[allow(unused_variables)] strategy_id: StrategyId,
1480        orders: Vec<BybitWsAmendOrderParams>,
1481    ) -> BybitWsResult<()> {
1482        let request = BybitWsRequest {
1483            req_id: None,
1484            op: BybitWsOrderRequestOp::AmendBatch,
1485            header: BybitWsHeader::now(),
1486            args: orders,
1487        };
1488
1489        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1490
1491        self.send_text(&payload).await
1492    }
1493
1494    /// Batch cancels multiple orders via WebSocket.
1495    ///
1496    /// # Errors
1497    ///
1498    /// Returns an error if the batch request fails or if not authenticated.
1499    pub async fn batch_cancel_orders(
1500        &self,
1501        trader_id: TraderId,
1502        strategy_id: StrategyId,
1503        orders: Vec<BybitWsCancelOrderParams>,
1504    ) -> BybitWsResult<()> {
1505        if !self.auth_tracker.is_authenticated() {
1506            return Err(BybitWsError::Authentication(
1507                "Must be authenticated to cancel orders".to_string(),
1508            ));
1509        }
1510
1511        if orders.is_empty() {
1512            log::warn!("Batch cancel orders called with empty orders list");
1513            return Ok(());
1514        }
1515
1516        for chunk in orders.chunks(BATCH_PROCESSING_LIMIT) {
1517            self.batch_cancel_orders_chunk(trader_id, strategy_id, chunk.to_vec())
1518                .await?;
1519        }
1520
1521        Ok(())
1522    }
1523
1524    async fn batch_cancel_orders_chunk(
1525        &self,
1526        trader_id: TraderId,
1527        strategy_id: StrategyId,
1528        orders: Vec<BybitWsCancelOrderParams>,
1529    ) -> BybitWsResult<()> {
1530        if orders.is_empty() {
1531            return Ok(());
1532        }
1533
1534        let category = orders[0].category;
1535        let batch_req_id = UUID4::new().to_string();
1536
1537        let mut validated_data = Vec::new();
1538
1539        for order in &orders {
1540            if let Some(order_link_id_str) = &order.order_link_id {
1541                let cache_key = make_bybit_symbol(order.symbol.as_str(), category);
1542                let instrument_id = self
1543                    .instruments_cache
1544                    .get(&cache_key)
1545                    .map(|inst| inst.id())
1546                    .ok_or_else(|| {
1547                        BybitWsError::ClientError(format!(
1548                            "Instrument {cache_key} not found in cache"
1549                        ))
1550                    })?;
1551
1552                let venue_order_id = order
1553                    .order_id
1554                    .as_ref()
1555                    .map(|id| VenueOrderId::from(id.as_str()));
1556
1557                validated_data.push((order_link_id_str.clone(), instrument_id, venue_order_id));
1558            }
1559        }
1560
1561        let batch_cancel_data: Vec<_> = validated_data
1562            .iter()
1563            .map(|(order_link_id_str, instrument_id, venue_order_id)| {
1564                let client_order_id = ClientOrderId::from(order_link_id_str.as_str());
1565                (
1566                    client_order_id,
1567                    (
1568                        client_order_id,
1569                        trader_id,
1570                        strategy_id,
1571                        *instrument_id,
1572                        *venue_order_id,
1573                    ),
1574                )
1575            })
1576            .collect();
1577
1578        if !batch_cancel_data.is_empty() {
1579            let cmd = HandlerCommand::RegisterBatchCancel {
1580                req_id: batch_req_id.clone(),
1581                cancels: batch_cancel_data,
1582            };
1583            let cmd_tx = self.cmd_tx.read().await;
1584            if let Err(e) = cmd_tx.send(cmd) {
1585                log::error!("Failed to send RegisterBatchCancel command: {e}");
1586            }
1587        }
1588
1589        let request_items: Vec<BybitWsBatchCancelItem> = orders
1590            .into_iter()
1591            .map(|order| BybitWsBatchCancelItem {
1592                symbol: order.symbol,
1593                order_id: order.order_id,
1594                order_link_id: order.order_link_id,
1595            })
1596            .collect();
1597
1598        let args = BybitWsBatchCancelOrderArgs {
1599            category,
1600            request: request_items,
1601        };
1602
1603        let request = BybitWsRequest {
1604            req_id: Some(batch_req_id),
1605            op: BybitWsOrderRequestOp::CancelBatch,
1606            header: BybitWsHeader::now(),
1607            args: vec![args],
1608        };
1609
1610        let payload = serde_json::to_string(&request).map_err(BybitWsError::from)?;
1611
1612        self.send_text(&payload).await
1613    }
1614
1615    /// Submits an order using Nautilus domain objects.
1616    ///
1617    /// # Errors
1618    ///
1619    /// Returns an error if order submission fails or if not authenticated.
1620    #[allow(clippy::too_many_arguments)]
1621    pub async fn submit_order(
1622        &self,
1623        product_type: BybitProductType,
1624        trader_id: TraderId,
1625        strategy_id: StrategyId,
1626        instrument_id: InstrumentId,
1627        client_order_id: ClientOrderId,
1628        order_side: OrderSide,
1629        order_type: OrderType,
1630        quantity: Quantity,
1631        is_quote_quantity: bool,
1632        time_in_force: Option<TimeInForce>,
1633        price: Option<Price>,
1634        trigger_price: Option<Price>,
1635        post_only: Option<bool>,
1636        reduce_only: Option<bool>,
1637        is_leverage: bool,
1638    ) -> BybitWsResult<()> {
1639        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1640            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1641        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1642
1643        let bybit_side = match order_side {
1644            OrderSide::Buy => BybitOrderSide::Buy,
1645            OrderSide::Sell => BybitOrderSide::Sell,
1646            _ => {
1647                return Err(BybitWsError::ClientError(format!(
1648                    "Invalid order side: {order_side:?}"
1649                )));
1650            }
1651        };
1652
1653        // For stop/conditional orders, Bybit uses Market/Limit with trigger parameters
1654        let (bybit_order_type, is_stop_order) = match order_type {
1655            OrderType::Market => (BybitOrderType::Market, false),
1656            OrderType::Limit => (BybitOrderType::Limit, false),
1657            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1658            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1659            _ => {
1660                return Err(BybitWsError::ClientError(format!(
1661                    "Unsupported order type: {order_type:?}"
1662                )));
1663            }
1664        };
1665
1666        let bybit_tif = if bybit_order_type == BybitOrderType::Market {
1667            None
1668        } else if post_only == Some(true) {
1669            Some(BybitTimeInForce::PostOnly)
1670        } else if let Some(tif) = time_in_force {
1671            Some(match tif {
1672                TimeInForce::Gtc => BybitTimeInForce::Gtc,
1673                TimeInForce::Ioc => BybitTimeInForce::Ioc,
1674                TimeInForce::Fok => BybitTimeInForce::Fok,
1675                _ => {
1676                    return Err(BybitWsError::ClientError(format!(
1677                        "Unsupported time in force: {tif:?}"
1678                    )));
1679                }
1680            })
1681        } else {
1682            None
1683        };
1684
1685        // For SPOT market orders, specify baseCoin to interpret qty as base currency.
1686        // This ensures Nautilus quantities (always in base currency) are interpreted correctly.
1687        let market_unit = if product_type == BybitProductType::Spot
1688            && bybit_order_type == BybitOrderType::Market
1689        {
1690            if is_quote_quantity {
1691                Some(BYBIT_QUOTE_COIN.to_string())
1692            } else {
1693                Some(BYBIT_BASE_COIN.to_string())
1694            }
1695        } else {
1696            None
1697        };
1698
1699        // Only SPOT products support is_leverage parameter
1700        let is_leverage_value = if product_type == BybitProductType::Spot {
1701            Some(i32::from(is_leverage))
1702        } else {
1703            None
1704        };
1705
1706        // Stop semantics: Buy stops trigger on rise (breakout), sell stops trigger on fall (breakdown)
1707        // MIT semantics: Buy MIT triggers on fall (pullback entry), sell MIT triggers on rise (rally entry)
1708        let trigger_direction = if is_stop_order {
1709            match (order_type, order_side) {
1710                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1711                    Some(BybitTriggerDirection::RisesTo as i32)
1712                }
1713                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1714                    Some(BybitTriggerDirection::FallsTo as i32)
1715                }
1716                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1717                    Some(BybitTriggerDirection::FallsTo as i32)
1718                }
1719                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1720                    Some(BybitTriggerDirection::RisesTo as i32)
1721                }
1722                _ => None,
1723            }
1724        } else {
1725            None
1726        };
1727
1728        let params = if is_stop_order {
1729            // For conditional orders, ALL types use triggerPrice field
1730            // sl_trigger_price/tp_trigger_price are only for TP/SL attached to regular orders
1731            BybitWsPlaceOrderParams {
1732                category: product_type,
1733                symbol: raw_symbol,
1734                side: bybit_side,
1735                order_type: bybit_order_type,
1736                qty: quantity.to_string(),
1737                is_leverage: is_leverage_value,
1738                market_unit: market_unit.clone(),
1739                price: price.map(|p| p.to_string()),
1740                time_in_force: bybit_tif,
1741                order_link_id: Some(client_order_id.to_string()),
1742                reduce_only: reduce_only.filter(|&r| r),
1743                close_on_trigger: None,
1744                trigger_price: trigger_price.map(|p| p.to_string()),
1745                trigger_by: Some(BybitTriggerType::LastPrice),
1746                trigger_direction,
1747                tpsl_mode: None, // Not needed for standalone conditional orders
1748                take_profit: None,
1749                stop_loss: None,
1750                tp_trigger_by: None,
1751                sl_trigger_by: None,
1752                sl_trigger_price: None, // Not used for standalone stop orders
1753                tp_trigger_price: None, // Not used for standalone stop orders
1754                sl_order_type: None,
1755                tp_order_type: None,
1756                sl_limit_price: None,
1757                tp_limit_price: None,
1758            }
1759        } else {
1760            // Regular market/limit orders
1761            BybitWsPlaceOrderParams {
1762                category: product_type,
1763                symbol: raw_symbol,
1764                side: bybit_side,
1765                order_type: bybit_order_type,
1766                qty: quantity.to_string(),
1767                is_leverage: is_leverage_value,
1768                market_unit,
1769                price: price.map(|p| p.to_string()),
1770                time_in_force: if bybit_order_type == BybitOrderType::Market {
1771                    None
1772                } else {
1773                    bybit_tif
1774                },
1775                order_link_id: Some(client_order_id.to_string()),
1776                reduce_only: reduce_only.filter(|&r| r),
1777                close_on_trigger: None,
1778                trigger_price: None,
1779                trigger_by: None,
1780                trigger_direction: None,
1781                tpsl_mode: None,
1782                take_profit: None,
1783                stop_loss: None,
1784                tp_trigger_by: None,
1785                sl_trigger_by: None,
1786                sl_trigger_price: None,
1787                tp_trigger_price: None,
1788                sl_order_type: None,
1789                tp_order_type: None,
1790                sl_limit_price: None,
1791                tp_limit_price: None,
1792            }
1793        };
1794
1795        self.place_order(
1796            params,
1797            client_order_id,
1798            trader_id,
1799            strategy_id,
1800            instrument_id,
1801        )
1802        .await
1803    }
1804
1805    /// Modifies an existing order using Nautilus domain objects.
1806    ///
1807    /// # Errors
1808    ///
1809    /// Returns an error if modification fails or if not authenticated.
1810    #[allow(clippy::too_many_arguments)]
1811    pub async fn modify_order(
1812        &self,
1813        product_type: BybitProductType,
1814        trader_id: TraderId,
1815        strategy_id: StrategyId,
1816        instrument_id: InstrumentId,
1817        client_order_id: ClientOrderId,
1818        venue_order_id: Option<VenueOrderId>,
1819        quantity: Option<Quantity>,
1820        price: Option<Price>,
1821    ) -> BybitWsResult<()> {
1822        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1823            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1824        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1825
1826        let params = BybitWsAmendOrderParams {
1827            category: product_type,
1828            symbol: raw_symbol,
1829            order_id: venue_order_id.map(|id| id.to_string()),
1830            order_link_id: Some(client_order_id.to_string()),
1831            qty: quantity.map(|q| q.to_string()),
1832            price: price.map(|p| p.to_string()),
1833            trigger_price: None,
1834            take_profit: None,
1835            stop_loss: None,
1836            tp_trigger_by: None,
1837            sl_trigger_by: None,
1838        };
1839
1840        self.amend_order(
1841            params,
1842            client_order_id,
1843            trader_id,
1844            strategy_id,
1845            instrument_id,
1846            venue_order_id,
1847        )
1848        .await
1849    }
1850
1851    /// Cancels an order using Nautilus domain objects.
1852    ///
1853    /// # Errors
1854    ///
1855    /// Returns an error if cancellation fails or if not authenticated.
1856    pub async fn cancel_order_by_id(
1857        &self,
1858        product_type: BybitProductType,
1859        trader_id: TraderId,
1860        strategy_id: StrategyId,
1861        instrument_id: InstrumentId,
1862        client_order_id: ClientOrderId,
1863        venue_order_id: Option<VenueOrderId>,
1864    ) -> BybitWsResult<()> {
1865        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1866            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1867        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1868
1869        let params = BybitWsCancelOrderParams {
1870            category: product_type,
1871            symbol: raw_symbol,
1872            order_id: venue_order_id.map(|id| id.to_string()),
1873            order_link_id: Some(client_order_id.to_string()),
1874        };
1875
1876        self.cancel_order(
1877            params,
1878            client_order_id,
1879            trader_id,
1880            strategy_id,
1881            instrument_id,
1882            venue_order_id,
1883        )
1884        .await
1885    }
1886
1887    /// Builds order params for placing an order.
1888    #[allow(clippy::too_many_arguments)]
1889    pub fn build_place_order_params(
1890        &self,
1891        product_type: BybitProductType,
1892        instrument_id: InstrumentId,
1893        client_order_id: ClientOrderId,
1894        order_side: OrderSide,
1895        order_type: OrderType,
1896        quantity: Quantity,
1897        is_quote_quantity: bool,
1898        time_in_force: Option<TimeInForce>,
1899        price: Option<Price>,
1900        trigger_price: Option<Price>,
1901        post_only: Option<bool>,
1902        reduce_only: Option<bool>,
1903        is_leverage: bool,
1904        take_profit: Option<Price>,
1905        stop_loss: Option<Price>,
1906    ) -> BybitWsResult<BybitWsPlaceOrderParams> {
1907        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
1908            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
1909        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
1910
1911        let bybit_side = match order_side {
1912            OrderSide::Buy => BybitOrderSide::Buy,
1913            OrderSide::Sell => BybitOrderSide::Sell,
1914            _ => {
1915                return Err(BybitWsError::ClientError(format!(
1916                    "Invalid order side: {order_side:?}"
1917                )));
1918            }
1919        };
1920
1921        let (bybit_order_type, is_stop_order) = match order_type {
1922            OrderType::Market => (BybitOrderType::Market, false),
1923            OrderType::Limit => (BybitOrderType::Limit, false),
1924            OrderType::StopMarket | OrderType::MarketIfTouched => (BybitOrderType::Market, true),
1925            OrderType::StopLimit | OrderType::LimitIfTouched => (BybitOrderType::Limit, true),
1926            _ => {
1927                return Err(BybitWsError::ClientError(format!(
1928                    "Unsupported order type: {order_type:?}"
1929                )));
1930            }
1931        };
1932
1933        let bybit_tif = if post_only == Some(true) {
1934            Some(BybitTimeInForce::PostOnly)
1935        } else if let Some(tif) = time_in_force {
1936            Some(match tif {
1937                TimeInForce::Gtc => BybitTimeInForce::Gtc,
1938                TimeInForce::Ioc => BybitTimeInForce::Ioc,
1939                TimeInForce::Fok => BybitTimeInForce::Fok,
1940                _ => {
1941                    return Err(BybitWsError::ClientError(format!(
1942                        "Unsupported time in force: {tif:?}"
1943                    )));
1944                }
1945            })
1946        } else {
1947            None
1948        };
1949
1950        let market_unit = if product_type == BybitProductType::Spot
1951            && bybit_order_type == BybitOrderType::Market
1952        {
1953            if is_quote_quantity {
1954                Some(BYBIT_QUOTE_COIN.to_string())
1955            } else {
1956                Some(BYBIT_BASE_COIN.to_string())
1957            }
1958        } else {
1959            None
1960        };
1961
1962        // Only SPOT products support is_leverage parameter
1963        let is_leverage_value = if product_type == BybitProductType::Spot {
1964            Some(i32::from(is_leverage))
1965        } else {
1966            None
1967        };
1968
1969        // Stop semantics: Buy stops trigger on rise (breakout), sell stops trigger on fall (breakdown)
1970        // MIT semantics: Buy MIT triggers on fall (pullback entry), sell MIT triggers on rise (rally entry)
1971        let trigger_direction = if is_stop_order {
1972            match (order_type, order_side) {
1973                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Buy) => {
1974                    Some(BybitTriggerDirection::RisesTo as i32)
1975                }
1976                (OrderType::StopMarket | OrderType::StopLimit, OrderSide::Sell) => {
1977                    Some(BybitTriggerDirection::FallsTo as i32)
1978                }
1979                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Buy) => {
1980                    Some(BybitTriggerDirection::FallsTo as i32)
1981                }
1982                (OrderType::MarketIfTouched | OrderType::LimitIfTouched, OrderSide::Sell) => {
1983                    Some(BybitTriggerDirection::RisesTo as i32)
1984                }
1985                _ => None,
1986            }
1987        } else {
1988            None
1989        };
1990
1991        let params = if is_stop_order {
1992            BybitWsPlaceOrderParams {
1993                category: product_type,
1994                symbol: raw_symbol,
1995                side: bybit_side,
1996                order_type: bybit_order_type,
1997                qty: quantity.to_string(),
1998                is_leverage: is_leverage_value,
1999                market_unit,
2000                price: price.map(|p| p.to_string()),
2001                time_in_force: if bybit_order_type == BybitOrderType::Market {
2002                    None
2003                } else {
2004                    bybit_tif
2005                },
2006                order_link_id: Some(client_order_id.to_string()),
2007                reduce_only: reduce_only.filter(|&r| r),
2008                close_on_trigger: None,
2009                trigger_price: trigger_price.map(|p| p.to_string()),
2010                trigger_by: Some(BybitTriggerType::LastPrice),
2011                trigger_direction,
2012                tpsl_mode: if take_profit.is_some() || stop_loss.is_some() {
2013                    Some("Full".to_string())
2014                } else {
2015                    None
2016                },
2017                take_profit: take_profit.map(|p| p.to_string()),
2018                stop_loss: stop_loss.map(|p| p.to_string()),
2019                tp_trigger_by: take_profit.map(|_| BybitTriggerType::LastPrice),
2020                sl_trigger_by: stop_loss.map(|_| BybitTriggerType::LastPrice),
2021                sl_trigger_price: None,
2022                tp_trigger_price: None,
2023                sl_order_type: None,
2024                tp_order_type: None,
2025                sl_limit_price: None,
2026                tp_limit_price: None,
2027            }
2028        } else {
2029            BybitWsPlaceOrderParams {
2030                category: product_type,
2031                symbol: raw_symbol,
2032                side: bybit_side,
2033                order_type: bybit_order_type,
2034                qty: quantity.to_string(),
2035                is_leverage: is_leverage_value,
2036                market_unit,
2037                price: price.map(|p| p.to_string()),
2038                time_in_force: if bybit_order_type == BybitOrderType::Market {
2039                    None
2040                } else {
2041                    bybit_tif
2042                },
2043                order_link_id: Some(client_order_id.to_string()),
2044                reduce_only: reduce_only.filter(|&r| r),
2045                close_on_trigger: None,
2046                trigger_price: None,
2047                trigger_by: None,
2048                trigger_direction: None,
2049                tpsl_mode: if take_profit.is_some() || stop_loss.is_some() {
2050                    Some("Full".to_string())
2051                } else {
2052                    None
2053                },
2054                take_profit: take_profit.map(|p| p.to_string()),
2055                stop_loss: stop_loss.map(|p| p.to_string()),
2056                tp_trigger_by: take_profit.map(|_| BybitTriggerType::LastPrice),
2057                sl_trigger_by: stop_loss.map(|_| BybitTriggerType::LastPrice),
2058                sl_trigger_price: None,
2059                tp_trigger_price: None,
2060                sl_order_type: None,
2061                tp_order_type: None,
2062                sl_limit_price: None,
2063                tp_limit_price: None,
2064            }
2065        };
2066
2067        Ok(params)
2068    }
2069
2070    /// Builds order params for amending an order.
2071    #[allow(clippy::too_many_arguments)]
2072    pub fn build_amend_order_params(
2073        &self,
2074        product_type: BybitProductType,
2075        instrument_id: InstrumentId,
2076        venue_order_id: Option<VenueOrderId>,
2077        client_order_id: Option<ClientOrderId>,
2078        quantity: Option<Quantity>,
2079        price: Option<Price>,
2080    ) -> BybitWsResult<BybitWsAmendOrderParams> {
2081        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2082            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2083        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2084
2085        Ok(BybitWsAmendOrderParams {
2086            category: product_type,
2087            symbol: raw_symbol,
2088            order_id: venue_order_id.map(|v| v.to_string()),
2089            order_link_id: client_order_id.map(|c| c.to_string()),
2090            qty: quantity.map(|q| q.to_string()),
2091            price: price.map(|p| p.to_string()),
2092            trigger_price: None,
2093            take_profit: None,
2094            stop_loss: None,
2095            tp_trigger_by: None,
2096            sl_trigger_by: None,
2097        })
2098    }
2099
2100    /// Builds order params for canceling an order via WebSocket.
2101    ///
2102    /// # Errors
2103    ///
2104    /// Returns an error if symbol parsing fails or if neither venue_order_id
2105    /// nor client_order_id is provided.
2106    pub fn build_cancel_order_params(
2107        &self,
2108        product_type: BybitProductType,
2109        instrument_id: InstrumentId,
2110        venue_order_id: Option<VenueOrderId>,
2111        client_order_id: Option<ClientOrderId>,
2112    ) -> BybitWsResult<BybitWsCancelOrderParams> {
2113        if venue_order_id.is_none() && client_order_id.is_none() {
2114            return Err(BybitWsError::ClientError(
2115                "Either venue_order_id or client_order_id must be provided".to_string(),
2116            ));
2117        }
2118
2119        let bybit_symbol = BybitSymbol::new(instrument_id.symbol.as_str())
2120            .map_err(|e| BybitWsError::ClientError(e.to_string()))?;
2121        let raw_symbol = Ustr::from(bybit_symbol.raw_symbol());
2122
2123        Ok(BybitWsCancelOrderParams {
2124            category: product_type,
2125            symbol: raw_symbol,
2126            order_id: venue_order_id.map(|v| v.to_string()),
2127            order_link_id: client_order_id.map(|c| c.to_string()),
2128        })
2129    }
2130
2131    fn default_headers() -> Vec<(String, String)> {
2132        vec![
2133            ("Content-Type".to_string(), "application/json".to_string()),
2134            ("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string()),
2135        ]
2136    }
2137
2138    async fn authenticate_if_required(&self) -> BybitWsResult<()> {
2139        if !self.requires_auth {
2140            return Ok(());
2141        }
2142
2143        let credential = self.credential.as_ref().ok_or_else(|| {
2144            BybitWsError::Authentication("Credentials required for authentication".to_string())
2145        })?;
2146
2147        let expires = chrono::Utc::now().timestamp_millis() + WEBSOCKET_AUTH_WINDOW_MS;
2148        let signature = credential.sign_websocket_auth(expires);
2149
2150        let auth_message = BybitAuthRequest {
2151            op: BybitWsOperation::Auth,
2152            args: vec![
2153                Value::String(credential.api_key().to_string()),
2154                Value::Number(expires.into()),
2155                Value::String(signature),
2156            ],
2157        };
2158
2159        let payload = serde_json::to_string(&auth_message)?;
2160
2161        // Begin auth attempt so succeed() will update state
2162        let _rx = self.auth_tracker.begin();
2163
2164        self.cmd_tx
2165            .read()
2166            .await
2167            .send(HandlerCommand::Authenticate { payload })
2168            .map_err(|e| BybitWsError::Send(format!("Failed to send auth command: {e}")))?;
2169
2170        // Authentication will be processed asynchronously by the handler
2171        // The handler will emit NautilusWsMessage::Authenticated when successful
2172        Ok(())
2173    }
2174
2175    async fn send_text(&self, text: &str) -> BybitWsResult<()> {
2176        let cmd = HandlerCommand::SendText {
2177            payload: text.to_string(),
2178        };
2179
2180        self.send_cmd(cmd).await
2181    }
2182
2183    async fn send_cmd(&self, cmd: HandlerCommand) -> BybitWsResult<()> {
2184        self.cmd_tx
2185            .read()
2186            .await
2187            .send(cmd)
2188            .map_err(|e| BybitWsError::Send(e.to_string()))
2189    }
2190}
2191
2192#[cfg(test)]
2193mod tests {
2194    use rstest::rstest;
2195
2196    use super::*;
2197    use crate::{
2198        common::testing::load_test_json,
2199        websocket::{classify_bybit_message, messages::BybitWsMessage},
2200    };
2201
2202    #[rstest]
2203    fn classify_orderbook_snapshot() {
2204        let json: Value = serde_json::from_str(&load_test_json("ws_orderbook_snapshot.json"))
2205            .expect("invalid fixture");
2206        let message = classify_bybit_message(json);
2207        assert!(matches!(message, BybitWsMessage::Orderbook(_)));
2208    }
2209
2210    #[rstest]
2211    fn classify_trade_snapshot() {
2212        let json: Value =
2213            serde_json::from_str(&load_test_json("ws_public_trade.json")).expect("invalid fixture");
2214        let message = classify_bybit_message(json);
2215        assert!(matches!(message, BybitWsMessage::Trade(_)));
2216    }
2217
2218    #[rstest]
2219    fn classify_ticker_linear_snapshot() {
2220        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_linear.json"))
2221            .expect("invalid fixture");
2222        let message = classify_bybit_message(json);
2223        assert!(matches!(message, BybitWsMessage::TickerLinear(_)));
2224    }
2225
2226    #[rstest]
2227    fn classify_ticker_option_snapshot() {
2228        let json: Value = serde_json::from_str(&load_test_json("ws_ticker_option.json"))
2229            .expect("invalid fixture");
2230        let message = classify_bybit_message(json);
2231        assert!(matches!(message, BybitWsMessage::TickerOption(_)));
2232    }
2233
2234    #[rstest]
2235    fn test_race_unsubscribe_failure_recovery() {
2236        // Simulates the race condition where venue rejects an unsubscribe request.
2237        // The adapter must perform the 3-step recovery:
2238        // 1. confirm_unsubscribe() - clear pending_unsubscribe
2239        // 2. mark_subscribe() - mark as subscribing again
2240        // 3. confirm_subscribe() - restore to confirmed state
2241        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2242
2243        let topic = "publicTrade.BTCUSDT";
2244
2245        // Initial subscribe flow
2246        subscriptions.mark_subscribe(topic);
2247        subscriptions.confirm_subscribe(topic);
2248        assert_eq!(subscriptions.len(), 1);
2249
2250        // User unsubscribes
2251        subscriptions.mark_unsubscribe(topic);
2252        assert_eq!(subscriptions.len(), 0);
2253        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2254
2255        // Venue REJECTS the unsubscribe (error message)
2256        // Adapter must perform 3-step recovery (from lines 2181-2183)
2257        subscriptions.confirm_unsubscribe(topic); // Step 1: clear pending_unsubscribe
2258        subscriptions.mark_subscribe(topic); // Step 2: mark as subscribing
2259        subscriptions.confirm_subscribe(topic); // Step 3: confirm subscription
2260
2261        // Verify recovery: topic should be back in confirmed state
2262        assert_eq!(subscriptions.len(), 1);
2263        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2264        assert!(subscriptions.pending_subscribe_topics().is_empty());
2265
2266        // Verify topic is in all_topics() for reconnect
2267        let all = subscriptions.all_topics();
2268        assert_eq!(all.len(), 1);
2269        assert!(all.contains(&topic.to_string()));
2270    }
2271
2272    #[rstest]
2273    fn test_race_resubscribe_before_unsubscribe_ack() {
2274        // Simulates: User unsubscribes, then immediately resubscribes before
2275        // the unsubscribe ACK arrives from the venue.
2276        // This is the race condition fixed in the subscription tracker.
2277        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2278
2279        let topic = "orderbook.50.BTCUSDT";
2280
2281        // Initial subscribe
2282        subscriptions.mark_subscribe(topic);
2283        subscriptions.confirm_subscribe(topic);
2284        assert_eq!(subscriptions.len(), 1);
2285
2286        // User unsubscribes
2287        subscriptions.mark_unsubscribe(topic);
2288        assert_eq!(subscriptions.len(), 0);
2289        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2290
2291        // User immediately changes mind and resubscribes (before unsubscribe ACK)
2292        subscriptions.mark_subscribe(topic);
2293        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2294
2295        // NOW the unsubscribe ACK arrives - should NOT clear pending_subscribe
2296        subscriptions.confirm_unsubscribe(topic);
2297        assert!(subscriptions.pending_unsubscribe_topics().is_empty());
2298        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2299
2300        // Subscribe ACK arrives
2301        subscriptions.confirm_subscribe(topic);
2302        assert_eq!(subscriptions.len(), 1);
2303        assert!(subscriptions.pending_subscribe_topics().is_empty());
2304
2305        // Verify final state is correct
2306        let all = subscriptions.all_topics();
2307        assert_eq!(all.len(), 1);
2308        assert!(all.contains(&topic.to_string()));
2309    }
2310
2311    #[rstest]
2312    fn test_race_late_subscribe_confirmation_after_unsubscribe() {
2313        // Simulates: User subscribes, then unsubscribes before subscribe ACK arrives.
2314        // The late subscribe ACK should be ignored.
2315        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2316
2317        let topic = "tickers.ETHUSDT";
2318
2319        // User subscribes
2320        subscriptions.mark_subscribe(topic);
2321        assert_eq!(subscriptions.pending_subscribe_topics(), vec![topic]);
2322
2323        // User immediately unsubscribes (before subscribe ACK)
2324        subscriptions.mark_unsubscribe(topic);
2325        assert!(subscriptions.pending_subscribe_topics().is_empty()); // Cleared
2326        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2327
2328        // Late subscribe confirmation arrives - should be IGNORED
2329        subscriptions.confirm_subscribe(topic);
2330        assert_eq!(subscriptions.len(), 0); // Not added to confirmed
2331        assert_eq!(subscriptions.pending_unsubscribe_topics(), vec![topic]);
2332
2333        // Unsubscribe ACK arrives
2334        subscriptions.confirm_unsubscribe(topic);
2335
2336        // Final state: completely empty
2337        assert!(subscriptions.is_empty());
2338        assert!(subscriptions.all_topics().is_empty());
2339    }
2340
2341    #[rstest]
2342    fn test_race_reconnection_with_pending_states() {
2343        // Simulates reconnection with mixed pending states.
2344        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2345
2346        // Set up mixed state before reconnection
2347        // Confirmed: publicTrade.BTCUSDT
2348        let trade_btc = "publicTrade.BTCUSDT";
2349        subscriptions.mark_subscribe(trade_btc);
2350        subscriptions.confirm_subscribe(trade_btc);
2351
2352        // Pending subscribe: publicTrade.ETHUSDT
2353        let trade_eth = "publicTrade.ETHUSDT";
2354        subscriptions.mark_subscribe(trade_eth);
2355
2356        // Pending unsubscribe: orderbook.50.BTCUSDT (user cancelled)
2357        let book_btc = "orderbook.50.BTCUSDT";
2358        subscriptions.mark_subscribe(book_btc);
2359        subscriptions.confirm_subscribe(book_btc);
2360        subscriptions.mark_unsubscribe(book_btc);
2361
2362        // Get topics for reconnection
2363        let topics_to_restore = subscriptions.all_topics();
2364
2365        // Should include: confirmed + pending_subscribe (NOT pending_unsubscribe)
2366        assert_eq!(topics_to_restore.len(), 2);
2367        assert!(topics_to_restore.contains(&trade_btc.to_string()));
2368        assert!(topics_to_restore.contains(&trade_eth.to_string()));
2369        assert!(!topics_to_restore.contains(&book_btc.to_string())); // Excluded
2370    }
2371
2372    #[rstest]
2373    fn test_race_duplicate_subscribe_messages_idempotent() {
2374        // Simulates duplicate subscribe requests (e.g., from reconnection logic).
2375        // The subscription tracker should be idempotent and not create duplicate state.
2376        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER); // Bybit uses dot delimiter
2377
2378        let topic = "publicTrade.BTCUSDT";
2379
2380        // Subscribe and confirm
2381        subscriptions.mark_subscribe(topic);
2382        subscriptions.confirm_subscribe(topic);
2383        assert_eq!(subscriptions.len(), 1);
2384
2385        // Duplicate mark_subscribe on already-confirmed topic (should be no-op)
2386        subscriptions.mark_subscribe(topic);
2387        assert!(subscriptions.pending_subscribe_topics().is_empty()); // Not re-added
2388        assert_eq!(subscriptions.len(), 1); // Still just 1
2389
2390        // Duplicate confirm_subscribe (should be idempotent)
2391        subscriptions.confirm_subscribe(topic);
2392        assert_eq!(subscriptions.len(), 1);
2393
2394        // Verify final state
2395        let all = subscriptions.all_topics();
2396        assert_eq!(all.len(), 1);
2397        assert_eq!(all[0], topic);
2398    }
2399
2400    #[rstest]
2401    #[case::spot_with_leverage(BybitProductType::Spot, true, Some(1))]
2402    #[case::spot_without_leverage(BybitProductType::Spot, false, Some(0))]
2403    #[case::linear_with_leverage(BybitProductType::Linear, true, None)]
2404    #[case::linear_without_leverage(BybitProductType::Linear, false, None)]
2405    #[case::inverse_with_leverage(BybitProductType::Inverse, true, None)]
2406    #[case::option_with_leverage(BybitProductType::Option, true, None)]
2407    fn test_is_leverage_parameter(
2408        #[case] product_type: BybitProductType,
2409        #[case] is_leverage: bool,
2410        #[case] expected: Option<i32>,
2411    ) {
2412        let symbol = match product_type {
2413            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2414            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2415            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2416            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2417        };
2418
2419        let instrument_id = InstrumentId::from(symbol);
2420        let client_order_id = ClientOrderId::from("test-order-1");
2421        let quantity = Quantity::from("1.0");
2422
2423        let client = BybitWebSocketClient::new_trade(
2424            BybitEnvironment::Testnet,
2425            Some("test-key".to_string()),
2426            Some("test-secret".to_string()),
2427            None,
2428            Some(20),
2429        );
2430
2431        let params = client
2432            .build_place_order_params(
2433                product_type,
2434                instrument_id,
2435                client_order_id,
2436                OrderSide::Buy,
2437                OrderType::Limit,
2438                quantity,
2439                false, // is_quote_quantity
2440                Some(TimeInForce::Gtc),
2441                Some(Price::from("50000.0")),
2442                None,
2443                None,
2444                None,
2445                is_leverage,
2446                None, // take_profit
2447                None, // stop_loss
2448            )
2449            .expect("Failed to build params");
2450
2451        assert_eq!(params.is_leverage, expected);
2452    }
2453
2454    #[rstest]
2455    #[case::spot_market_quote_quantity(BybitProductType::Spot, OrderType::Market, true, Some(BYBIT_QUOTE_COIN.to_string()))]
2456    #[case::spot_market_base_quantity(BybitProductType::Spot, OrderType::Market, false, Some(BYBIT_BASE_COIN.to_string()))]
2457    #[case::spot_limit_no_unit(BybitProductType::Spot, OrderType::Limit, false, None)]
2458    #[case::spot_limit_quote(BybitProductType::Spot, OrderType::Limit, true, None)]
2459    #[case::linear_market_no_unit(BybitProductType::Linear, OrderType::Market, false, None)]
2460    #[case::inverse_market_no_unit(BybitProductType::Inverse, OrderType::Market, true, None)]
2461    fn test_is_quote_quantity_parameter(
2462        #[case] product_type: BybitProductType,
2463        #[case] order_type: OrderType,
2464        #[case] is_quote_quantity: bool,
2465        #[case] expected: Option<String>,
2466    ) {
2467        let symbol = match product_type {
2468            BybitProductType::Spot => "BTCUSDT-SPOT.BYBIT",
2469            BybitProductType::Linear => "ETHUSDT-LINEAR.BYBIT",
2470            BybitProductType::Inverse => "BTCUSD-INVERSE.BYBIT",
2471            BybitProductType::Option => "BTC-31MAY24-50000-C-OPTION.BYBIT",
2472        };
2473
2474        let instrument_id = InstrumentId::from(symbol);
2475        let client_order_id = ClientOrderId::from("test-order-1");
2476        let quantity = Quantity::from("1.0");
2477
2478        let client = BybitWebSocketClient::new_trade(
2479            BybitEnvironment::Testnet,
2480            Some("test-key".to_string()),
2481            Some("test-secret".to_string()),
2482            None,
2483            Some(20),
2484        );
2485
2486        let params = client
2487            .build_place_order_params(
2488                product_type,
2489                instrument_id,
2490                client_order_id,
2491                OrderSide::Buy,
2492                order_type,
2493                quantity,
2494                is_quote_quantity,
2495                Some(TimeInForce::Gtc),
2496                if order_type == OrderType::Market {
2497                    None
2498                } else {
2499                    Some(Price::from("50000.0"))
2500                },
2501                None,
2502                None,
2503                None,
2504                false,
2505                None, // take_profit
2506                None, // stop_loss
2507            )
2508            .expect("Failed to build params");
2509
2510        assert_eq!(params.market_unit, expected);
2511    }
2512}