// nautilus_bybit/websocket/handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Bybit.
17
18use std::{
19    collections::VecDeque,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, AtomicU8, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use dashmap::DashMap;
28use nautilus_common::cache::quote::QuoteCache;
29use nautilus_core::{UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
30use nautilus_model::{
31    data::{BarType, Data},
32    events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
33    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
34    instruments::{Instrument, InstrumentAny},
35};
36use nautilus_network::{
37    retry::{RetryManager, create_websocket_retry_manager},
38    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
39};
40use tokio_tungstenite::tungstenite::Message;
41use ustr::Ustr;
42
43use super::{
44    enums::BybitWsOperation,
45    error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
46    messages::{
47        BybitWebSocketError, BybitWsAuthResponse, BybitWsHeader, BybitWsMessage, BybitWsRequest,
48        BybitWsResponse, BybitWsSubscriptionMsg, NautilusWsMessage,
49    },
50    parse::{
51        parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
52        parse_ticker_linear_funding, parse_ticker_linear_index_price,
53        parse_ticker_linear_mark_price, parse_ticker_option_index_price,
54        parse_ticker_option_mark_price, parse_ws_account_state, parse_ws_fill_report,
55        parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
56        parse_ws_trade_tick,
57    },
58};
59use crate::{
60    common::{
61        consts::{
62            BYBIT_NAUTILUS_BROKER_ID, BYBIT_TOPIC_EXECUTION, BYBIT_TOPIC_KLINE, BYBIT_TOPIC_ORDER,
63            BYBIT_TOPIC_ORDERBOOK, BYBIT_TOPIC_POSITION, BYBIT_TOPIC_PUBLIC_TRADE,
64            BYBIT_TOPIC_TICKERS, BYBIT_TOPIC_TRADE, BYBIT_TOPIC_WALLET,
65        },
66        enums::{BybitProductType, BybitTimeInForce, BybitWsOrderRequestOp},
67        parse::{make_bybit_symbol, parse_price_with_precision, parse_quantity_with_precision},
68    },
69    websocket::messages::{
70        BybitBatchOrderError, BybitWsAmendOrderParams, BybitWsCancelOrderParams,
71        BybitWsOrderResponse, BybitWsPlaceOrderParams,
72    },
73};
74
75/// Commands sent from the outer client to the inner message handler.
///
/// Each command is received on the handler's command channel and consumed
/// inside `FeedHandler::next`; none of them produce a direct reply.
76#[derive(Debug)]
77#[allow(
78    clippy::large_enum_variant,
79    reason = "Commands are ephemeral and immediately consumed"
80)]
81pub enum HandlerCommand {
    /// Hands a connected [`WebSocketClient`] to the handler, replacing any
    /// client it already holds.
82    SetClient(WebSocketClient),
    /// Takes the current client (if any) out of the handler and disconnects it.
83    Disconnect,
    /// Sends `payload` as a text frame through the retrying send path.
84    Authenticate {
85        payload: String,
86    },
    /// Sends every entry of `topics` as its own text frame. Each entry is sent
    /// verbatim, so callers pass fully formed payloads.
87    Subscribe {
88        topics: Vec<String>,
89    },
    /// Sends every entry of `topics` as its own text frame, mirroring
    /// `Subscribe`.
90    Unsubscribe {
91        topics: Vec<String>,
92    },
    /// Sends an arbitrary text `payload` through the retrying send path.
93    SendText {
94        payload: String,
95    },
    /// Submits a single order: the handler generates a request id, caches the
    /// identifiers below under that id, and sends a `Create` request.
96    PlaceOrder {
97        params: BybitWsPlaceOrderParams,
98        client_order_id: ClientOrderId,
99        trader_id: TraderId,
100        strategy_id: StrategyId,
101        instrument_id: InstrumentId,
102    },
    /// Amends a single order: same flow as `PlaceOrder`, sent as an `Amend`
    /// request and tracked in the pending amend map.
103    AmendOrder {
104        params: BybitWsAmendOrderParams,
105        client_order_id: ClientOrderId,
106        trader_id: TraderId,
107        strategy_id: StrategyId,
108        instrument_id: InstrumentId,
109        venue_order_id: Option<VenueOrderId>,
110    },
    /// Cancels a single order: same flow as `PlaceOrder`, sent as a `Cancel`
    /// request and tracked in the pending cancel map.
111    CancelOrder {
112        params: BybitWsCancelOrderParams,
113        client_order_id: ClientOrderId,
114        trader_id: TraderId,
115        strategy_id: StrategyId,
116        instrument_id: InstrumentId,
117        venue_order_id: Option<VenueOrderId>,
118    },
    /// Records the orders of a batch place request under `req_id` so the
    /// batch response can be correlated later. Nothing is sent by this command.
119    RegisterBatchPlace {
120        req_id: String,
121        orders: Vec<BatchOrderData>,
122    },
    /// Records the cancels of a batch cancel request under `req_id` so the
    /// batch response can be correlated later. Nothing is sent by this command.
123    RegisterBatchCancel {
124        req_id: String,
125        cancels: Vec<BatchCancelData>,
126    },
    /// Inserts every instrument into the handler's cache, keyed by its symbol.
127    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts (or overwrites) a single instrument in the handler's cache.
128    UpdateInstrument(InstrumentAny),
129}
130
131/// Type alias for the funding rate cache.
///
/// A shared, async-locked map from a `Ustr` key to a pair of optional strings.
// NOTE(review): the meaning of the two `Option<String>` slots is not visible in
// this part of the file - presumably funding rate and next funding time; confirm
// against the ticker parsing code.
132type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
133
134/// Data cached for pending place requests to correlate with responses.
///
/// Stored in `FeedHandler::pending_place_requests`, keyed by request id.
135type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);
136
137/// Data cached for pending cancel requests to correlate with responses.
///
/// Stored in `FeedHandler::pending_cancel_requests`, keyed by request id.
138type CancelRequestData = (
139    ClientOrderId,
140    TraderId,
141    StrategyId,
142    InstrumentId,
143    Option<VenueOrderId>,
144);
145
146/// Data cached for pending amend requests to correlate with responses.
///
/// Structurally identical to [`CancelRequestData`]; stored in
/// `FeedHandler::pending_amend_requests`, keyed by request id.
147type AmendRequestData = (
148    ClientOrderId,
149    TraderId,
150    StrategyId,
151    InstrumentId,
152    Option<VenueOrderId>,
153);
154
155/// Data for a single order in a batch request.
///
/// The outer `ClientOrderId` is the one used when building rejection events;
/// the copy inside [`PlaceRequestData`] is ignored by the batch handlers.
156type BatchOrderData = (ClientOrderId, PlaceRequestData);
157
158/// Data for a single cancel in a batch request.
///
/// The outer `ClientOrderId` is the one used when building rejection events;
/// the copy inside [`CancelRequestData`] is ignored by the batch handlers.
159type BatchCancelData = (ClientOrderId, CancelRequestData);
160
/// Inner WebSocket message handler: consumes commands and raw frames and
/// yields parsed [`NautilusWsMessage`]s one at a time via `next`.
161pub(super) struct FeedHandler {
    /// Shared stop flag; `next` returns `None` once it observes `true`.
162    signal: Arc<AtomicBool>,
    /// Active client, installed by `SetClient` and taken by `Disconnect`.
163    client: Option<WebSocketClient>,
    /// Incoming [`HandlerCommand`]s from the outer client.
164    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket frames; `next` returns `None` when this closes.
165    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outgoing channel used by `send`.
166    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    // NOTE(review): not used in the visible part of the file - presumably
    // tracks authentication state for responses handled further down.
167    auth_tracker: AuthTracker,
    // NOTE(review): not used in the visible part of the file - presumably
    // tracks topic subscription state for responses handled further down.
168    subscriptions: SubscriptionState,
    /// Instruments keyed by symbol, filled by the instrument commands.
169    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Account used when building rejection events; place rejections cannot be
    /// built without it.
170    account_id: Option<AccountId>,
    /// Shared level read by `include_referer_header`; a non-zero value
    /// suppresses the referer for post-only orders.
171    mm_level: Arc<AtomicU8>,
    /// When set, raw symbols are converted with `make_bybit_symbol` before
    /// instrument lookup.
172    product_type: Option<BybitProductType>,
    // NOTE(review): not used in the visible part of the file - presumably
    // selects close-time stamping for parsed bars.
173    bars_timestamp_on_close: bool,
    /// Last quote per instrument, used when deriving quotes from depth-1 books.
174    quote_cache: QuoteCache,
    /// Shared funding cache passed through to message parsing.
175    funding_cache: FundingCache,
    // NOTE(review): not used in the visible part of the file - presumably maps
    // kline topics to their `BarType`.
176    bar_types_cache: Arc<DashMap<String, BarType>>,
    /// Retry policy applied by `send_with_retry`.
177    retry_manager: RetryManager<BybitWsError>,
    /// Single place requests awaiting a response, keyed by request id.
178    pending_place_requests: DashMap<String, PlaceRequestData>,
    /// Single cancel requests awaiting a response, keyed by request id.
179    pending_cancel_requests: DashMap<String, CancelRequestData>,
    /// Single amend requests awaiting a response, keyed by request id.
180    pending_amend_requests: DashMap<String, AmendRequestData>,
    /// Batch place requests awaiting a response, keyed by request id.
181    pending_batch_place_requests: DashMap<String, Vec<BatchOrderData>>,
    /// Batch cancel requests awaiting a response, keyed by request id.
182    pending_batch_cancel_requests: DashMap<String, Vec<BatchCancelData>>,
    /// Parsed messages waiting to be returned, one per `next` call.
183    message_queue: VecDeque<NautilusWsMessage>,
184}
185
186impl FeedHandler {
187    /// Creates a new [`FeedHandler`] instance.
188    #[allow(clippy::too_many_arguments)]
189    pub(super) fn new(
190        signal: Arc<AtomicBool>,
191        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
192        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
193        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
194        account_id: Option<AccountId>,
195        product_type: Option<BybitProductType>,
196        bars_timestamp_on_close: bool,
197        mm_level: Arc<AtomicU8>,
198        auth_tracker: AuthTracker,
199        subscriptions: SubscriptionState,
200        funding_cache: FundingCache,
201        bar_types_cache: Arc<DashMap<String, BarType>>,
202    ) -> Self {
203        Self {
204            signal,
205            client: None,
206            cmd_rx,
207            raw_rx,
208            out_tx,
209            auth_tracker,
210            subscriptions,
211            instruments_cache: AHashMap::new(),
212            account_id,
213            mm_level,
214            product_type,
215            bars_timestamp_on_close,
216            quote_cache: QuoteCache::new(),
217            funding_cache,
218            bar_types_cache,
219            retry_manager: create_websocket_retry_manager(),
220            pending_place_requests: DashMap::new(),
221            pending_cancel_requests: DashMap::new(),
222            pending_amend_requests: DashMap::new(),
223            pending_batch_place_requests: DashMap::new(),
224            pending_batch_cancel_requests: DashMap::new(),
225            message_queue: VecDeque::new(),
226        }
227    }
228
229    pub(super) fn is_stopped(&self) -> bool {
230        self.signal.load(Ordering::Relaxed)
231    }
232
233    #[allow(dead_code)]
234    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
235        self.out_tx.send(msg).map_err(|_| ())
236    }
237
238    fn generate_unique_request_id(&self) -> String {
239        UUID4::new().to_string()
240    }
241
242    fn find_and_remove_place_request_by_client_order_id(
243        &self,
244        client_order_id: &ClientOrderId,
245    ) -> Option<(String, PlaceRequestData)> {
246        self.pending_place_requests
247            .iter()
248            .find(|entry| entry.value().0 == *client_order_id)
249            .and_then(|entry| {
250                let key = entry.key().clone();
251                drop(entry);
252                self.pending_place_requests.remove(&key)
253            })
254    }
255
256    fn find_and_remove_cancel_request_by_client_order_id(
257        &self,
258        client_order_id: &ClientOrderId,
259    ) -> Option<(String, CancelRequestData)> {
260        self.pending_cancel_requests
261            .iter()
262            .find(|entry| entry.value().0 == *client_order_id)
263            .and_then(|entry| {
264                let key = entry.key().clone();
265                drop(entry);
266                self.pending_cancel_requests.remove(&key)
267            })
268    }
269
270    fn find_and_remove_amend_request_by_client_order_id(
271        &self,
272        client_order_id: &ClientOrderId,
273    ) -> Option<(String, AmendRequestData)> {
274        self.pending_amend_requests
275            .iter()
276            .find(|entry| entry.value().0 == *client_order_id)
277            .and_then(|entry| {
278                let key = entry.key().clone();
279                drop(entry);
280                self.pending_amend_requests.remove(&key)
281            })
282    }
283
284    fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
285        let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
286        let mm_level = self.mm_level.load(Ordering::Relaxed);
287        !(is_post_only && mm_level > 0)
288    }
289
290    /// Sends a WebSocket message with retry logic.
291    async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
292        if let Some(client) = &self.client {
293            self.retry_manager
294                .execute_with_retry(
295                    "websocket_send",
296                    || {
297                        let payload = payload.clone();
298                        async move {
299                            client
300                                .send_text(payload, None)
301                                .await
302                                .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
303                        }
304                    },
305                    should_retry_bybit_error,
306                    create_bybit_timeout_error,
307                )
308                .await
309        } else {
310            Err(BybitWsError::ClientError(
311                "No active WebSocket client".to_string(),
312            ))
313        }
314    }
315
316    /// Handles batch operation request-level failures (ret_code != 0).
317    ///
318    /// When a batch request fails entirely, generate rejection events for all orders
319    /// in the batch and clean up tracking data.
320    fn handle_batch_failure(
321        &self,
322        req_id: &str,
323        ret_msg: &str,
324        op: &str,
325        ts_init: UnixNanos,
326        result: &mut Vec<NautilusWsMessage>,
327    ) {
328        if op.contains("create") {
329            if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
330                log::warn!(
331                    "Batch place request failed: req_id={req_id}, ret_msg={ret_msg}, num_orders={}",
332                    batch_data.len()
333                );
334
335                let Some(account_id) = self.account_id else {
336                    log::error!("Cannot create OrderRejected events: account_id is None");
337                    return;
338                };
339
340                let reason = Ustr::from(ret_msg);
341                for (client_order_id, (_, trader_id, strategy_id, instrument_id)) in batch_data {
342                    let rejected = OrderRejected::new(
343                        trader_id,
344                        strategy_id,
345                        instrument_id,
346                        client_order_id,
347                        account_id,
348                        reason,
349                        UUID4::new(),
350                        ts_init,
351                        ts_init,
352                        false,
353                        false,
354                    );
355                    result.push(NautilusWsMessage::OrderRejected(rejected));
356                }
357            }
358        } else if op.contains("cancel")
359            && let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id)
360        {
361            log::warn!(
362                "Batch cancel request failed: req_id={req_id}, ret_msg={ret_msg}, num_cancels={}",
363                batch_data.len()
364            );
365
366            let reason = Ustr::from(ret_msg);
367            for (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id)) in
368                batch_data
369            {
370                let rejected = OrderCancelRejected::new(
371                    trader_id,
372                    strategy_id,
373                    instrument_id,
374                    client_order_id,
375                    reason,
376                    UUID4::new(),
377                    ts_init,
378                    ts_init,
379                    false,
380                    venue_order_id,
381                    self.account_id,
382                );
383                result.push(NautilusWsMessage::OrderCancelRejected(rejected));
384            }
385        }
386    }
387
388    /// Handles batch operation responses, checking for individual order failures.
389    fn handle_batch_response(
390        &self,
391        resp: &BybitWsOrderResponse,
392        result: &mut Vec<NautilusWsMessage>,
393    ) {
394        let Some(req_id) = &resp.req_id else {
395            log::warn!(
396                "Batch response missing req_id - cannot correlate with pending requests: op={}",
397                resp.op
398            );
399            return;
400        };
401
402        let batch_errors = resp.extract_batch_errors();
403
404        if resp.op.contains("create") {
405            if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
406                self.process_batch_place_errors(batch_data, batch_errors, result);
407            } else {
408                log::debug!(
409                    "Batch place response received but no pending request found: req_id={req_id}"
410                );
411            }
412        } else if resp.op.contains("cancel") {
413            if let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id) {
414                self.process_batch_cancel_errors(batch_data, batch_errors, result);
415            } else {
416                log::debug!(
417                    "Batch cancel response received but no pending request found: req_id={req_id}"
418                );
419            }
420        }
421    }
422
423    /// Processes individual order errors from a batch place operation.
424    fn process_batch_place_errors(
425        &self,
426        batch_data: Vec<BatchOrderData>,
427        errors: Vec<BybitBatchOrderError>,
428        result: &mut Vec<NautilusWsMessage>,
429    ) {
430        let Some(account_id) = self.account_id else {
431            log::error!("Cannot create OrderRejected events: account_id is None");
432            return;
433        };
434
435        let clock = get_atomic_clock_realtime();
436        let ts_init = clock.get_time_ns();
437
438        for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id))) in
439            batch_data.into_iter().enumerate()
440        {
441            if let Some(error) = errors.get(idx)
442                && error.code != 0
443            {
444                log::warn!(
445                    "Batch order rejected: client_order_id={client_order_id}, error_code={}, error_msg={}",
446                    error.code,
447                    error.msg
448                );
449
450                let rejected = OrderRejected::new(
451                    trader_id,
452                    strategy_id,
453                    instrument_id,
454                    client_order_id,
455                    account_id,
456                    Ustr::from(&error.msg),
457                    UUID4::new(),
458                    ts_init,
459                    ts_init,
460                    false,
461                    false,
462                );
463                result.push(NautilusWsMessage::OrderRejected(rejected));
464            }
465        }
466    }
467
468    /// Processes individual order errors from a batch cancel operation.
469    fn process_batch_cancel_errors(
470        &self,
471        batch_data: Vec<BatchCancelData>,
472        errors: Vec<BybitBatchOrderError>,
473        result: &mut Vec<NautilusWsMessage>,
474    ) {
475        let clock = get_atomic_clock_realtime();
476        let ts_init = clock.get_time_ns();
477
478        for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id))) in
479            batch_data.into_iter().enumerate()
480        {
481            if let Some(error) = errors.get(idx)
482                && error.code != 0
483            {
484                log::warn!(
485                    "Batch cancel rejected: client_order_id={client_order_id}, error_code={}, error_msg={}",
486                    error.code,
487                    error.msg
488                );
489
490                let rejected = OrderCancelRejected::new(
491                    trader_id,
492                    strategy_id,
493                    instrument_id,
494                    client_order_id,
495                    Ustr::from(&error.msg),
496                    UUID4::new(),
497                    ts_init,
498                    ts_init,
499                    false,
500                    venue_order_id,
501                    self.account_id,
502                );
503                result.push(NautilusWsMessage::OrderCancelRejected(rejected));
504            }
505        }
506    }
507
508    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
509        let clock = get_atomic_clock_realtime();
510
511        loop {
512            if let Some(msg) = self.message_queue.pop_front() {
513                return Some(msg);
514            }
515
516            tokio::select! {
517                Some(cmd) = self.cmd_rx.recv() => {
518                    match cmd {
519                        HandlerCommand::SetClient(client) => {
520                            log::debug!("WebSocketClient received by handler");
521                            self.client = Some(client);
522                        }
523                        HandlerCommand::Disconnect => {
524                            log::debug!("Disconnect command received");
525
526                            if let Some(client) = self.client.take() {
527                                client.disconnect().await;
528                            }
529                        }
530                        HandlerCommand::Authenticate { payload } => {
531                            log::debug!("Authenticate command received");
532                            if let Err(e) = self.send_with_retry(payload).await {
533                                log::error!("Failed to send authentication after retries: {e}");
534                            }
535                        }
536                        HandlerCommand::Subscribe { topics } => {
537                            for topic in topics {
538                                log::debug!("Subscribing to topic: topic={topic}");
539                                if let Err(e) = self.send_with_retry(topic.clone()).await {
540                                    log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
541                                }
542                            }
543                        }
544                        HandlerCommand::Unsubscribe { topics } => {
545                            for topic in topics {
546                                log::debug!("Unsubscribing from topic: topic={topic}");
547                                if let Err(e) = self.send_with_retry(topic.clone()).await {
548                                    log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
549                                }
550                            }
551                        }
552                        HandlerCommand::SendText { payload } => {
553                            if let Err(e) = self.send_with_retry(payload).await {
554                                log::error!("Error sending text with retry: {e}");
555                            }
556                        }
557                        HandlerCommand::InitializeInstruments(instruments) => {
558                            for inst in instruments {
559                                self.instruments_cache.insert(inst.symbol().inner(), inst);
560                            }
561                        }
562                        HandlerCommand::UpdateInstrument(inst) => {
563                            self.instruments_cache.insert(inst.symbol().inner(), inst);
564                        }
565                        HandlerCommand::RegisterBatchPlace { req_id, orders } => {
566                            log::debug!(
567                                "Registering batch place request: req_id={req_id}, num_orders={}",
568                                orders.len()
569                            );
570                            self.pending_batch_place_requests.insert(req_id, orders);
571                        }
572                        HandlerCommand::RegisterBatchCancel { req_id, cancels } => {
573                            log::debug!(
574                                "Registering batch cancel request: req_id={req_id}, num_cancels={}",
575                                cancels.len()
576                            );
577                            self.pending_batch_cancel_requests.insert(req_id, cancels);
578                        }
579                        HandlerCommand::PlaceOrder {
580                            params,
581                            client_order_id,
582                            trader_id,
583                            strategy_id,
584                            instrument_id,
585                        } => {
586                            let request_id = self.generate_unique_request_id();
587
588                            self.pending_place_requests.insert(
589                                request_id.clone(),
590                                (client_order_id, trader_id, strategy_id, instrument_id),
591                            );
592
593                            let referer = if self.include_referer_header(params.time_in_force) {
594                                Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
595                            } else {
596                                None
597                            };
598
599                            let request = BybitWsRequest {
600                                req_id: Some(request_id.clone()),
601                                op: BybitWsOrderRequestOp::Create,
602                                header: BybitWsHeader::with_referer(referer),
603                                args: vec![params],
604                            };
605
606                            if let Ok(payload) = serde_json::to_string(&request)
607                                && let Err(e) = self.send_with_retry(payload).await
608                            {
609                                log::error!("Failed to send place order after retries: {e}");
610                                self.pending_place_requests.remove(&request_id);
611                            }
612                        }
613                        HandlerCommand::AmendOrder {
614                            params,
615                            client_order_id,
616                            trader_id,
617                            strategy_id,
618                            instrument_id,
619                            venue_order_id,
620                        } => {
621                            let request_id = self.generate_unique_request_id();
622
623                            self.pending_amend_requests.insert(
624                                request_id.clone(),
625                                (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
626                            );
627
628                            let request = BybitWsRequest {
629                                req_id: Some(request_id.clone()),
630                                op: BybitWsOrderRequestOp::Amend,
631                                header: BybitWsHeader::now(),
632                                args: vec![params],
633                            };
634
635                            if let Ok(payload) = serde_json::to_string(&request)
636                                && let Err(e) = self.send_with_retry(payload).await
637                            {
638                                log::error!("Failed to send amend order after retries: {e}");
639                                self.pending_amend_requests.remove(&request_id);
640                            }
641                        }
642                        HandlerCommand::CancelOrder {
643                            params,
644                            client_order_id,
645                            trader_id,
646                            strategy_id,
647                            instrument_id,
648                            venue_order_id,
649                        } => {
650                            let request_id = self.generate_unique_request_id();
651
652                            self.pending_cancel_requests.insert(
653                                request_id.clone(),
654                                (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
655                            );
656
657                            let request = BybitWsRequest {
658                                req_id: Some(request_id.clone()),
659                                op: BybitWsOrderRequestOp::Cancel,
660                                header: BybitWsHeader::now(),
661                                args: vec![params],
662                            };
663
664                            if let Ok(payload) = serde_json::to_string(&request)
665                                && let Err(e) = self.send_with_retry(payload).await
666                            {
667                                log::error!("Failed to send cancel order after retries: {e}");
668                                self.pending_cancel_requests.remove(&request_id);
669                            }
670                        }
671                    }
672
673                    continue;
674                }
675
676                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
677                    if self.signal.load(Ordering::Relaxed) {
678                        log::debug!("Stop signal received during idle period");
679                        return None;
680                    }
681                    continue;
682                }
683
684                msg = self.raw_rx.recv() => {
685                    let msg = match msg {
686                        Some(msg) => msg,
687                        None => {
688                            log::debug!("WebSocket stream closed");
689                            return None;
690                        }
691                    };
692
693                    if let Message::Ping(data) = &msg {
694                        log::trace!("Received ping frame with {} bytes", data.len());
695
696                        if let Some(client) = &self.client
697                            && let Err(e) = client.send_pong(data.to_vec()).await
698                        {
699                            log::warn!("Failed to send pong frame: error={e}");
700                        }
701                        continue;
702                    }
703
704                    let event = match Self::parse_raw_message(msg) {
705                        Some(event) => event,
706                        None => continue,
707                    };
708
709                    if self.signal.load(Ordering::Relaxed) {
710                        log::debug!("Stop signal received");
711                        return None;
712                    }
713
714                    let ts_init = clock.get_time_ns();
715                    let instruments = self.instruments_cache.clone();
716                    let funding_cache = Arc::clone(&self.funding_cache);
717                    let nautilus_messages = self.parse_to_nautilus_messages(
718                        event,
719                        &instruments,
720                        self.account_id,
721                        self.product_type,
722                        &funding_cache,
723                        ts_init,
724                    )
725                    .await;
726
727                    // Enqueue all parsed messages to emit them one by one
728                    self.message_queue.extend(nautilus_messages);
729                }
730            }
731        }
732    }
733
734    fn parse_raw_message(msg: Message) -> Option<BybitWsMessage> {
735        use serde_json::Value;
736
737        match msg {
738            Message::Text(text) => {
739                if text == nautilus_network::RECONNECTED {
740                    log::info!("Received WebSocket reconnected signal");
741                    return Some(BybitWsMessage::Reconnected);
742                }
743
744                if text.trim().eq_ignore_ascii_case("pong") {
745                    return None;
746                }
747
748                log::trace!("Raw websocket message: {text}");
749
750                let value: Value = match serde_json::from_str(&text) {
751                    Ok(v) => v,
752                    Err(e) => {
753                        log::error!("Failed to parse WebSocket message: {e}: {text}");
754                        return None;
755                    }
756                };
757
758                Some(classify_bybit_message(value))
759            }
760            Message::Binary(msg) => {
761                log::debug!("Raw binary: {msg:?}");
762                None
763            }
764            Message::Close(_) => {
765                log::debug!("Received close message, waiting for reconnection");
766                None
767            }
768            _ => None,
769        }
770    }
771
772    #[allow(clippy::too_many_arguments)]
773    async fn parse_to_nautilus_messages(
774        &mut self,
775        msg: BybitWsMessage,
776        instruments: &AHashMap<Ustr, InstrumentAny>,
777        account_id: Option<AccountId>,
778        product_type: Option<BybitProductType>,
779        funding_cache: &FundingCache,
780        ts_init: UnixNanos,
781    ) -> Vec<NautilusWsMessage> {
782        let mut result = Vec::new();
783
784        match msg {
785            BybitWsMessage::Orderbook(msg) => {
786                let raw_symbol = msg.data.s;
787                let symbol =
788                    product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
789
790                if let Some(instrument) = instruments.get(&symbol) {
791                    match parse_orderbook_deltas(&msg, instrument, ts_init) {
792                        Ok(deltas) => result.push(NautilusWsMessage::Deltas(deltas)),
793                        Err(e) => log::error!("Error parsing orderbook deltas: {e}"),
794                    }
795
796                    // For depth=1 subscriptions, also emit QuoteTick from top-of-book
797                    if let Some(depth_str) = msg.topic.as_str().split('.').nth(1)
798                        && depth_str == "1"
799                    {
800                        let instrument_id = instrument.id();
801                        let last_quote = self.quote_cache.get(&instrument_id);
802
803                        match parse_orderbook_quote(&msg, instrument, last_quote, ts_init) {
804                            Ok(quote) => {
805                                self.quote_cache.insert(instrument_id, quote);
806                                result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
807                            }
808                            Err(e) => log::debug!("Skipping orderbook quote: {e}"),
809                        }
810                    }
811                } else {
812                    log::debug!(
813                        "No instrument found for symbol in Orderbook message: raw_symbol={raw_symbol}, full_symbol={symbol}"
814                    );
815                }
816            }
817            BybitWsMessage::Trade(msg) => {
818                let mut data_vec = Vec::new();
819                for trade in &msg.data {
820                    let raw_symbol = trade.s;
821                    let symbol =
822                        product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
823
824                    if let Some(instrument) = instruments.get(&symbol) {
825                        match parse_ws_trade_tick(trade, instrument, ts_init) {
826                            Ok(tick) => data_vec.push(Data::Trade(tick)),
827                            Err(e) => log::error!("Error parsing trade tick: {e}"),
828                        }
829                    } else {
830                        log::debug!(
831                            "No instrument found for symbol in Trade message: raw_symbol={raw_symbol}, full_symbol={symbol}"
832                        );
833                    }
834                }
835
836                if !data_vec.is_empty() {
837                    result.push(NautilusWsMessage::Data(data_vec));
838                }
839            }
840            BybitWsMessage::Kline(msg) => {
841                let bar_type = match self.bar_types_cache.get(msg.topic.as_str()) {
842                    Some(bt) => *bt,
843                    None => {
844                        log::debug!("No bar type subscription found for topic: {}", msg.topic);
845                        return result;
846                    }
847                };
848
849                let symbol = Ustr::from(bar_type.instrument_id().symbol.as_str());
850                let instrument = match instruments.get(&symbol) {
851                    Some(inst) => inst,
852                    None => {
853                        log::debug!(
854                            "No instrument found for bar type: {}",
855                            bar_type.instrument_id()
856                        );
857                        return result;
858                    }
859                };
860
861                let mut data_vec = Vec::new();
862                for kline in &msg.data {
863                    // Only process confirmed bars (not partial/building bars)
864                    if !kline.confirm {
865                        continue;
866                    }
867                    match parse_ws_kline_bar(
868                        kline,
869                        instrument,
870                        bar_type,
871                        self.bars_timestamp_on_close,
872                        ts_init,
873                    ) {
874                        Ok(bar) => data_vec.push(Data::Bar(bar)),
875                        Err(e) => log::error!("Error parsing kline to bar: {e}"),
876                    }
877                }
878                if !data_vec.is_empty() {
879                    result.push(NautilusWsMessage::Data(data_vec));
880                }
881            }
882            BybitWsMessage::TickerLinear(msg) => {
883                let raw_symbol = msg.data.symbol;
884                let symbol =
885                    product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
886
887                if let Some(instrument) = instruments.get(&symbol) {
888                    let instrument_id = instrument.id();
889                    let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
890                    let price_precision = instrument.price_precision();
891                    let size_precision = instrument.size_precision();
892
893                    // Parse Bybit linear ticker fields, propagate errors
894                    let bid_price = msg
895                        .data
896                        .bid1_price
897                        .as_deref()
898                        .map(|s| parse_price_with_precision(s, price_precision, "bid1Price"))
899                        .transpose();
900                    let ask_price = msg
901                        .data
902                        .ask1_price
903                        .as_deref()
904                        .map(|s| parse_price_with_precision(s, price_precision, "ask1Price"))
905                        .transpose();
906                    let bid_size = msg
907                        .data
908                        .bid1_size
909                        .as_deref()
910                        .map(|s| parse_quantity_with_precision(s, size_precision, "bid1Size"))
911                        .transpose();
912                    let ask_size = msg
913                        .data
914                        .ask1_size
915                        .as_deref()
916                        .map(|s| parse_quantity_with_precision(s, size_precision, "ask1Size"))
917                        .transpose();
918
919                    match (bid_price, ask_price, bid_size, ask_size) {
920                        (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
921                            match self.quote_cache.process(
922                                instrument_id,
923                                bp,
924                                ap,
925                                bs,
926                                as_,
927                                ts_event,
928                                ts_init,
929                            ) {
930                                Ok(quote) => {
931                                    result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
932                                }
933                                Err(e) => {
934                                    let raw_data = serde_json::to_string(&msg.data)
935                                        .unwrap_or_else(|_| "<failed to serialize>".to_string());
936                                    log::debug!(
937                                        "Skipping partial ticker update: {e}, raw_data: {raw_data}"
938                                    );
939                                }
940                            }
941                        }
942                        _ => {
943                            let raw_data = serde_json::to_string(&msg.data)
944                                .unwrap_or_else(|_| "<failed to serialize>".to_string());
945                            log::warn!(
946                                "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
947                            );
948                        }
949                    }
950
951                    if msg.data.funding_rate.is_some() && msg.data.next_funding_time.is_some() {
952                        let should_publish = {
953                            let cache = funding_cache.read().await;
954                            match cache.get(&symbol) {
955                                Some((cached_rate, cached_time)) => {
956                                    cached_rate.as_ref() != msg.data.funding_rate.as_ref()
957                                        || cached_time.as_ref()
958                                            != msg.data.next_funding_time.as_ref()
959                                }
960                                None => true,
961                            }
962                        };
963
964                        if should_publish {
965                            match parse_ticker_linear_funding(
966                                &msg.data,
967                                instrument_id,
968                                ts_event,
969                                ts_init,
970                            ) {
971                                Ok(funding) => {
972                                    let cache_key = (
973                                        msg.data.funding_rate.clone(),
974                                        msg.data.next_funding_time.clone(),
975                                    );
976                                    funding_cache.write().await.insert(symbol, cache_key);
977                                    result.push(NautilusWsMessage::FundingRates(vec![funding]));
978                                }
979                                Err(e) => {
980                                    log::debug!("Skipping funding rate update: {e}");
981                                }
982                            }
983                        }
984                    }
985
986                    if msg.data.mark_price.is_some() {
987                        match parse_ticker_linear_mark_price(
988                            &msg.data, instrument, ts_event, ts_init,
989                        ) {
990                            Ok(mark_price) => {
991                                result.push(NautilusWsMessage::MarkPrices(vec![mark_price]));
992                            }
993                            Err(e) => {
994                                log::debug!("Skipping mark price update: {e}");
995                            }
996                        }
997                    }
998
999                    if msg.data.index_price.is_some() {
1000                        match parse_ticker_linear_index_price(
1001                            &msg.data, instrument, ts_event, ts_init,
1002                        ) {
1003                            Ok(index_price) => {
1004                                result.push(NautilusWsMessage::IndexPrices(vec![index_price]));
1005                            }
1006                            Err(e) => {
1007                                log::debug!("Skipping index price update: {e}");
1008                            }
1009                        }
1010                    }
1011                } else {
1012                    log::debug!(
1013                        "No instrument found for symbol in TickerLinear message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1014                    );
1015                }
1016            }
1017            BybitWsMessage::TickerOption(msg) => {
1018                let raw_symbol = &msg.data.symbol;
1019                let symbol = product_type.map_or_else(
1020                    || raw_symbol.as_str().into(),
1021                    |pt| make_bybit_symbol(raw_symbol, pt),
1022                );
1023
1024                if let Some(instrument) = instruments.get(&symbol) {
1025                    let instrument_id = instrument.id();
1026                    let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
1027                    let price_precision = instrument.price_precision();
1028                    let size_precision = instrument.size_precision();
1029
1030                    // Parse Bybit option ticker fields (always complete), propagate errors
1031                    let bid_price = parse_price_with_precision(
1032                        &msg.data.bid_price,
1033                        price_precision,
1034                        "bidPrice",
1035                    );
1036                    let ask_price = parse_price_with_precision(
1037                        &msg.data.ask_price,
1038                        price_precision,
1039                        "askPrice",
1040                    );
1041                    let bid_size = parse_quantity_with_precision(
1042                        &msg.data.bid_size,
1043                        size_precision,
1044                        "bidSize",
1045                    );
1046                    let ask_size = parse_quantity_with_precision(
1047                        &msg.data.ask_size,
1048                        size_precision,
1049                        "askSize",
1050                    );
1051
1052                    match (bid_price, ask_price, bid_size, ask_size) {
1053                        (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1054                            match self.quote_cache.process(
1055                                instrument_id,
1056                                Some(bp),
1057                                Some(ap),
1058                                Some(bs),
1059                                Some(as_),
1060                                ts_event,
1061                                ts_init,
1062                            ) {
1063                                Ok(quote) => {
1064                                    result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1065                                }
1066                                Err(e) => {
1067                                    let raw_data = serde_json::to_string(&msg.data)
1068                                        .unwrap_or_else(|_| "<failed to serialize>".to_string());
1069                                    log::debug!(
1070                                        "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1071                                    );
1072                                }
1073                            }
1074                        }
1075                        _ => {
1076                            let raw_data = serde_json::to_string(&msg.data)
1077                                .unwrap_or_else(|_| "<failed to serialize>".to_string());
1078                            log::warn!(
1079                                "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1080                            );
1081                        }
1082                    }
1083
1084                    match parse_ticker_option_mark_price(&msg, instrument, ts_init) {
1085                        Ok(mark_price) => {
1086                            result.push(NautilusWsMessage::MarkPrices(vec![mark_price]));
1087                        }
1088                        Err(e) => {
1089                            log::debug!("Skipping option mark price update: {e}");
1090                        }
1091                    }
1092
1093                    match parse_ticker_option_index_price(&msg, instrument, ts_init) {
1094                        Ok(index_price) => {
1095                            result.push(NautilusWsMessage::IndexPrices(vec![index_price]));
1096                        }
1097                        Err(e) => {
1098                            log::debug!("Skipping option index price update: {e}");
1099                        }
1100                    }
1101                } else {
1102                    log::debug!(
1103                        "No instrument found for symbol in TickerOption message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1104                    );
1105                }
1106            }
1107            BybitWsMessage::AccountOrder(msg) => {
1108                if let Some(account_id) = account_id {
1109                    let mut reports = Vec::new();
1110                    for order in &msg.data {
1111                        let raw_symbol = order.symbol;
1112                        let symbol = make_bybit_symbol(raw_symbol, order.category);
1113
1114                        if let Some(instrument) = instruments.get(&symbol) {
1115                            match parse_ws_order_status_report(
1116                                order, instrument, account_id, ts_init,
1117                            ) {
1118                                Ok(report) => reports.push(report),
1119                                Err(e) => log::error!("Error parsing order status report: {e}"),
1120                            }
1121                        } else {
1122                            log::debug!(
1123                                "No instrument found for symbol in AccountOrder message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1124                            );
1125                        }
1126                    }
1127                    if !reports.is_empty() {
1128                        result.push(NautilusWsMessage::OrderStatusReports(reports));
1129                    }
1130                }
1131            }
1132            BybitWsMessage::AccountExecution(msg) => {
1133                if let Some(account_id) = account_id {
1134                    let mut reports = Vec::new();
1135                    for execution in &msg.data {
1136                        let raw_symbol = execution.symbol;
1137                        let symbol = make_bybit_symbol(raw_symbol, execution.category);
1138
1139                        if let Some(instrument) = instruments.get(&symbol) {
1140                            match parse_ws_fill_report(execution, account_id, instrument, ts_init) {
1141                                Ok(report) => reports.push(report),
1142                                Err(e) => log::error!("Error parsing fill report: {e}"),
1143                            }
1144                        } else {
1145                            log::debug!(
1146                                "No instrument found for symbol in AccountExecution message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1147                            );
1148                        }
1149                    }
1150                    if !reports.is_empty() {
1151                        result.push(NautilusWsMessage::FillReports(reports));
1152                    }
1153                }
1154            }
1155            BybitWsMessage::AccountPosition(msg) => {
1156                if let Some(account_id) = account_id {
1157                    for position in &msg.data {
1158                        let raw_symbol = position.symbol;
1159                        let symbol = make_bybit_symbol(raw_symbol, position.category);
1160
1161                        if let Some(instrument) = instruments.get(&symbol) {
1162                            match parse_ws_position_status_report(
1163                                position, account_id, instrument, ts_init,
1164                            ) {
1165                                Ok(report) => {
1166                                    result.push(NautilusWsMessage::PositionStatusReport(report));
1167                                }
1168                                Err(e) => {
1169                                    log::error!("Error parsing position status report: {e}");
1170                                }
1171                            }
1172                        } else {
1173                            log::debug!(
1174                                "No instrument found for symbol in AccountPosition message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1175                            );
1176                        }
1177                    }
1178                }
1179            }
1180            BybitWsMessage::AccountWallet(msg) => {
1181                if let Some(account_id) = account_id {
1182                    for wallet in &msg.data {
1183                        let ts_event = UnixNanos::from(msg.creation_time as u64 * 1_000_000);
1184
1185                        match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1186                            Ok(state) => result.push(NautilusWsMessage::AccountState(state)),
1187                            Err(e) => log::error!("Error parsing account state: {e}"),
1188                        }
1189                    }
1190                }
1191            }
1192            BybitWsMessage::OrderResponse(resp) => {
1193                if resp.ret_code == 0 {
1194                    log::debug!(
1195                        "Order operation successful: op={}, ret_msg={}",
1196                        resp.op,
1197                        resp.ret_msg
1198                    );
1199
1200                    if resp.op.contains("batch") {
1201                        self.handle_batch_response(&resp, &mut result);
1202                    } else if let Some(req_id) = &resp.req_id {
1203                        if resp.op.contains("create") {
1204                            self.pending_place_requests.remove(req_id);
1205                        } else if resp.op.contains("cancel") {
1206                            self.pending_cancel_requests.remove(req_id);
1207                        } else if resp.op.contains("amend") {
1208                            self.pending_amend_requests.remove(req_id);
1209                        }
1210                    } else if let Some(order_link_id) =
1211                        resp.data.get("orderLinkId").and_then(|v| v.as_str())
1212                    {
1213                        // Bybit sometimes omits req_id, search by client_order_id instead
1214                        let client_order_id = ClientOrderId::from(order_link_id);
1215                        if resp.op.contains("create") {
1216                            self.find_and_remove_place_request_by_client_order_id(&client_order_id);
1217                        } else if resp.op.contains("cancel") {
1218                            self.find_and_remove_cancel_request_by_client_order_id(
1219                                &client_order_id,
1220                            );
1221                        } else if resp.op.contains("amend") {
1222                            self.find_and_remove_amend_request_by_client_order_id(&client_order_id);
1223                        }
1224                    }
1225                } else if let Some(req_id) = &resp.req_id {
1226                    let clock = get_atomic_clock_realtime();
1227                    let ts_init = clock.get_time_ns();
1228
1229                    if resp.op.contains("batch") {
1230                        self.handle_batch_failure(
1231                            req_id,
1232                            &resp.ret_msg,
1233                            &resp.op,
1234                            ts_init,
1235                            &mut result,
1236                        );
1237                    } else if resp.op.contains("create")
1238                        && let Some((_, (client_order_id, trader_id, strategy_id, instrument_id))) =
1239                            self.pending_place_requests.remove(req_id)
1240                    {
1241                        let Some(account_id) = self.account_id else {
1242                            log::error!(
1243                                "Cannot create OrderRejected event: account_id is None: request_id={req_id}, reason={}",
1244                                resp.ret_msg
1245                            );
1246                            return result;
1247                        };
1248
1249                        let rejected = OrderRejected::new(
1250                            trader_id,
1251                            strategy_id,
1252                            instrument_id,
1253                            client_order_id,
1254                            account_id,
1255                            Ustr::from(&resp.ret_msg),
1256                            UUID4::new(),
1257                            ts_init,
1258                            ts_init,
1259                            false,
1260                            false,
1261                        );
1262                        result.push(NautilusWsMessage::OrderRejected(rejected));
1263                    } else if resp.op.contains("cancel")
1264                        && let Some((
1265                            _,
1266                            (
1267                                client_order_id,
1268                                trader_id,
1269                                strategy_id,
1270                                instrument_id,
1271                                venue_order_id,
1272                            ),
1273                        )) = self.pending_cancel_requests.remove(req_id)
1274                    {
1275                        let rejected = OrderCancelRejected::new(
1276                            trader_id,
1277                            strategy_id,
1278                            instrument_id,
1279                            client_order_id,
1280                            Ustr::from(&resp.ret_msg),
1281                            UUID4::new(),
1282                            ts_init,
1283                            ts_init,
1284                            false,
1285                            venue_order_id,
1286                            self.account_id,
1287                        );
1288                        result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1289                    } else if resp.op.contains("amend")
1290                        && let Some((
1291                            _,
1292                            (
1293                                client_order_id,
1294                                trader_id,
1295                                strategy_id,
1296                                instrument_id,
1297                                venue_order_id,
1298                            ),
1299                        )) = self.pending_amend_requests.remove(req_id)
1300                    {
1301                        let rejected = OrderModifyRejected::new(
1302                            trader_id,
1303                            strategy_id,
1304                            instrument_id,
1305                            client_order_id,
1306                            Ustr::from(&resp.ret_msg),
1307                            UUID4::new(),
1308                            ts_init,
1309                            ts_init,
1310                            false,
1311                            venue_order_id,
1312                            self.account_id,
1313                        );
1314                        result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1315                    }
1316                } else if let Some(order_link_id) =
1317                    resp.data.get("orderLinkId").and_then(|v| v.as_str())
1318                {
1319                    // Bybit sometimes omits req_id, search by client_order_id instead
1320                    let clock = get_atomic_clock_realtime();
1321                    let ts_init = clock.get_time_ns();
1322                    let client_order_id = ClientOrderId::from(order_link_id);
1323
1324                    if resp.op.contains("create") {
1325                        if let Some((_, (_, trader_id, strategy_id, instrument_id))) =
1326                            self.find_and_remove_place_request_by_client_order_id(&client_order_id)
1327                        {
1328                            let Some(account_id) = self.account_id else {
1329                                log::error!(
1330                                    "Cannot create OrderRejected event: account_id is None: client_order_id={client_order_id}, reason={}",
1331                                    resp.ret_msg
1332                                );
1333                                return result;
1334                            };
1335
1336                            let rejected = OrderRejected::new(
1337                                trader_id,
1338                                strategy_id,
1339                                instrument_id,
1340                                client_order_id,
1341                                account_id,
1342                                Ustr::from(&resp.ret_msg),
1343                                UUID4::new(),
1344                                ts_init,
1345                                ts_init,
1346                                false,
1347                                false,
1348                            );
1349                            result.push(NautilusWsMessage::OrderRejected(rejected));
1350                        }
1351                    } else if resp.op.contains("cancel") {
1352                        if let Some((
1353                            _,
1354                            (_, trader_id, strategy_id, instrument_id, venue_order_id),
1355                        )) =
1356                            self.find_and_remove_cancel_request_by_client_order_id(&client_order_id)
1357                        {
1358                            let rejected = OrderCancelRejected::new(
1359                                trader_id,
1360                                strategy_id,
1361                                instrument_id,
1362                                client_order_id,
1363                                Ustr::from(&resp.ret_msg),
1364                                UUID4::new(),
1365                                ts_init,
1366                                ts_init,
1367                                false,
1368                                venue_order_id,
1369                                self.account_id,
1370                            );
1371                            result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1372                        }
1373                    } else if resp.op.contains("amend")
1374                        && let Some((_, (_, trader_id, strategy_id, instrument_id, venue_order_id))) =
1375                            self.find_and_remove_amend_request_by_client_order_id(&client_order_id)
1376                    {
1377                        let rejected = OrderModifyRejected::new(
1378                            trader_id,
1379                            strategy_id,
1380                            instrument_id,
1381                            client_order_id,
1382                            Ustr::from(&resp.ret_msg),
1383                            UUID4::new(),
1384                            ts_init,
1385                            ts_init,
1386                            false,
1387                            venue_order_id,
1388                            self.account_id,
1389                        );
1390                        result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1391                    }
1392                } else {
1393                    log::warn!(
1394                        "Order operation failed but request_id could not be extracted from response: op={}, ret_code={}, ret_msg={}",
1395                        resp.op,
1396                        resp.ret_code,
1397                        resp.ret_msg
1398                    );
1399                }
1400            }
1401            BybitWsMessage::Auth(auth_response) => {
1402                let is_success =
1403                    auth_response.success.unwrap_or(false) || (auth_response.ret_code == Some(0));
1404
1405                if is_success {
1406                    self.auth_tracker.succeed();
1407                    log::info!("WebSocket authenticated");
1408                    result.push(NautilusWsMessage::Authenticated);
1409                } else {
1410                    let error_msg = auth_response
1411                        .ret_msg
1412                        .as_deref()
1413                        .unwrap_or("Authentication rejected");
1414                    self.auth_tracker.fail(error_msg);
1415                    log::error!("WebSocket authentication failed: error={error_msg}");
1416                    result.push(NautilusWsMessage::Error(BybitWebSocketError::from_message(
1417                        error_msg.to_string(),
1418                    )));
1419                }
1420            }
1421            BybitWsMessage::Error(err) => {
1422                result.push(NautilusWsMessage::Error(err));
1423            }
1424            BybitWsMessage::Reconnected => {
1425                self.quote_cache.clear();
1426                result.push(NautilusWsMessage::Reconnected);
1427            }
1428            BybitWsMessage::Subscription(sub_msg) => {
1429                let pending_topics = self.subscriptions.pending_subscribe_topics();
1430                match sub_msg.op {
1431                    BybitWsOperation::Subscribe => {
1432                        if sub_msg.success {
1433                            for topic in pending_topics {
1434                                self.subscriptions.confirm_subscribe(&topic);
1435                                log::debug!("Subscription confirmed: topic={topic}");
1436                            }
1437                        } else {
1438                            for topic in pending_topics {
1439                                self.subscriptions.mark_failure(&topic);
1440                                log::warn!(
1441                                    "Subscription failed, will retry on reconnect: topic={topic}, error={:?}",
1442                                    sub_msg.ret_msg
1443                                );
1444                            }
1445                        }
1446                    }
1447                    BybitWsOperation::Unsubscribe => {
1448                        let pending_unsub = self.subscriptions.pending_unsubscribe_topics();
1449                        if sub_msg.success {
1450                            for topic in pending_unsub {
1451                                self.subscriptions.confirm_unsubscribe(&topic);
1452                                log::debug!("Unsubscription confirmed: topic={topic}");
1453                            }
1454                        } else {
1455                            for topic in pending_unsub {
1456                                log::warn!(
1457                                    "Unsubscription failed: topic={topic}, error={:?}",
1458                                    sub_msg.ret_msg
1459                                );
1460                            }
1461                        }
1462                    }
1463                    _ => {}
1464                }
1465            }
1466            _ => {}
1467        }
1468
1469        result
1470    }
1471}
1472
1473/// Classifies a parsed JSON value into a typed Bybit WebSocket message.
1474///
1475/// Returns `Raw(value)` if no specific type matches.
1476pub fn classify_bybit_message(value: serde_json::Value) -> BybitWsMessage {
1477    if let Some(op_val) = value.get("op") {
1478        if let Ok(op) = serde_json::from_value::<BybitWsOperation>(op_val.clone())
1479            && op == BybitWsOperation::Auth
1480            && let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
1481        {
1482            let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1483            if is_success {
1484                return BybitWsMessage::Auth(auth);
1485            }
1486            let resp = BybitWsResponse {
1487                op: Some(auth.op.clone()),
1488                topic: None,
1489                success: auth.success,
1490                conn_id: auth.conn_id.clone(),
1491                req_id: None,
1492                ret_code: auth.ret_code,
1493                ret_msg: auth.ret_msg,
1494            };
1495            let error = BybitWebSocketError::from_response(&resp);
1496            return BybitWsMessage::Error(error);
1497        }
1498
1499        if let Some(op_str) = op_val.as_str()
1500            && op_str.starts_with("order.")
1501        {
1502            return serde_json::from_value::<BybitWsOrderResponse>(value.clone()).map_or_else(
1503                |_| BybitWsMessage::Raw(value),
1504                BybitWsMessage::OrderResponse,
1505            );
1506        }
1507    }
1508
1509    if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
1510        if success {
1511            return serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone())
1512                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Subscription);
1513        }
1514        return serde_json::from_value::<BybitWsResponse>(value.clone()).map_or_else(
1515            |_| BybitWsMessage::Raw(value),
1516            |resp| {
1517                let error = BybitWebSocketError::from_response(&resp);
1518                BybitWsMessage::Error(error)
1519            },
1520        );
1521    }
1522
1523    // Most common path for market data
1524    if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
1525        if topic.starts_with(BYBIT_TOPIC_ORDERBOOK) {
1526            return serde_json::from_value(value.clone())
1527                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Orderbook);
1528        }
1529
1530        if topic.contains(BYBIT_TOPIC_PUBLIC_TRADE) || topic.starts_with(BYBIT_TOPIC_TRADE) {
1531            return serde_json::from_value(value.clone())
1532                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Trade);
1533        }
1534
1535        if topic.starts_with(BYBIT_TOPIC_KLINE) {
1536            return serde_json::from_value(value.clone())
1537                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Kline);
1538        }
1539
1540        if topic.starts_with(BYBIT_TOPIC_TICKERS) {
1541            // Option symbols: BTC-6JAN23-17500-C (date, strike, C/P)
1542            let is_option = value
1543                .get("data")
1544                .and_then(|d| d.get("symbol"))
1545                .and_then(|s| s.as_str())
1546                .is_some_and(|symbol| symbol.contains('-') && symbol.matches('-').count() >= 3);
1547
1548            if is_option {
1549                return serde_json::from_value(value.clone())
1550                    .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerOption);
1551            }
1552            return serde_json::from_value(value.clone())
1553                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerLinear);
1554        }
1555
1556        if topic.starts_with(BYBIT_TOPIC_ORDER) {
1557            return serde_json::from_value(value.clone())
1558                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::AccountOrder);
1559        }
1560
1561        if topic.starts_with(BYBIT_TOPIC_EXECUTION) {
1562            return serde_json::from_value(value.clone()).map_or_else(
1563                |_| BybitWsMessage::Raw(value),
1564                BybitWsMessage::AccountExecution,
1565            );
1566        }
1567
1568        if topic.starts_with(BYBIT_TOPIC_WALLET) {
1569            return serde_json::from_value(value.clone()).map_or_else(
1570                |_| BybitWsMessage::Raw(value),
1571                BybitWsMessage::AccountWallet,
1572            );
1573        }
1574
1575        if topic.starts_with(BYBIT_TOPIC_POSITION) {
1576            return serde_json::from_value(value.clone()).map_or_else(
1577                |_| BybitWsMessage::Raw(value),
1578                BybitWsMessage::AccountPosition,
1579            );
1580        }
1581    }
1582
1583    BybitWsMessage::Raw(value)
1584}
1585
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::consts::BYBIT_WS_TOPIC_DELIMITER;

    /// Builds a `FeedHandler` wired to throwaway channels and freshly created,
    /// empty tracker and cache state.
    ///
    /// The opposite ends of the channels are local to this function and are not
    /// handed to the caller.
    fn create_test_handler() -> FeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
        let (out_tx, _out_rx) = tokio::sync::mpsc::unbounded_channel();

        FeedHandler::new(
            Arc::new(AtomicBool::new(false)),
            cmd_rx,
            raw_rx,
            out_tx,
            None,
            None,
            true,
            Arc::new(AtomicU8::new(0)),
            AuthTracker::new(),
            SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
            Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
            Arc::new(DashMap::new()),
        )
    }

    /// Three consecutive ids must be pairwise distinct.
    #[rstest]
    fn test_generate_unique_request_id_returns_different_ids() {
        let handler = create_test_handler();

        let [first, second, third]: [_; 3] =
            std::array::from_fn(|_| handler.generate_unique_request_id());

        assert_ne!(first, second);
        assert_ne!(second, third);
        assert_ne!(first, third);
    }

    /// Each id must survive a round trip through `UUID4` unchanged.
    #[rstest]
    fn test_generate_unique_request_id_produces_valid_uuids() {
        let handler = create_test_handler();

        let generated = [
            handler.generate_unique_request_id(),
            handler.generate_unique_request_id(),
        ];

        for id in generated {
            assert_eq!(UUID4::from(id.as_str()).to_string(), id);
        }
    }

    /// Ids drawn for successive order placements must be pairwise distinct.
    #[rstest]
    fn test_multiple_place_orders_use_different_request_ids() {
        let handler = create_test_handler();

        let [place_1, place_2, place_3]: [_; 3] =
            std::array::from_fn(|_| handler.generate_unique_request_id());

        assert_ne!(place_1, place_2);
        assert_ne!(place_2, place_3);
        assert_ne!(place_1, place_3);
    }

    /// Ids drawn for successive amends must differ from their neighbour.
    #[rstest]
    fn test_multiple_amends_use_different_request_ids() {
        let handler = create_test_handler();

        // Verifies fix for "Duplicate reqId" errors when amending same order multiple times
        let amend_ids: [_; 3] = std::array::from_fn(|_| handler.generate_unique_request_id());

        for pair in amend_ids.windows(2) {
            assert_ne!(
                pair[0], pair[1],
                "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
            );
        }
    }

    /// Two consecutive ids drawn for cancels must differ.
    #[rstest]
    fn test_multiple_cancels_use_different_request_ids() {
        let handler = create_test_handler();

        let cancel_1 = handler.generate_unique_request_id();
        let cancel_2 = handler.generate_unique_request_id();

        assert_ne!(
            cancel_1, cancel_2,
            "Multiple cancels should generate different request IDs"
        );
    }

    /// Draws 100 ids one after another from a single handler and checks that
    /// none repeats. Note that the ids are generated sequentially on one thread.
    #[rstest]
    fn test_concurrent_request_id_generation() {
        let handler = create_test_handler();

        let mut seen = std::collections::HashSet::with_capacity(100);
        for _ in 0..100 {
            let id = handler.generate_unique_request_id();
            assert!(!seen.contains(&id), "Generated duplicate request ID: {id}");
            seen.insert(id);
        }
        assert_eq!(seen.len(), 100);
    }
}