nautilus_bybit/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Bybit.
17
18use std::{
19    collections::VecDeque,
20    num::NonZero,
21    sync::{
22        Arc,
23        atomic::{AtomicBool, AtomicU8, Ordering},
24    },
25};
26
27use ahash::AHashMap;
28use dashmap::DashMap;
29use nautilus_common::cache::quote::QuoteCache;
30use nautilus_core::{UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
31use nautilus_model::{
32    data::{BarSpecification, BarType, Data},
33    enums::{AggregationSource, BarAggregation, PriceType},
34    events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
35    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
36    instruments::{Instrument, InstrumentAny},
37};
38use nautilus_network::{
39    retry::{RetryManager, create_websocket_retry_manager},
40    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
41};
42use tokio_tungstenite::tungstenite::Message;
43use ustr::Ustr;
44
45use super::{
46    enums::BybitWsOperation,
47    error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
48    messages::{
49        BybitWebSocketError, BybitWsAuthResponse, BybitWsHeader, BybitWsMessage, BybitWsRequest,
50        BybitWsResponse, BybitWsSubscriptionMsg, NautilusWsMessage,
51    },
52    parse::{
53        parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
54        parse_ticker_linear_funding, parse_ws_account_state, parse_ws_fill_report,
55        parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
56        parse_ws_trade_tick,
57    },
58};
59use crate::{
60    common::{
61        consts::{
62            BYBIT_NAUTILUS_BROKER_ID, BYBIT_TOPIC_EXECUTION, BYBIT_TOPIC_KLINE, BYBIT_TOPIC_ORDER,
63            BYBIT_TOPIC_ORDERBOOK, BYBIT_TOPIC_POSITION, BYBIT_TOPIC_PUBLIC_TRADE,
64            BYBIT_TOPIC_TICKERS, BYBIT_TOPIC_TRADE, BYBIT_TOPIC_WALLET,
65        },
66        enums::{BybitProductType, BybitTimeInForce, BybitWsOrderRequestOp},
67        parse::{make_bybit_symbol, parse_price_with_precision, parse_quantity_with_precision},
68    },
69    websocket::messages::{
70        BybitBatchOrderError, BybitWsAmendOrderParams, BybitWsCancelOrderParams,
71        BybitWsOrderResponse, BybitWsPlaceOrderParams,
72    },
73};
74
/// Commands sent from the outer client to the inner message handler.
///
/// Commands are received on the handler's command channel and executed one at a time
/// inside `FeedHandler::next`.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
pub enum HandlerCommand {
    /// Installs the WebSocket client the handler uses for all outbound sends.
    SetClient(WebSocketClient),
    /// Takes the current client (if any) out of the handler and disconnects it.
    Disconnect,
    /// Sends `payload` as a text frame, with retry.
    Authenticate {
        payload: String,
    },
    /// Sends every entry of `topics` as its own text frame, with retry.
    ///
    /// NOTE(review): entries are sent verbatim, so callers presumably pass fully
    /// serialized subscribe messages rather than bare topic names - confirm.
    Subscribe {
        topics: Vec<String>,
    },
    /// Sends every entry of `topics` as its own text frame, with retry.
    ///
    /// NOTE(review): entries are sent verbatim, so callers presumably pass fully
    /// serialized unsubscribe messages rather than bare topic names - confirm.
    Unsubscribe {
        topics: Vec<String>,
    },
    /// Sends an arbitrary text payload, with retry.
    SendText {
        payload: String,
    },
    /// Sends a create-order request built from `params`.
    ///
    /// The identifiers are cached under a freshly generated request ID so the
    /// response can be correlated with the order.
    PlaceOrder {
        params: BybitWsPlaceOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Sends an amend-order request built from `params`.
    ///
    /// The identifiers are cached under a freshly generated request ID so the
    /// response can be correlated with the order.
    AmendOrder {
        params: BybitWsAmendOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
        venue_order_id: Option<VenueOrderId>,
    },
    /// Sends a cancel-order request built from `params`.
    ///
    /// The identifiers are cached under a freshly generated request ID so the
    /// response can be correlated with the order.
    CancelOrder {
        params: BybitWsCancelOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
        venue_order_id: Option<VenueOrderId>,
    },
    /// Registers the per-order data of a batch place request under `req_id`.
    ///
    /// Nothing is sent by this command; it only records the data used to
    /// correlate the batch response.
    RegisterBatchPlace {
        req_id: String,
        orders: Vec<BatchOrderData>,
    },
    /// Registers the per-cancel data of a batch cancel request under `req_id`.
    ///
    /// Nothing is sent by this command; it only records the data used to
    /// correlate the batch response.
    RegisterBatchCancel {
        req_id: String,
        cancels: Vec<BatchCancelData>,
    },
    /// Inserts every instrument into the handler's cache, keyed by `symbol().inner()`.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts (or replaces) a single instrument in the handler's cache.
    UpdateInstrument(InstrumentAny),
}
130
/// Type alias for the funding rate cache.
///
/// A shared map behind an async read-write lock, keyed by [`Ustr`], whose values are a
/// pair of optional strings.
///
/// NOTE(review): the key is presumably the instrument symbol and the pair presumably
/// holds funding fields taken from ticker updates - confirm against the ticker parsing.
type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;

/// Data cached for pending place requests to correlate with responses.
///
/// Stored in the handler keyed by the request ID sent to the venue.
type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);

/// Data cached for pending cancel requests to correlate with responses.
///
/// Stored in the handler keyed by the request ID sent to the venue; the venue order ID
/// is optional.
type CancelRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);

/// Data cached for pending amend requests to correlate with responses.
///
/// Same shape as [`CancelRequestData`]; stored keyed by the request ID sent to the venue.
type AmendRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);

/// Data for a single order in a batch request.
///
/// Pairs a client order ID with the full [`PlaceRequestData`], which itself starts with
/// a client order ID. The batch handlers read the outer ID and ignore the inner one.
type BatchOrderData = (ClientOrderId, PlaceRequestData);

/// Data for a single cancel in a batch request.
///
/// Pairs a client order ID with the full [`CancelRequestData`], which itself starts with
/// a client order ID. The batch handlers read the outer ID and ignore the inner one.
type BatchCancelData = (ClientOrderId, CancelRequestData);
160
/// Inner WebSocket message handler.
///
/// Executes [`HandlerCommand`]s from the outer client, parses raw WebSocket frames, and
/// hands out the resulting [`NautilusWsMessage`]s one at a time through `next`.
pub(super) struct FeedHandler {
    /// Stop flag; `next` returns `None` once it observes `true`.
    signal: Arc<AtomicBool>,
    /// Active client; set by `HandlerCommand::SetClient`, taken by `Disconnect`.
    client: Option<WebSocketClient>,
    /// Commands from the outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Raw WebSocket frames awaiting parsing.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outbound channel used by `send`.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Authentication tracker supplied at construction.
    auth_tracker: AuthTracker,
    /// Subscription state supplied at construction.
    subscriptions: SubscriptionState,
    /// Instruments keyed by `symbol().inner()`.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Account attached to generated rejection events; `OrderRejected` events cannot
    /// be created while this is `None`.
    account_id: Option<AccountId>,
    /// Shared level read by `include_referer_header`.
    /// NOTE(review): presumably the account's market maker level - confirm.
    mm_level: Arc<AtomicU8>,
    /// When set, raw symbols are combined with the product type for instrument lookup.
    product_type: Option<BybitProductType>,
    /// Supplied at construction.
    /// NOTE(review): presumably selects close-time stamping for bars - confirm.
    bars_timestamp_on_close: bool,
    /// Last quote per instrument, consulted when deriving quotes from depth-1 order
    /// book updates.
    quote_cache: QuoteCache,
    /// Funding rate cache shared with the outer client.
    funding_cache: FundingCache,
    /// Retry policy applied to outbound text sends.
    retry_manager: RetryManager<BybitWsError>,
    /// Pending single place requests keyed by request ID.
    pending_place_requests: DashMap<String, PlaceRequestData>,
    /// Pending single cancel requests keyed by request ID.
    pending_cancel_requests: DashMap<String, CancelRequestData>,
    /// Pending single amend requests keyed by request ID.
    pending_amend_requests: DashMap<String, AmendRequestData>,
    /// Pending batch place requests keyed by request ID.
    pending_batch_place_requests: DashMap<String, Vec<BatchOrderData>>,
    /// Pending batch cancel requests keyed by request ID.
    pending_batch_cancel_requests: DashMap<String, Vec<BatchCancelData>>,
    /// Parsed messages waiting to be returned one by one from `next`.
    message_queue: VecDeque<NautilusWsMessage>,
}
184
185impl FeedHandler {
186    /// Creates a new [`FeedHandler`] instance.
187    #[allow(clippy::too_many_arguments)]
188    pub(super) fn new(
189        signal: Arc<AtomicBool>,
190        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
191        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
192        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
193        account_id: Option<AccountId>,
194        product_type: Option<BybitProductType>,
195        bars_timestamp_on_close: bool,
196        mm_level: Arc<AtomicU8>,
197        auth_tracker: AuthTracker,
198        subscriptions: SubscriptionState,
199        funding_cache: FundingCache,
200    ) -> Self {
201        Self {
202            signal,
203            client: None,
204            cmd_rx,
205            raw_rx,
206            out_tx,
207            auth_tracker,
208            subscriptions,
209            instruments_cache: AHashMap::new(),
210            account_id,
211            mm_level,
212            product_type,
213            bars_timestamp_on_close,
214            quote_cache: QuoteCache::new(),
215            funding_cache,
216            retry_manager: create_websocket_retry_manager(),
217            pending_place_requests: DashMap::new(),
218            pending_cancel_requests: DashMap::new(),
219            pending_amend_requests: DashMap::new(),
220            pending_batch_place_requests: DashMap::new(),
221            pending_batch_cancel_requests: DashMap::new(),
222            message_queue: VecDeque::new(),
223        }
224    }
225
    /// Returns `true` when the stop signal is set (read with relaxed ordering).
    pub(super) fn is_stopped(&self) -> bool {
        self.signal.load(Ordering::Relaxed)
    }
229
230    #[allow(dead_code)]
231    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
232        self.out_tx.send(msg).map_err(|_| ())
233    }
234
    /// Returns a fresh request ID: the string form of a newly generated [`UUID4`].
    fn generate_unique_request_id(&self) -> String {
        UUID4::new().to_string()
    }
238
239    fn find_and_remove_place_request_by_client_order_id(
240        &self,
241        client_order_id: &ClientOrderId,
242    ) -> Option<(String, PlaceRequestData)> {
243        self.pending_place_requests
244            .iter()
245            .find(|entry| entry.value().0 == *client_order_id)
246            .and_then(|entry| {
247                let key = entry.key().clone();
248                drop(entry);
249                self.pending_place_requests.remove(&key)
250            })
251    }
252
253    fn find_and_remove_cancel_request_by_client_order_id(
254        &self,
255        client_order_id: &ClientOrderId,
256    ) -> Option<(String, CancelRequestData)> {
257        self.pending_cancel_requests
258            .iter()
259            .find(|entry| entry.value().0 == *client_order_id)
260            .and_then(|entry| {
261                let key = entry.key().clone();
262                drop(entry);
263                self.pending_cancel_requests.remove(&key)
264            })
265    }
266
267    fn find_and_remove_amend_request_by_client_order_id(
268        &self,
269        client_order_id: &ClientOrderId,
270    ) -> Option<(String, AmendRequestData)> {
271        self.pending_amend_requests
272            .iter()
273            .find(|entry| entry.value().0 == *client_order_id)
274            .and_then(|entry| {
275                let key = entry.key().clone();
276                drop(entry);
277                self.pending_amend_requests.remove(&key)
278            })
279    }
280
281    fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
282        let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
283        let mm_level = self.mm_level.load(Ordering::Relaxed);
284        !(is_post_only && mm_level > 0)
285    }
286
287    /// Sends a WebSocket message with retry logic.
288    async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
289        if let Some(client) = &self.client {
290            self.retry_manager
291                .execute_with_retry(
292                    "websocket_send",
293                    || {
294                        let payload = payload.clone();
295                        async move {
296                            client
297                                .send_text(payload, None)
298                                .await
299                                .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
300                        }
301                    },
302                    should_retry_bybit_error,
303                    create_bybit_timeout_error,
304                )
305                .await
306        } else {
307            Err(BybitWsError::ClientError(
308                "No active WebSocket client".to_string(),
309            ))
310        }
311    }
312
313    /// Handles batch operation request-level failures (ret_code != 0).
314    ///
315    /// When a batch request fails entirely, generate rejection events for all orders
316    /// in the batch and clean up tracking data.
317    fn handle_batch_failure(
318        &self,
319        req_id: &str,
320        ret_msg: &str,
321        op: &str,
322        ts_init: UnixNanos,
323        result: &mut Vec<NautilusWsMessage>,
324    ) {
325        if op.contains("create") {
326            if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
327                tracing::warn!(
328                    req_id = %req_id,
329                    ret_msg = %ret_msg,
330                    num_orders = batch_data.len(),
331                    "Batch place request failed"
332                );
333
334                let Some(account_id) = self.account_id else {
335                    tracing::error!("Cannot create OrderRejected events: account_id is None");
336                    return;
337                };
338
339                let reason = Ustr::from(ret_msg);
340                for (client_order_id, (_, trader_id, strategy_id, instrument_id)) in batch_data {
341                    let rejected = OrderRejected::new(
342                        trader_id,
343                        strategy_id,
344                        instrument_id,
345                        client_order_id,
346                        account_id,
347                        reason,
348                        UUID4::new(),
349                        ts_init,
350                        ts_init,
351                        false,
352                        false,
353                    );
354                    result.push(NautilusWsMessage::OrderRejected(rejected));
355                }
356            }
357        } else if op.contains("cancel")
358            && let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id)
359        {
360            tracing::warn!(
361                req_id = %req_id,
362                ret_msg = %ret_msg,
363                num_cancels = batch_data.len(),
364                "Batch cancel request failed"
365            );
366
367            let reason = Ustr::from(ret_msg);
368            for (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id)) in
369                batch_data
370            {
371                let rejected = OrderCancelRejected::new(
372                    trader_id,
373                    strategy_id,
374                    instrument_id,
375                    client_order_id,
376                    reason,
377                    UUID4::new(),
378                    ts_init,
379                    ts_init,
380                    false,
381                    venue_order_id,
382                    self.account_id,
383                );
384                result.push(NautilusWsMessage::OrderCancelRejected(rejected));
385            }
386        }
387    }
388
389    /// Handles batch operation responses, checking for individual order failures.
390    fn handle_batch_response(
391        &self,
392        resp: &BybitWsOrderResponse,
393        result: &mut Vec<NautilusWsMessage>,
394    ) {
395        let Some(req_id) = &resp.req_id else {
396            tracing::warn!(
397                op = %resp.op,
398                "Batch response missing req_id - cannot correlate with pending requests"
399            );
400            return;
401        };
402
403        let batch_errors = resp.extract_batch_errors();
404
405        if resp.op.contains("create") {
406            if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
407                self.process_batch_place_errors(batch_data, batch_errors, result);
408            } else {
409                tracing::debug!(
410                    req_id = %req_id,
411                    "Batch place response received but no pending request found"
412                );
413            }
414        } else if resp.op.contains("cancel") {
415            if let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id) {
416                self.process_batch_cancel_errors(batch_data, batch_errors, result);
417            } else {
418                tracing::debug!(
419                    req_id = %req_id,
420                    "Batch cancel response received but no pending request found"
421                );
422            }
423        }
424    }
425
426    /// Processes individual order errors from a batch place operation.
427    fn process_batch_place_errors(
428        &self,
429        batch_data: Vec<BatchOrderData>,
430        errors: Vec<BybitBatchOrderError>,
431        result: &mut Vec<NautilusWsMessage>,
432    ) {
433        let Some(account_id) = self.account_id else {
434            tracing::error!("Cannot create OrderRejected events: account_id is None");
435            return;
436        };
437
438        let clock = get_atomic_clock_realtime();
439        let ts_init = clock.get_time_ns();
440
441        for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id))) in
442            batch_data.into_iter().enumerate()
443        {
444            if let Some(error) = errors.get(idx)
445                && error.code != 0
446            {
447                tracing::warn!(
448                    client_order_id = %client_order_id,
449                    error_code = error.code,
450                    error_msg = %error.msg,
451                    "Batch order rejected"
452                );
453
454                let rejected = OrderRejected::new(
455                    trader_id,
456                    strategy_id,
457                    instrument_id,
458                    client_order_id,
459                    account_id,
460                    Ustr::from(&error.msg),
461                    UUID4::new(),
462                    ts_init,
463                    ts_init,
464                    false,
465                    false,
466                );
467                result.push(NautilusWsMessage::OrderRejected(rejected));
468            }
469        }
470    }
471
472    /// Processes individual order errors from a batch cancel operation.
473    fn process_batch_cancel_errors(
474        &self,
475        batch_data: Vec<BatchCancelData>,
476        errors: Vec<BybitBatchOrderError>,
477        result: &mut Vec<NautilusWsMessage>,
478    ) {
479        let clock = get_atomic_clock_realtime();
480        let ts_init = clock.get_time_ns();
481
482        for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id))) in
483            batch_data.into_iter().enumerate()
484        {
485            if let Some(error) = errors.get(idx)
486                && error.code != 0
487            {
488                tracing::warn!(
489                    client_order_id = %client_order_id,
490                    error_code = error.code,
491                    error_msg = %error.msg,
492                    "Batch cancel rejected"
493                );
494
495                let rejected = OrderCancelRejected::new(
496                    trader_id,
497                    strategy_id,
498                    instrument_id,
499                    client_order_id,
500                    Ustr::from(&error.msg),
501                    UUID4::new(),
502                    ts_init,
503                    ts_init,
504                    false,
505                    venue_order_id,
506                    self.account_id,
507                );
508                result.push(NautilusWsMessage::OrderCancelRejected(rejected));
509            }
510        }
511    }
512
513    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
514        let clock = get_atomic_clock_realtime();
515
516        loop {
517            if let Some(msg) = self.message_queue.pop_front() {
518                return Some(msg);
519            }
520
521            tokio::select! {
522                Some(cmd) = self.cmd_rx.recv() => {
523                    match cmd {
524                        HandlerCommand::SetClient(client) => {
525                            tracing::debug!("WebSocketClient received by handler");
526                            self.client = Some(client);
527                        }
528                        HandlerCommand::Disconnect => {
529                            tracing::debug!("Disconnect command received");
530
531                            if let Some(client) = self.client.take() {
532                                client.disconnect().await;
533                            }
534                        }
535                        HandlerCommand::Authenticate { payload } => {
536                            tracing::debug!("Authenticate command received");
537                            if let Err(e) = self.send_with_retry(payload).await {
538                                tracing::error!("Failed to send authentication after retries: {e}");
539                            }
540                        }
541                        HandlerCommand::Subscribe { topics } => {
542                            for topic in topics {
543                                tracing::debug!(topic = %topic, "Subscribing to topic");
544                                if let Err(e) = self.send_with_retry(topic.clone()).await {
545                                    tracing::error!(topic = %topic, error = %e, "Failed to send subscription after retries");
546                                }
547                            }
548                        }
549                        HandlerCommand::Unsubscribe { topics } => {
550                            for topic in topics {
551                                tracing::debug!(topic = %topic, "Unsubscribing from topic");
552                                if let Err(e) = self.send_with_retry(topic.clone()).await {
553                                    tracing::error!(topic = %topic, error = %e, "Failed to send unsubscription after retries");
554                                }
555                            }
556                        }
557                        HandlerCommand::SendText { payload } => {
558                            if let Err(e) = self.send_with_retry(payload).await {
559                                tracing::error!("Error sending text with retry: {e}");
560                            }
561                        }
562                        HandlerCommand::InitializeInstruments(instruments) => {
563                            for inst in instruments {
564                                self.instruments_cache.insert(inst.symbol().inner(), inst);
565                            }
566                        }
567                        HandlerCommand::UpdateInstrument(inst) => {
568                            self.instruments_cache.insert(inst.symbol().inner(), inst);
569                        }
570                        HandlerCommand::RegisterBatchPlace { req_id, orders } => {
571                            tracing::debug!(
572                                req_id = %req_id,
573                                num_orders = orders.len(),
574                                "Registering batch place request"
575                            );
576                            self.pending_batch_place_requests.insert(req_id, orders);
577                        }
578                        HandlerCommand::RegisterBatchCancel { req_id, cancels } => {
579                            tracing::debug!(
580                                req_id = %req_id,
581                                num_cancels = cancels.len(),
582                                "Registering batch cancel request"
583                            );
584                            self.pending_batch_cancel_requests.insert(req_id, cancels);
585                        }
586                        HandlerCommand::PlaceOrder {
587                            params,
588                            client_order_id,
589                            trader_id,
590                            strategy_id,
591                            instrument_id,
592                        } => {
593                            let request_id = self.generate_unique_request_id();
594
595                            self.pending_place_requests.insert(
596                                request_id.clone(),
597                                (client_order_id, trader_id, strategy_id, instrument_id),
598                            );
599
600                            let referer = if self.include_referer_header(params.time_in_force) {
601                                Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
602                            } else {
603                                None
604                            };
605
606                            let request = BybitWsRequest {
607                                req_id: Some(request_id.clone()),
608                                op: BybitWsOrderRequestOp::Create,
609                                header: BybitWsHeader::with_referer(referer),
610                                args: vec![params],
611                            };
612
613                            if let Ok(payload) = serde_json::to_string(&request)
614                                && let Err(e) = self.send_with_retry(payload).await
615                            {
616                                tracing::error!("Failed to send place order after retries: {e}");
617                                self.pending_place_requests.remove(&request_id);
618                            }
619                        }
620                        HandlerCommand::AmendOrder {
621                            params,
622                            client_order_id,
623                            trader_id,
624                            strategy_id,
625                            instrument_id,
626                            venue_order_id,
627                        } => {
628                            let request_id = self.generate_unique_request_id();
629
630                            self.pending_amend_requests.insert(
631                                request_id.clone(),
632                                (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
633                            );
634
635                            let request = BybitWsRequest {
636                                req_id: Some(request_id.clone()),
637                                op: BybitWsOrderRequestOp::Amend,
638                                header: BybitWsHeader::now(),
639                                args: vec![params],
640                            };
641
642                            if let Ok(payload) = serde_json::to_string(&request)
643                                && let Err(e) = self.send_with_retry(payload).await
644                            {
645                                tracing::error!("Failed to send amend order after retries: {e}");
646                                self.pending_amend_requests.remove(&request_id);
647                            }
648                        }
649                        HandlerCommand::CancelOrder {
650                            params,
651                            client_order_id,
652                            trader_id,
653                            strategy_id,
654                            instrument_id,
655                            venue_order_id,
656                        } => {
657                            let request_id = self.generate_unique_request_id();
658
659                            self.pending_cancel_requests.insert(
660                                request_id.clone(),
661                                (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
662                            );
663
664                            let request = BybitWsRequest {
665                                req_id: Some(request_id.clone()),
666                                op: BybitWsOrderRequestOp::Cancel,
667                                header: BybitWsHeader::now(),
668                                args: vec![params],
669                            };
670
671                            if let Ok(payload) = serde_json::to_string(&request)
672                                && let Err(e) = self.send_with_retry(payload).await
673                            {
674                                tracing::error!("Failed to send cancel order after retries: {e}");
675                                self.pending_cancel_requests.remove(&request_id);
676                            }
677                        }
678                    }
679
680                    continue;
681                }
682
683                _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
684                    if self.signal.load(Ordering::Relaxed) {
685                        tracing::debug!("Stop signal received during idle period");
686                        return None;
687                    }
688                    continue;
689                }
690
691                msg = self.raw_rx.recv() => {
692                    let msg = match msg {
693                        Some(msg) => msg,
694                        None => {
695                            tracing::debug!("WebSocket stream closed");
696                            return None;
697                        }
698                    };
699
700                    if let Message::Ping(data) = &msg {
701                        tracing::trace!("Received ping frame with {} bytes", data.len());
702
703                        if let Some(client) = &self.client
704                            && let Err(e) = client.send_pong(data.to_vec()).await
705                        {
706                            tracing::warn!(error = %e, "Failed to send pong frame");
707                        }
708                        continue;
709                    }
710
711                    let event = match Self::parse_raw_message(msg) {
712                        Some(event) => event,
713                        None => continue,
714                    };
715
716                    if self.signal.load(Ordering::Relaxed) {
717                        tracing::debug!("Stop signal received");
718                        return None;
719                    }
720
721                    let ts_init = clock.get_time_ns();
722                    let instruments = self.instruments_cache.clone();
723                    let funding_cache = Arc::clone(&self.funding_cache);
724                    let nautilus_messages = self.parse_to_nautilus_messages(
725                        event,
726                        &instruments,
727                        self.account_id,
728                        self.product_type,
729                        &funding_cache,
730                        ts_init,
731                    )
732                    .await;
733
734                    // Enqueue all parsed messages to emit them one by one
735                    self.message_queue.extend(nautilus_messages);
736                }
737            }
738        }
739    }
740
741    fn parse_raw_message(msg: Message) -> Option<BybitWsMessage> {
742        use serde_json::Value;
743
744        match msg {
745            Message::Text(text) => {
746                if text == nautilus_network::RECONNECTED {
747                    tracing::info!("Received WebSocket reconnected signal");
748                    return Some(BybitWsMessage::Reconnected);
749                }
750
751                if text.trim().eq_ignore_ascii_case("pong") {
752                    return None;
753                }
754
755                tracing::trace!("Raw websocket message: {text}");
756
757                let value: Value = match serde_json::from_str(&text) {
758                    Ok(v) => v,
759                    Err(e) => {
760                        tracing::error!("Failed to parse WebSocket message: {e}: {text}");
761                        return None;
762                    }
763                };
764
765                Some(classify_bybit_message(value))
766            }
767            Message::Binary(msg) => {
768                tracing::debug!("Raw binary: {msg:?}");
769                None
770            }
771            Message::Close(_) => {
772                tracing::debug!("Received close message, waiting for reconnection");
773                None
774            }
775            _ => None,
776        }
777    }
778
779    #[allow(clippy::too_many_arguments)]
780    async fn parse_to_nautilus_messages(
781        &mut self,
782        msg: BybitWsMessage,
783        instruments: &AHashMap<Ustr, InstrumentAny>,
784        account_id: Option<AccountId>,
785        product_type: Option<BybitProductType>,
786        funding_cache: &FundingCache,
787        ts_init: UnixNanos,
788    ) -> Vec<NautilusWsMessage> {
789        let mut result = Vec::new();
790
791        match msg {
792            BybitWsMessage::Orderbook(msg) => {
793                let raw_symbol = msg.data.s;
794                let symbol =
795                    product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
796
797                if let Some(instrument) = instruments.get(&symbol) {
798                    match parse_orderbook_deltas(&msg, instrument, ts_init) {
799                        Ok(deltas) => result.push(NautilusWsMessage::Deltas(deltas)),
800                        Err(e) => tracing::error!("Error parsing orderbook deltas: {e}"),
801                    }
802
803                    // For depth=1 subscriptions, also emit QuoteTick from top-of-book
804                    if let Some(depth_str) = msg.topic.as_str().split('.').nth(1)
805                        && depth_str == "1"
806                    {
807                        let instrument_id = instrument.id();
808                        let last_quote = self.quote_cache.get(&instrument_id);
809
810                        match parse_orderbook_quote(&msg, instrument, last_quote, ts_init) {
811                            Ok(quote) => {
812                                self.quote_cache.insert(instrument_id, quote);
813                                result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
814                            }
815                            Err(e) => tracing::debug!("Skipping orderbook quote: {e}"),
816                        }
817                    }
818                } else {
819                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Orderbook message");
820                }
821            }
822            BybitWsMessage::Trade(msg) => {
823                let mut data_vec = Vec::new();
824                for trade in &msg.data {
825                    let raw_symbol = trade.s;
826                    let symbol =
827                        product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
828
829                    if let Some(instrument) = instruments.get(&symbol) {
830                        match parse_ws_trade_tick(trade, instrument, ts_init) {
831                            Ok(tick) => data_vec.push(Data::Trade(tick)),
832                            Err(e) => tracing::error!("Error parsing trade tick: {e}"),
833                        }
834                    } else {
835                        tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Trade message");
836                    }
837                }
838
839                if !data_vec.is_empty() {
840                    result.push(NautilusWsMessage::Data(data_vec));
841                }
842            }
843            BybitWsMessage::Kline(msg) => {
844                let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
845                    Ok(parts) => parts,
846                    Err(e) => {
847                        tracing::warn!("Failed to parse kline topic: {e}");
848                        return result;
849                    }
850                };
851
852                let symbol = product_type
853                    .map_or_else(|| raw_symbol.into(), |pt| make_bybit_symbol(raw_symbol, pt));
854
855                if let Some(instrument) = instruments.get(&symbol) {
856                    let (step, aggregation) = match interval_str.parse::<usize>() {
857                        Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
858                        _ => {
859                            tracing::warn!("Unsupported kline interval: {}", interval_str);
860                            return result;
861                        }
862                    };
863
864                    if let Some(non_zero_step) = NonZero::new(step) {
865                        let bar_spec = BarSpecification {
866                            step: non_zero_step,
867                            aggregation,
868                            price_type: PriceType::Last,
869                        };
870                        let bar_type =
871                            BarType::new(instrument.id(), bar_spec, AggregationSource::External);
872
873                        let mut data_vec = Vec::new();
874                        for kline in &msg.data {
875                            // Only process confirmed bars (not partial/building bars)
876                            if !kline.confirm {
877                                continue;
878                            }
879                            match parse_ws_kline_bar(
880                                kline,
881                                instrument,
882                                bar_type,
883                                self.bars_timestamp_on_close,
884                                ts_init,
885                            ) {
886                                Ok(bar) => data_vec.push(Data::Bar(bar)),
887                                Err(e) => tracing::error!("Error parsing kline to bar: {e}"),
888                            }
889                        }
890                        if !data_vec.is_empty() {
891                            result.push(NautilusWsMessage::Data(data_vec));
892                        }
893                    } else {
894                        tracing::error!("Invalid step value: {}", step);
895                    }
896                } else {
897                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Kline message");
898                }
899            }
900            BybitWsMessage::TickerLinear(msg) => {
901                let raw_symbol = msg.data.symbol;
902                let symbol =
903                    product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
904
905                if let Some(instrument) = instruments.get(&symbol) {
906                    let instrument_id = instrument.id();
907                    let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
908                    let price_precision = instrument.price_precision();
909                    let size_precision = instrument.size_precision();
910
911                    // Parse Bybit linear ticker fields, propagate errors
912                    let bid_price = msg
913                        .data
914                        .bid1_price
915                        .as_deref()
916                        .map(|s| parse_price_with_precision(s, price_precision, "bid1Price"))
917                        .transpose();
918                    let ask_price = msg
919                        .data
920                        .ask1_price
921                        .as_deref()
922                        .map(|s| parse_price_with_precision(s, price_precision, "ask1Price"))
923                        .transpose();
924                    let bid_size = msg
925                        .data
926                        .bid1_size
927                        .as_deref()
928                        .map(|s| parse_quantity_with_precision(s, size_precision, "bid1Size"))
929                        .transpose();
930                    let ask_size = msg
931                        .data
932                        .ask1_size
933                        .as_deref()
934                        .map(|s| parse_quantity_with_precision(s, size_precision, "ask1Size"))
935                        .transpose();
936
937                    match (bid_price, ask_price, bid_size, ask_size) {
938                        (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
939                            match self.quote_cache.process(
940                                instrument_id,
941                                bp,
942                                ap,
943                                bs,
944                                as_,
945                                ts_event,
946                                ts_init,
947                            ) {
948                                Ok(quote) => {
949                                    result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
950                                }
951                                Err(e) => {
952                                    let raw_data = serde_json::to_string(&msg.data)
953                                        .unwrap_or_else(|_| "<failed to serialize>".to_string());
954                                    tracing::debug!(
955                                        "Skipping partial ticker update: {e}, raw_data: {raw_data}"
956                                    );
957                                }
958                            }
959                        }
960                        _ => {
961                            let raw_data = serde_json::to_string(&msg.data)
962                                .unwrap_or_else(|_| "<failed to serialize>".to_string());
963                            tracing::warn!(
964                                "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
965                            );
966                        }
967                    }
968
969                    // Extract funding rate if available
970                    if msg.data.funding_rate.is_some() && msg.data.next_funding_time.is_some() {
971                        let should_publish = {
972                            let cache = funding_cache.read().await;
973                            match cache.get(&symbol) {
974                                Some((cached_rate, cached_time)) => {
975                                    cached_rate.as_ref() != msg.data.funding_rate.as_ref()
976                                        || cached_time.as_ref()
977                                            != msg.data.next_funding_time.as_ref()
978                                }
979                                None => true,
980                            }
981                        };
982
983                        if should_publish {
984                            match parse_ticker_linear_funding(
985                                &msg.data,
986                                instrument_id,
987                                ts_event,
988                                ts_init,
989                            ) {
990                                Ok(funding) => {
991                                    let cache_key = (
992                                        msg.data.funding_rate.clone(),
993                                        msg.data.next_funding_time.clone(),
994                                    );
995                                    funding_cache.write().await.insert(symbol, cache_key);
996                                    result.push(NautilusWsMessage::FundingRates(vec![funding]));
997                                }
998                                Err(e) => {
999                                    tracing::debug!("Skipping funding rate update: {e}");
1000                                }
1001                            }
1002                        }
1003                    }
1004                } else {
1005                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerLinear message");
1006                }
1007            }
1008            BybitWsMessage::TickerOption(msg) => {
1009                let raw_symbol = &msg.data.symbol;
1010                let symbol = product_type.map_or_else(
1011                    || raw_symbol.as_str().into(),
1012                    |pt| make_bybit_symbol(raw_symbol, pt),
1013                );
1014
1015                if let Some(instrument) = instruments.get(&symbol) {
1016                    let instrument_id = instrument.id();
1017                    let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
1018                    let price_precision = instrument.price_precision();
1019                    let size_precision = instrument.size_precision();
1020
1021                    // Parse Bybit option ticker fields (always complete), propagate errors
1022                    let bid_price = parse_price_with_precision(
1023                        &msg.data.bid_price,
1024                        price_precision,
1025                        "bidPrice",
1026                    );
1027                    let ask_price = parse_price_with_precision(
1028                        &msg.data.ask_price,
1029                        price_precision,
1030                        "askPrice",
1031                    );
1032                    let bid_size = parse_quantity_with_precision(
1033                        &msg.data.bid_size,
1034                        size_precision,
1035                        "bidSize",
1036                    );
1037                    let ask_size = parse_quantity_with_precision(
1038                        &msg.data.ask_size,
1039                        size_precision,
1040                        "askSize",
1041                    );
1042
1043                    match (bid_price, ask_price, bid_size, ask_size) {
1044                        (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1045                            match self.quote_cache.process(
1046                                instrument_id,
1047                                Some(bp),
1048                                Some(ap),
1049                                Some(bs),
1050                                Some(as_),
1051                                ts_event,
1052                                ts_init,
1053                            ) {
1054                                Ok(quote) => {
1055                                    result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1056                                }
1057                                Err(e) => {
1058                                    let raw_data = serde_json::to_string(&msg.data)
1059                                        .unwrap_or_else(|_| "<failed to serialize>".to_string());
1060                                    tracing::debug!(
1061                                        "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1062                                    );
1063                                }
1064                            }
1065                        }
1066                        _ => {
1067                            let raw_data = serde_json::to_string(&msg.data)
1068                                .unwrap_or_else(|_| "<failed to serialize>".to_string());
1069                            tracing::warn!(
1070                                "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1071                            );
1072                        }
1073                    }
1074                } else {
1075                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerOption message");
1076                }
1077            }
1078            BybitWsMessage::AccountOrder(msg) => {
1079                if let Some(account_id) = account_id {
1080                    let mut reports = Vec::new();
1081                    for order in &msg.data {
1082                        let raw_symbol = order.symbol;
1083                        let symbol = make_bybit_symbol(raw_symbol, order.category);
1084
1085                        if let Some(instrument) = instruments.get(&symbol) {
1086                            match parse_ws_order_status_report(
1087                                order, instrument, account_id, ts_init,
1088                            ) {
1089                                Ok(report) => reports.push(report),
1090                                Err(e) => tracing::error!("Error parsing order status report: {e}"),
1091                            }
1092                        } else {
1093                            tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountOrder message");
1094                        }
1095                    }
1096                    if !reports.is_empty() {
1097                        result.push(NautilusWsMessage::OrderStatusReports(reports));
1098                    }
1099                }
1100            }
1101            BybitWsMessage::AccountExecution(msg) => {
1102                if let Some(account_id) = account_id {
1103                    let mut reports = Vec::new();
1104                    for execution in &msg.data {
1105                        let raw_symbol = execution.symbol;
1106                        let symbol = make_bybit_symbol(raw_symbol, execution.category);
1107
1108                        if let Some(instrument) = instruments.get(&symbol) {
1109                            match parse_ws_fill_report(execution, account_id, instrument, ts_init) {
1110                                Ok(report) => reports.push(report),
1111                                Err(e) => tracing::error!("Error parsing fill report: {e}"),
1112                            }
1113                        } else {
1114                            tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountExecution message");
1115                        }
1116                    }
1117                    if !reports.is_empty() {
1118                        result.push(NautilusWsMessage::FillReports(reports));
1119                    }
1120                }
1121            }
1122            BybitWsMessage::AccountPosition(msg) => {
1123                if let Some(account_id) = account_id {
1124                    for position in &msg.data {
1125                        let raw_symbol = position.symbol;
1126                        let symbol = make_bybit_symbol(raw_symbol, position.category);
1127
1128                        if let Some(instrument) = instruments.get(&symbol) {
1129                            match parse_ws_position_status_report(
1130                                position, account_id, instrument, ts_init,
1131                            ) {
1132                                Ok(report) => {
1133                                    result.push(NautilusWsMessage::PositionStatusReport(report));
1134                                }
1135                                Err(e) => {
1136                                    tracing::error!("Error parsing position status report: {e}");
1137                                }
1138                            }
1139                        } else {
1140                            tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountPosition message");
1141                        }
1142                    }
1143                }
1144            }
1145            BybitWsMessage::AccountWallet(msg) => {
1146                if let Some(account_id) = account_id {
1147                    for wallet in &msg.data {
1148                        let ts_event = UnixNanos::from(msg.creation_time as u64 * 1_000_000);
1149
1150                        match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1151                            Ok(state) => result.push(NautilusWsMessage::AccountState(state)),
1152                            Err(e) => tracing::error!("Error parsing account state: {e}"),
1153                        }
1154                    }
1155                }
1156            }
1157            BybitWsMessage::OrderResponse(resp) => {
1158                if resp.ret_code == 0 {
1159                    tracing::debug!(op = %resp.op, ret_msg = %resp.ret_msg, "Order operation successful");
1160
1161                    if resp.op.contains("batch") {
1162                        self.handle_batch_response(&resp, &mut result);
1163                    } else if let Some(req_id) = &resp.req_id {
1164                        if resp.op.contains("create") {
1165                            self.pending_place_requests.remove(req_id);
1166                        } else if resp.op.contains("cancel") {
1167                            self.pending_cancel_requests.remove(req_id);
1168                        } else if resp.op.contains("amend") {
1169                            self.pending_amend_requests.remove(req_id);
1170                        }
1171                    } else if let Some(order_link_id) =
1172                        resp.data.get("orderLinkId").and_then(|v| v.as_str())
1173                    {
1174                        // Bybit sometimes omits req_id, search by client_order_id instead
1175                        let client_order_id = ClientOrderId::from(order_link_id);
1176                        if resp.op.contains("create") {
1177                            self.find_and_remove_place_request_by_client_order_id(&client_order_id);
1178                        } else if resp.op.contains("cancel") {
1179                            self.find_and_remove_cancel_request_by_client_order_id(
1180                                &client_order_id,
1181                            );
1182                        } else if resp.op.contains("amend") {
1183                            self.find_and_remove_amend_request_by_client_order_id(&client_order_id);
1184                        }
1185                    }
1186                } else if let Some(req_id) = &resp.req_id {
1187                    let clock = get_atomic_clock_realtime();
1188                    let ts_init = clock.get_time_ns();
1189
1190                    if resp.op.contains("batch") {
1191                        self.handle_batch_failure(
1192                            req_id,
1193                            &resp.ret_msg,
1194                            &resp.op,
1195                            ts_init,
1196                            &mut result,
1197                        );
1198                    } else if resp.op.contains("create")
1199                        && let Some((_, (client_order_id, trader_id, strategy_id, instrument_id))) =
1200                            self.pending_place_requests.remove(req_id)
1201                    {
1202                        let Some(account_id) = self.account_id else {
1203                            tracing::error!(
1204                                request_id = %req_id,
1205                                reason = %resp.ret_msg,
1206                                "Cannot create OrderRejected event: account_id is None"
1207                            );
1208                            return result;
1209                        };
1210
1211                        let rejected = OrderRejected::new(
1212                            trader_id,
1213                            strategy_id,
1214                            instrument_id,
1215                            client_order_id,
1216                            account_id,
1217                            Ustr::from(&resp.ret_msg),
1218                            UUID4::new(),
1219                            ts_init,
1220                            ts_init,
1221                            false,
1222                            false,
1223                        );
1224                        result.push(NautilusWsMessage::OrderRejected(rejected));
1225                    } else if resp.op.contains("cancel")
1226                        && let Some((
1227                            _,
1228                            (
1229                                client_order_id,
1230                                trader_id,
1231                                strategy_id,
1232                                instrument_id,
1233                                venue_order_id,
1234                            ),
1235                        )) = self.pending_cancel_requests.remove(req_id)
1236                    {
1237                        let rejected = OrderCancelRejected::new(
1238                            trader_id,
1239                            strategy_id,
1240                            instrument_id,
1241                            client_order_id,
1242                            Ustr::from(&resp.ret_msg),
1243                            UUID4::new(),
1244                            ts_init,
1245                            ts_init,
1246                            false,
1247                            venue_order_id,
1248                            self.account_id,
1249                        );
1250                        result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1251                    } else if resp.op.contains("amend")
1252                        && let Some((
1253                            _,
1254                            (
1255                                client_order_id,
1256                                trader_id,
1257                                strategy_id,
1258                                instrument_id,
1259                                venue_order_id,
1260                            ),
1261                        )) = self.pending_amend_requests.remove(req_id)
1262                    {
1263                        let rejected = OrderModifyRejected::new(
1264                            trader_id,
1265                            strategy_id,
1266                            instrument_id,
1267                            client_order_id,
1268                            Ustr::from(&resp.ret_msg),
1269                            UUID4::new(),
1270                            ts_init,
1271                            ts_init,
1272                            false,
1273                            venue_order_id,
1274                            self.account_id,
1275                        );
1276                        result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1277                    }
1278                } else if let Some(order_link_id) =
1279                    resp.data.get("orderLinkId").and_then(|v| v.as_str())
1280                {
1281                    // Bybit sometimes omits req_id, search by client_order_id instead
1282                    let clock = get_atomic_clock_realtime();
1283                    let ts_init = clock.get_time_ns();
1284                    let client_order_id = ClientOrderId::from(order_link_id);
1285
1286                    if resp.op.contains("create") {
1287                        if let Some((_, (_, trader_id, strategy_id, instrument_id))) =
1288                            self.find_and_remove_place_request_by_client_order_id(&client_order_id)
1289                        {
1290                            let Some(account_id) = self.account_id else {
1291                                tracing::error!(
1292                                    client_order_id = %client_order_id,
1293                                    reason = %resp.ret_msg,
1294                                    "Cannot create OrderRejected event: account_id is None"
1295                                );
1296                                return result;
1297                            };
1298
1299                            let rejected = OrderRejected::new(
1300                                trader_id,
1301                                strategy_id,
1302                                instrument_id,
1303                                client_order_id,
1304                                account_id,
1305                                Ustr::from(&resp.ret_msg),
1306                                UUID4::new(),
1307                                ts_init,
1308                                ts_init,
1309                                false,
1310                                false,
1311                            );
1312                            result.push(NautilusWsMessage::OrderRejected(rejected));
1313                        }
1314                    } else if resp.op.contains("cancel") {
1315                        if let Some((
1316                            _,
1317                            (_, trader_id, strategy_id, instrument_id, venue_order_id),
1318                        )) =
1319                            self.find_and_remove_cancel_request_by_client_order_id(&client_order_id)
1320                        {
1321                            let rejected = OrderCancelRejected::new(
1322                                trader_id,
1323                                strategy_id,
1324                                instrument_id,
1325                                client_order_id,
1326                                Ustr::from(&resp.ret_msg),
1327                                UUID4::new(),
1328                                ts_init,
1329                                ts_init,
1330                                false,
1331                                venue_order_id,
1332                                self.account_id,
1333                            );
1334                            result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1335                        }
1336                    } else if resp.op.contains("amend")
1337                        && let Some((_, (_, trader_id, strategy_id, instrument_id, venue_order_id))) =
1338                            self.find_and_remove_amend_request_by_client_order_id(&client_order_id)
1339                    {
1340                        let rejected = OrderModifyRejected::new(
1341                            trader_id,
1342                            strategy_id,
1343                            instrument_id,
1344                            client_order_id,
1345                            Ustr::from(&resp.ret_msg),
1346                            UUID4::new(),
1347                            ts_init,
1348                            ts_init,
1349                            false,
1350                            venue_order_id,
1351                            self.account_id,
1352                        );
1353                        result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1354                    }
1355                } else {
1356                    tracing::warn!(
1357                        op = %resp.op,
1358                        ret_code = resp.ret_code,
1359                        ret_msg = %resp.ret_msg,
1360                        "Order operation failed but request_id could not be extracted from response"
1361                    );
1362                }
1363            }
1364            BybitWsMessage::Auth(auth_response) => {
1365                let is_success =
1366                    auth_response.success.unwrap_or(false) || (auth_response.ret_code == Some(0));
1367
1368                if is_success {
1369                    self.auth_tracker.succeed();
1370                    tracing::info!("WebSocket authenticated");
1371                    result.push(NautilusWsMessage::Authenticated);
1372                } else {
1373                    let error_msg = auth_response
1374                        .ret_msg
1375                        .as_deref()
1376                        .unwrap_or("Authentication rejected");
1377                    self.auth_tracker.fail(error_msg);
1378                    tracing::error!(error = error_msg, "WebSocket authentication failed");
1379                    result.push(NautilusWsMessage::Error(BybitWebSocketError::from_message(
1380                        error_msg.to_string(),
1381                    )));
1382                }
1383            }
1384            BybitWsMessage::Error(err) => {
1385                result.push(NautilusWsMessage::Error(err));
1386            }
1387            BybitWsMessage::Reconnected => {
1388                self.quote_cache.clear();
1389                result.push(NautilusWsMessage::Reconnected);
1390            }
1391            BybitWsMessage::Subscription(sub_msg) => {
1392                let pending_topics = self.subscriptions.pending_subscribe_topics();
1393                match sub_msg.op {
1394                    BybitWsOperation::Subscribe => {
1395                        if sub_msg.success {
1396                            for topic in pending_topics {
1397                                self.subscriptions.confirm_subscribe(&topic);
1398                                tracing::debug!(topic = topic, "Subscription confirmed");
1399                            }
1400                        } else {
1401                            for topic in pending_topics {
1402                                self.subscriptions.mark_failure(&topic);
1403                                tracing::warn!(
1404                                    topic = topic,
1405                                    error = ?sub_msg.ret_msg,
1406                                    "Subscription failed, will retry on reconnect"
1407                                );
1408                            }
1409                        }
1410                    }
1411                    BybitWsOperation::Unsubscribe => {
1412                        let pending_unsub = self.subscriptions.pending_unsubscribe_topics();
1413                        if sub_msg.success {
1414                            for topic in pending_unsub {
1415                                self.subscriptions.confirm_unsubscribe(&topic);
1416                                tracing::debug!(topic = topic, "Unsubscription confirmed");
1417                            }
1418                        } else {
1419                            for topic in pending_unsub {
1420                                tracing::warn!(
1421                                    topic = topic,
1422                                    error = ?sub_msg.ret_msg,
1423                                    "Unsubscription failed"
1424                                );
1425                            }
1426                        }
1427                    }
1428                    _ => {}
1429                }
1430            }
1431            _ => {}
1432        }
1433
1434        result
1435    }
1436}
1437
1438/// Classifies a parsed JSON value into a typed Bybit WebSocket message.
1439///
1440/// Returns `Raw(value)` if no specific type matches.
1441pub fn classify_bybit_message(value: serde_json::Value) -> BybitWsMessage {
1442    if let Some(op_val) = value.get("op") {
1443        if let Ok(op) = serde_json::from_value::<BybitWsOperation>(op_val.clone())
1444            && op == BybitWsOperation::Auth
1445            && let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
1446        {
1447            let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1448            if is_success {
1449                return BybitWsMessage::Auth(auth);
1450            }
1451            let resp = BybitWsResponse {
1452                op: Some(auth.op.clone()),
1453                topic: None,
1454                success: auth.success,
1455                conn_id: auth.conn_id.clone(),
1456                req_id: None,
1457                ret_code: auth.ret_code,
1458                ret_msg: auth.ret_msg,
1459            };
1460            let error = BybitWebSocketError::from_response(&resp);
1461            return BybitWsMessage::Error(error);
1462        }
1463
1464        if let Some(op_str) = op_val.as_str()
1465            && op_str.starts_with("order.")
1466        {
1467            return serde_json::from_value::<BybitWsOrderResponse>(value.clone()).map_or_else(
1468                |_| BybitWsMessage::Raw(value),
1469                BybitWsMessage::OrderResponse,
1470            );
1471        }
1472    }
1473
1474    if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
1475        if success {
1476            return serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone())
1477                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Subscription);
1478        }
1479        return serde_json::from_value::<BybitWsResponse>(value.clone()).map_or_else(
1480            |_| BybitWsMessage::Raw(value),
1481            |resp| {
1482                let error = BybitWebSocketError::from_response(&resp);
1483                BybitWsMessage::Error(error)
1484            },
1485        );
1486    }
1487
1488    // Most common path for market data
1489    if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
1490        if topic.starts_with(BYBIT_TOPIC_ORDERBOOK) {
1491            return serde_json::from_value(value.clone())
1492                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Orderbook);
1493        }
1494
1495        if topic.contains(BYBIT_TOPIC_PUBLIC_TRADE) || topic.starts_with(BYBIT_TOPIC_TRADE) {
1496            return serde_json::from_value(value.clone())
1497                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Trade);
1498        }
1499
1500        if topic.starts_with(BYBIT_TOPIC_KLINE) {
1501            return serde_json::from_value(value.clone())
1502                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Kline);
1503        }
1504
1505        if topic.starts_with(BYBIT_TOPIC_TICKERS) {
1506            // Option symbols: BTC-6JAN23-17500-C (date, strike, C/P)
1507            let is_option = value
1508                .get("data")
1509                .and_then(|d| d.get("symbol"))
1510                .and_then(|s| s.as_str())
1511                .is_some_and(|symbol| symbol.contains('-') && symbol.matches('-').count() >= 3);
1512
1513            if is_option {
1514                return serde_json::from_value(value.clone())
1515                    .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerOption);
1516            }
1517            return serde_json::from_value(value.clone())
1518                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerLinear);
1519        }
1520
1521        if topic.starts_with(BYBIT_TOPIC_ORDER) {
1522            return serde_json::from_value(value.clone())
1523                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::AccountOrder);
1524        }
1525
1526        if topic.starts_with(BYBIT_TOPIC_EXECUTION) {
1527            return serde_json::from_value(value.clone()).map_or_else(
1528                |_| BybitWsMessage::Raw(value),
1529                BybitWsMessage::AccountExecution,
1530            );
1531        }
1532
1533        if topic.starts_with(BYBIT_TOPIC_WALLET) {
1534            return serde_json::from_value(value.clone()).map_or_else(
1535                |_| BybitWsMessage::Raw(value),
1536                BybitWsMessage::AccountWallet,
1537            );
1538        }
1539
1540        if topic.starts_with(BYBIT_TOPIC_POSITION) {
1541            return serde_json::from_value(value.clone()).map_or_else(
1542                |_| BybitWsMessage::Raw(value),
1543                BybitWsMessage::AccountPosition,
1544            );
1545        }
1546    }
1547
1548    BybitWsMessage::Raw(value)
1549}
1550
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::consts::BYBIT_WS_TOPIC_DELIMITER;

    /// Builds a `FeedHandler` wired to fresh unbounded channels.
    ///
    /// The opposite ends of those channels (`_cmd_tx`, `_raw_tx`, `_out_rx`) are
    /// dropped when this function returns, so the handler is only suitable for
    /// tests that never exchange messages with it.
    fn create_test_handler() -> FeedHandler {
        let signal = Arc::new(AtomicBool::new(false));
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
        let (out_tx, _out_rx) = tokio::sync::mpsc::unbounded_channel();
        let auth_tracker = AuthTracker::new();
        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let funding_cache = Arc::new(tokio::sync::RwLock::new(AHashMap::new()));

        FeedHandler::new(
            signal,
            cmd_rx,
            raw_rx,
            out_tx,
            None,
            None,
            true,
            Arc::new(AtomicU8::new(0)),
            auth_tracker,
            subscriptions,
            funding_cache,
        )
    }

    /// Three consecutive request IDs must be pairwise distinct.
    #[rstest]
    fn test_generate_unique_request_id_returns_different_ids() {
        let handler = create_test_handler();

        let id1 = handler.generate_unique_request_id();
        let id2 = handler.generate_unique_request_id();
        let id3 = handler.generate_unique_request_id();

        assert_ne!(id1, id2);
        assert_ne!(id2, id3);
        assert_ne!(id1, id3);
    }

    /// Parsing each ID as a `UUID4` and formatting it back must reproduce the
    /// original string.
    #[rstest]
    fn test_generate_unique_request_id_produces_valid_uuids() {
        let handler = create_test_handler();

        let id1 = handler.generate_unique_request_id();
        let id2 = handler.generate_unique_request_id();

        // `assert_eq!` reports both operands on failure, unlike `assert!(a == b)`
        assert_eq!(UUID4::from(id1.as_str()).to_string(), id1);
        assert_eq!(UUID4::from(id2.as_str()).to_string(), id2);
    }

    /// Request IDs drawn for successive order placements must not repeat.
    #[rstest]
    fn test_multiple_place_orders_use_different_request_ids() {
        let handler = create_test_handler();

        let req_id_1 = handler.generate_unique_request_id();
        let req_id_2 = handler.generate_unique_request_id();
        let req_id_3 = handler.generate_unique_request_id();

        assert_ne!(req_id_1, req_id_2);
        assert_ne!(req_id_2, req_id_3);
        assert_ne!(req_id_1, req_id_3);
    }

    /// Request IDs drawn for successive amends must not repeat.
    #[rstest]
    fn test_multiple_amends_use_different_request_ids() {
        let handler = create_test_handler();

        // Verifies fix for "Duplicate reqId" errors when amending same order multiple times
        let req_id_1 = handler.generate_unique_request_id();
        let req_id_2 = handler.generate_unique_request_id();
        let req_id_3 = handler.generate_unique_request_id();

        assert_ne!(
            req_id_1, req_id_2,
            "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
        );
        assert_ne!(
            req_id_2, req_id_3,
            "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
        );
        // The first and third IDs are compared as well so that all three are pairwise distinct
        assert_ne!(
            req_id_1, req_id_3,
            "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
        );
    }

    /// Request IDs drawn for successive cancels must not repeat.
    #[rstest]
    fn test_multiple_cancels_use_different_request_ids() {
        let handler = create_test_handler();

        let req_id_1 = handler.generate_unique_request_id();
        let req_id_2 = handler.generate_unique_request_id();

        assert_ne!(
            req_id_1, req_id_2,
            "Multiple cancels should generate different request IDs"
        );
    }

    /// One hundred request IDs generated back to back must all be distinct.
    ///
    /// Despite the name, generation here is sequential on a single thread; the
    /// test checks uniqueness across a burst of calls, not thread safety.
    #[rstest]
    fn test_concurrent_request_id_generation() {
        let handler = create_test_handler();

        let mut ids = std::collections::HashSet::new();
        for _ in 0..100 {
            let id = handler.generate_unique_request_id();
            assert!(
                ids.insert(id.clone()),
                "Generated duplicate request ID: {id}"
            );
        }
        assert_eq!(ids.len(), 100);
    }
}