nautilus_bybit/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Bybit.
17
18use std::{
19    collections::VecDeque,
20    num::NonZero,
21    sync::{
22        Arc,
23        atomic::{AtomicBool, AtomicU8, Ordering},
24    },
25};
26
27use ahash::AHashMap;
28use dashmap::DashMap;
29use nautilus_common::cache::quote::QuoteCache;
30use nautilus_core::{UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
31use nautilus_model::{
32    data::{BarSpecification, BarType, Data},
33    enums::{AggregationSource, BarAggregation, PriceType},
34    events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
35    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
36    instruments::{Instrument, InstrumentAny},
37};
38use nautilus_network::{
39    retry::{RetryManager, create_websocket_retry_manager},
40    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
41};
42use tokio_tungstenite::tungstenite::Message;
43use ustr::Ustr;
44
45use super::{
46    enums::BybitWsOperation,
47    error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
48    messages::{
49        BybitWebSocketError, BybitWsAuthResponse, BybitWsHeader, BybitWsMessage, BybitWsRequest,
50        BybitWsResponse, BybitWsSubscriptionMsg, NautilusWsMessage,
51    },
52    parse::{
53        parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
54        parse_ticker_linear_funding, parse_ws_account_state, parse_ws_fill_report,
55        parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
56        parse_ws_trade_tick,
57    },
58};
59use crate::{
60    common::{
61        consts::{
62            BYBIT_NAUTILUS_BROKER_ID, BYBIT_TOPIC_EXECUTION, BYBIT_TOPIC_KLINE, BYBIT_TOPIC_ORDER,
63            BYBIT_TOPIC_ORDERBOOK, BYBIT_TOPIC_POSITION, BYBIT_TOPIC_PUBLIC_TRADE,
64            BYBIT_TOPIC_TICKERS, BYBIT_TOPIC_TRADE, BYBIT_TOPIC_WALLET,
65        },
66        enums::{BybitProductType, BybitTimeInForce, BybitWsOrderRequestOp},
67        parse::{make_bybit_symbol, parse_price_with_precision, parse_quantity_with_precision},
68    },
69    websocket::messages::{
70        BybitBatchOrderError, BybitWsAmendOrderParams, BybitWsCancelOrderParams,
71        BybitWsOrderResponse, BybitWsPlaceOrderParams,
72    },
73};
74
/// Commands sent from the outer client to the inner message handler.
///
/// Commands are received on the handler's command channel and executed one at a time
/// inside `FeedHandler::next`.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
pub enum HandlerCommand {
    /// Hands the handler the [`WebSocketClient`] it uses for all outbound sends.
    SetClient(WebSocketClient),
    /// Takes the current client (if any) out of the handler and disconnects it.
    Disconnect,
    /// Sends the given authentication payload, with retries.
    Authenticate {
        payload: String,
    },
    /// Sends each entry of `topics` as its own text payload, with retries.
    // NOTE(review): entries are sent verbatim, so callers presumably pass fully built
    // subscribe messages rather than bare topic names - confirm against the outer client.
    Subscribe {
        topics: Vec<String>,
    },
    /// Sends each entry of `topics` as its own text payload, with retries.
    // NOTE(review): entries are sent verbatim, so callers presumably pass fully built
    // unsubscribe messages rather than bare topic names - confirm against the outer client.
    Unsubscribe {
        topics: Vec<String>,
    },
    /// Sends an arbitrary text payload, with retries.
    SendText {
        payload: String,
    },
    /// Registers a pending place request under a freshly generated request ID and sends
    /// the corresponding order-create request.
    PlaceOrder {
        params: BybitWsPlaceOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Registers a pending amend request under a freshly generated request ID and sends
    /// the corresponding order-amend request.
    AmendOrder {
        params: BybitWsAmendOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
        venue_order_id: Option<VenueOrderId>,
    },
    /// Registers a pending cancel request under a freshly generated request ID and sends
    /// the corresponding order-cancel request.
    CancelOrder {
        params: BybitWsCancelOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
        venue_order_id: Option<VenueOrderId>,
    },
    /// Records the orders of a batch place request under `req_id` so the batch response
    /// can be correlated with them. Nothing is sent by this command.
    RegisterBatchPlace {
        req_id: String,
        orders: Vec<BatchOrderData>,
    },
    /// Records the cancels of a batch cancel request under `req_id` so the batch response
    /// can be correlated with them. Nothing is sent by this command.
    RegisterBatchCancel {
        req_id: String,
        cancels: Vec<BatchCancelData>,
    },
    /// Inserts every given instrument into the handler's instrument cache, keyed by symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts (or replaces) a single instrument in the handler's instrument cache.
    UpdateInstrument(InstrumentAny),
}
130
/// Type alias for the funding rate cache.
///
/// A shared, async read-write-locked map from a `Ustr` key to a pair of optional strings.
// NOTE(review): the key is presumably the instrument symbol and the pair the last seen
// funding rate / next funding time - confirm against the ticker parsing code.
type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;

/// Data cached for pending place requests to correlate with responses.
///
/// Layout: `(client_order_id, trader_id, strategy_id, instrument_id)`.
type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);

/// Data cached for pending cancel requests to correlate with responses.
///
/// Layout: `(client_order_id, trader_id, strategy_id, instrument_id, venue_order_id)`.
type CancelRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);

/// Data cached for pending amend requests to correlate with responses.
///
/// Layout: `(client_order_id, trader_id, strategy_id, instrument_id, venue_order_id)`.
type AmendRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);

/// Data for a single order in a batch request.
///
/// The leading [`ClientOrderId`] is the one used when building rejection events; the
/// copy inside [`PlaceRequestData`] is ignored by the batch handlers.
type BatchOrderData = (ClientOrderId, PlaceRequestData);

/// Data for a single cancel in a batch request.
///
/// The leading [`ClientOrderId`] is the one used when building rejection events; the
/// copy inside [`CancelRequestData`] is ignored by the batch handlers.
type BatchCancelData = (ClientOrderId, CancelRequestData);
160
/// Inner WebSocket message handler.
///
/// Owns the receiving ends of the command and raw-frame channels, executes
/// [`HandlerCommand`]s, and turns raw WebSocket frames into [`NautilusWsMessage`]s.
pub(super) struct FeedHandler {
    /// Shared stop flag; once set, `next` returns `None`.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client; set by `SetClient` and taken on `Disconnect`.
    client: Option<WebSocketClient>,
    /// Commands from the outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Raw WebSocket frames to be parsed.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outbound channel used by `send`.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Authentication tracker supplied at construction.
    // NOTE(review): not used in the part of the file reviewed here.
    auth_tracker: AuthTracker,
    /// Subscription state supplied at construction.
    // NOTE(review): not used in the part of the file reviewed here.
    subscriptions: SubscriptionState,
    /// Instruments keyed by symbol, filled by the instrument commands.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Account attached to generated rejection events; `OrderRejected` events cannot be
    /// built while this is `None`.
    account_id: Option<AccountId>,
    /// Shared level read when deciding whether to attach the broker referer to an order.
    mm_level: Arc<AtomicU8>,
    /// Product type handed to the message parser; when set it is used to build the
    /// symbol for instrument lookups.
    product_type: Option<BybitProductType>,
    /// Bar timestamping flag supplied at construction.
    // NOTE(review): presumably selects close-time vs open-time bar timestamps - confirm
    // in the kline handling, which is outside the reviewed part of the file.
    bars_timestamp_on_close: bool,
    /// Last quote per instrument, used when deriving quotes from depth-1 order books.
    quote_cache: QuoteCache,
    /// Shared funding cache handed to the message parser.
    funding_cache: FundingCache,
    /// Retry policy applied to every outbound send.
    retry_manager: RetryManager<BybitWsError>,
    /// Pending single place requests keyed by request ID.
    pending_place_requests: DashMap<String, PlaceRequestData>,
    /// Pending single cancel requests keyed by request ID.
    pending_cancel_requests: DashMap<String, CancelRequestData>,
    /// Pending single amend requests keyed by request ID.
    pending_amend_requests: DashMap<String, AmendRequestData>,
    /// Pending batch place requests keyed by request ID.
    pending_batch_place_requests: DashMap<String, Vec<BatchOrderData>>,
    /// Pending batch cancel requests keyed by request ID.
    pending_batch_cancel_requests: DashMap<String, Vec<BatchCancelData>>,
    /// Parsed messages waiting to be returned from `next` one at a time.
    message_queue: VecDeque<NautilusWsMessage>,
}
184
185impl FeedHandler {
186    /// Creates a new [`FeedHandler`] instance.
187    #[allow(clippy::too_many_arguments)]
188    pub(super) fn new(
189        signal: Arc<AtomicBool>,
190        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
191        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
192        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
193        account_id: Option<AccountId>,
194        product_type: Option<BybitProductType>,
195        bars_timestamp_on_close: bool,
196        mm_level: Arc<AtomicU8>,
197        auth_tracker: AuthTracker,
198        subscriptions: SubscriptionState,
199        funding_cache: FundingCache,
200    ) -> Self {
201        Self {
202            signal,
203            client: None,
204            cmd_rx,
205            raw_rx,
206            out_tx,
207            auth_tracker,
208            subscriptions,
209            instruments_cache: AHashMap::new(),
210            account_id,
211            mm_level,
212            product_type,
213            bars_timestamp_on_close,
214            quote_cache: QuoteCache::new(),
215            funding_cache,
216            retry_manager: create_websocket_retry_manager(),
217            pending_place_requests: DashMap::new(),
218            pending_cancel_requests: DashMap::new(),
219            pending_amend_requests: DashMap::new(),
220            pending_batch_place_requests: DashMap::new(),
221            pending_batch_cancel_requests: DashMap::new(),
222            message_queue: VecDeque::new(),
223        }
224    }
225
226    pub(super) fn is_stopped(&self) -> bool {
227        self.signal.load(Ordering::Relaxed)
228    }
229
230    #[allow(dead_code)]
231    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
232        self.out_tx.send(msg).map_err(|_| ())
233    }
234
235    fn generate_unique_request_id(&self) -> String {
236        UUID4::new().to_string()
237    }
238
239    fn find_and_remove_place_request_by_client_order_id(
240        &self,
241        client_order_id: &ClientOrderId,
242    ) -> Option<(String, PlaceRequestData)> {
243        self.pending_place_requests
244            .iter()
245            .find(|entry| entry.value().0 == *client_order_id)
246            .and_then(|entry| {
247                let key = entry.key().clone();
248                drop(entry);
249                self.pending_place_requests.remove(&key)
250            })
251    }
252
253    fn find_and_remove_cancel_request_by_client_order_id(
254        &self,
255        client_order_id: &ClientOrderId,
256    ) -> Option<(String, CancelRequestData)> {
257        self.pending_cancel_requests
258            .iter()
259            .find(|entry| entry.value().0 == *client_order_id)
260            .and_then(|entry| {
261                let key = entry.key().clone();
262                drop(entry);
263                self.pending_cancel_requests.remove(&key)
264            })
265    }
266
267    fn find_and_remove_amend_request_by_client_order_id(
268        &self,
269        client_order_id: &ClientOrderId,
270    ) -> Option<(String, AmendRequestData)> {
271        self.pending_amend_requests
272            .iter()
273            .find(|entry| entry.value().0 == *client_order_id)
274            .and_then(|entry| {
275                let key = entry.key().clone();
276                drop(entry);
277                self.pending_amend_requests.remove(&key)
278            })
279    }
280
281    fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
282        let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
283        let mm_level = self.mm_level.load(Ordering::Relaxed);
284        !(is_post_only && mm_level > 0)
285    }
286
287    /// Sends a WebSocket message with retry logic.
288    async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
289        if let Some(client) = &self.client {
290            self.retry_manager
291                .execute_with_retry(
292                    "websocket_send",
293                    || {
294                        let payload = payload.clone();
295                        async move {
296                            client
297                                .send_text(payload, None)
298                                .await
299                                .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
300                        }
301                    },
302                    should_retry_bybit_error,
303                    create_bybit_timeout_error,
304                )
305                .await
306        } else {
307            Err(BybitWsError::ClientError(
308                "No active WebSocket client".to_string(),
309            ))
310        }
311    }
312
313    /// Handles batch operation request-level failures (ret_code != 0).
314    ///
315    /// When a batch request fails entirely, generate rejection events for all orders
316    /// in the batch and clean up tracking data.
317    fn handle_batch_failure(
318        &self,
319        req_id: &str,
320        ret_msg: &str,
321        op: &str,
322        ts_init: UnixNanos,
323        result: &mut Vec<NautilusWsMessage>,
324    ) {
325        if op.contains("create") {
326            if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
327                log::warn!(
328                    "Batch place request failed: req_id={req_id}, ret_msg={ret_msg}, num_orders={}",
329                    batch_data.len()
330                );
331
332                let Some(account_id) = self.account_id else {
333                    log::error!("Cannot create OrderRejected events: account_id is None");
334                    return;
335                };
336
337                let reason = Ustr::from(ret_msg);
338                for (client_order_id, (_, trader_id, strategy_id, instrument_id)) in batch_data {
339                    let rejected = OrderRejected::new(
340                        trader_id,
341                        strategy_id,
342                        instrument_id,
343                        client_order_id,
344                        account_id,
345                        reason,
346                        UUID4::new(),
347                        ts_init,
348                        ts_init,
349                        false,
350                        false,
351                    );
352                    result.push(NautilusWsMessage::OrderRejected(rejected));
353                }
354            }
355        } else if op.contains("cancel")
356            && let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id)
357        {
358            log::warn!(
359                "Batch cancel request failed: req_id={req_id}, ret_msg={ret_msg}, num_cancels={}",
360                batch_data.len()
361            );
362
363            let reason = Ustr::from(ret_msg);
364            for (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id)) in
365                batch_data
366            {
367                let rejected = OrderCancelRejected::new(
368                    trader_id,
369                    strategy_id,
370                    instrument_id,
371                    client_order_id,
372                    reason,
373                    UUID4::new(),
374                    ts_init,
375                    ts_init,
376                    false,
377                    venue_order_id,
378                    self.account_id,
379                );
380                result.push(NautilusWsMessage::OrderCancelRejected(rejected));
381            }
382        }
383    }
384
385    /// Handles batch operation responses, checking for individual order failures.
386    fn handle_batch_response(
387        &self,
388        resp: &BybitWsOrderResponse,
389        result: &mut Vec<NautilusWsMessage>,
390    ) {
391        let Some(req_id) = &resp.req_id else {
392            log::warn!(
393                "Batch response missing req_id - cannot correlate with pending requests: op={}",
394                resp.op
395            );
396            return;
397        };
398
399        let batch_errors = resp.extract_batch_errors();
400
401        if resp.op.contains("create") {
402            if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
403                self.process_batch_place_errors(batch_data, batch_errors, result);
404            } else {
405                log::debug!(
406                    "Batch place response received but no pending request found: req_id={req_id}"
407                );
408            }
409        } else if resp.op.contains("cancel") {
410            if let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id) {
411                self.process_batch_cancel_errors(batch_data, batch_errors, result);
412            } else {
413                log::debug!(
414                    "Batch cancel response received but no pending request found: req_id={req_id}"
415                );
416            }
417        }
418    }
419
420    /// Processes individual order errors from a batch place operation.
421    fn process_batch_place_errors(
422        &self,
423        batch_data: Vec<BatchOrderData>,
424        errors: Vec<BybitBatchOrderError>,
425        result: &mut Vec<NautilusWsMessage>,
426    ) {
427        let Some(account_id) = self.account_id else {
428            log::error!("Cannot create OrderRejected events: account_id is None");
429            return;
430        };
431
432        let clock = get_atomic_clock_realtime();
433        let ts_init = clock.get_time_ns();
434
435        for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id))) in
436            batch_data.into_iter().enumerate()
437        {
438            if let Some(error) = errors.get(idx)
439                && error.code != 0
440            {
441                log::warn!(
442                    "Batch order rejected: client_order_id={client_order_id}, error_code={}, error_msg={}",
443                    error.code,
444                    error.msg
445                );
446
447                let rejected = OrderRejected::new(
448                    trader_id,
449                    strategy_id,
450                    instrument_id,
451                    client_order_id,
452                    account_id,
453                    Ustr::from(&error.msg),
454                    UUID4::new(),
455                    ts_init,
456                    ts_init,
457                    false,
458                    false,
459                );
460                result.push(NautilusWsMessage::OrderRejected(rejected));
461            }
462        }
463    }
464
465    /// Processes individual order errors from a batch cancel operation.
466    fn process_batch_cancel_errors(
467        &self,
468        batch_data: Vec<BatchCancelData>,
469        errors: Vec<BybitBatchOrderError>,
470        result: &mut Vec<NautilusWsMessage>,
471    ) {
472        let clock = get_atomic_clock_realtime();
473        let ts_init = clock.get_time_ns();
474
475        for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id))) in
476            batch_data.into_iter().enumerate()
477        {
478            if let Some(error) = errors.get(idx)
479                && error.code != 0
480            {
481                log::warn!(
482                    "Batch cancel rejected: client_order_id={client_order_id}, error_code={}, error_msg={}",
483                    error.code,
484                    error.msg
485                );
486
487                let rejected = OrderCancelRejected::new(
488                    trader_id,
489                    strategy_id,
490                    instrument_id,
491                    client_order_id,
492                    Ustr::from(&error.msg),
493                    UUID4::new(),
494                    ts_init,
495                    ts_init,
496                    false,
497                    venue_order_id,
498                    self.account_id,
499                );
500                result.push(NautilusWsMessage::OrderCancelRejected(rejected));
501            }
502        }
503    }
504
505    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
506        let clock = get_atomic_clock_realtime();
507
508        loop {
509            if let Some(msg) = self.message_queue.pop_front() {
510                return Some(msg);
511            }
512
513            tokio::select! {
514                Some(cmd) = self.cmd_rx.recv() => {
515                    match cmd {
516                        HandlerCommand::SetClient(client) => {
517                            log::debug!("WebSocketClient received by handler");
518                            self.client = Some(client);
519                        }
520                        HandlerCommand::Disconnect => {
521                            log::debug!("Disconnect command received");
522
523                            if let Some(client) = self.client.take() {
524                                client.disconnect().await;
525                            }
526                        }
527                        HandlerCommand::Authenticate { payload } => {
528                            log::debug!("Authenticate command received");
529                            if let Err(e) = self.send_with_retry(payload).await {
530                                log::error!("Failed to send authentication after retries: {e}");
531                            }
532                        }
533                        HandlerCommand::Subscribe { topics } => {
534                            for topic in topics {
535                                log::debug!("Subscribing to topic: topic={topic}");
536                                if let Err(e) = self.send_with_retry(topic.clone()).await {
537                                    log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
538                                }
539                            }
540                        }
541                        HandlerCommand::Unsubscribe { topics } => {
542                            for topic in topics {
543                                log::debug!("Unsubscribing from topic: topic={topic}");
544                                if let Err(e) = self.send_with_retry(topic.clone()).await {
545                                    log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
546                                }
547                            }
548                        }
549                        HandlerCommand::SendText { payload } => {
550                            if let Err(e) = self.send_with_retry(payload).await {
551                                log::error!("Error sending text with retry: {e}");
552                            }
553                        }
554                        HandlerCommand::InitializeInstruments(instruments) => {
555                            for inst in instruments {
556                                self.instruments_cache.insert(inst.symbol().inner(), inst);
557                            }
558                        }
559                        HandlerCommand::UpdateInstrument(inst) => {
560                            self.instruments_cache.insert(inst.symbol().inner(), inst);
561                        }
562                        HandlerCommand::RegisterBatchPlace { req_id, orders } => {
563                            log::debug!(
564                                "Registering batch place request: req_id={req_id}, num_orders={}",
565                                orders.len()
566                            );
567                            self.pending_batch_place_requests.insert(req_id, orders);
568                        }
569                        HandlerCommand::RegisterBatchCancel { req_id, cancels } => {
570                            log::debug!(
571                                "Registering batch cancel request: req_id={req_id}, num_cancels={}",
572                                cancels.len()
573                            );
574                            self.pending_batch_cancel_requests.insert(req_id, cancels);
575                        }
576                        HandlerCommand::PlaceOrder {
577                            params,
578                            client_order_id,
579                            trader_id,
580                            strategy_id,
581                            instrument_id,
582                        } => {
583                            let request_id = self.generate_unique_request_id();
584
585                            self.pending_place_requests.insert(
586                                request_id.clone(),
587                                (client_order_id, trader_id, strategy_id, instrument_id),
588                            );
589
590                            let referer = if self.include_referer_header(params.time_in_force) {
591                                Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
592                            } else {
593                                None
594                            };
595
596                            let request = BybitWsRequest {
597                                req_id: Some(request_id.clone()),
598                                op: BybitWsOrderRequestOp::Create,
599                                header: BybitWsHeader::with_referer(referer),
600                                args: vec![params],
601                            };
602
603                            if let Ok(payload) = serde_json::to_string(&request)
604                                && let Err(e) = self.send_with_retry(payload).await
605                            {
606                                log::error!("Failed to send place order after retries: {e}");
607                                self.pending_place_requests.remove(&request_id);
608                            }
609                        }
610                        HandlerCommand::AmendOrder {
611                            params,
612                            client_order_id,
613                            trader_id,
614                            strategy_id,
615                            instrument_id,
616                            venue_order_id,
617                        } => {
618                            let request_id = self.generate_unique_request_id();
619
620                            self.pending_amend_requests.insert(
621                                request_id.clone(),
622                                (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
623                            );
624
625                            let request = BybitWsRequest {
626                                req_id: Some(request_id.clone()),
627                                op: BybitWsOrderRequestOp::Amend,
628                                header: BybitWsHeader::now(),
629                                args: vec![params],
630                            };
631
632                            if let Ok(payload) = serde_json::to_string(&request)
633                                && let Err(e) = self.send_with_retry(payload).await
634                            {
635                                log::error!("Failed to send amend order after retries: {e}");
636                                self.pending_amend_requests.remove(&request_id);
637                            }
638                        }
639                        HandlerCommand::CancelOrder {
640                            params,
641                            client_order_id,
642                            trader_id,
643                            strategy_id,
644                            instrument_id,
645                            venue_order_id,
646                        } => {
647                            let request_id = self.generate_unique_request_id();
648
649                            self.pending_cancel_requests.insert(
650                                request_id.clone(),
651                                (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
652                            );
653
654                            let request = BybitWsRequest {
655                                req_id: Some(request_id.clone()),
656                                op: BybitWsOrderRequestOp::Cancel,
657                                header: BybitWsHeader::now(),
658                                args: vec![params],
659                            };
660
661                            if let Ok(payload) = serde_json::to_string(&request)
662                                && let Err(e) = self.send_with_retry(payload).await
663                            {
664                                log::error!("Failed to send cancel order after retries: {e}");
665                                self.pending_cancel_requests.remove(&request_id);
666                            }
667                        }
668                    }
669
670                    continue;
671                }
672
673                () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
674                    if self.signal.load(Ordering::Relaxed) {
675                        log::debug!("Stop signal received during idle period");
676                        return None;
677                    }
678                    continue;
679                }
680
681                msg = self.raw_rx.recv() => {
682                    let msg = match msg {
683                        Some(msg) => msg,
684                        None => {
685                            log::debug!("WebSocket stream closed");
686                            return None;
687                        }
688                    };
689
690                    if let Message::Ping(data) = &msg {
691                        log::trace!("Received ping frame with {} bytes", data.len());
692
693                        if let Some(client) = &self.client
694                            && let Err(e) = client.send_pong(data.to_vec()).await
695                        {
696                            log::warn!("Failed to send pong frame: error={e}");
697                        }
698                        continue;
699                    }
700
701                    let event = match Self::parse_raw_message(msg) {
702                        Some(event) => event,
703                        None => continue,
704                    };
705
706                    if self.signal.load(Ordering::Relaxed) {
707                        log::debug!("Stop signal received");
708                        return None;
709                    }
710
711                    let ts_init = clock.get_time_ns();
712                    let instruments = self.instruments_cache.clone();
713                    let funding_cache = Arc::clone(&self.funding_cache);
714                    let nautilus_messages = self.parse_to_nautilus_messages(
715                        event,
716                        &instruments,
717                        self.account_id,
718                        self.product_type,
719                        &funding_cache,
720                        ts_init,
721                    )
722                    .await;
723
724                    // Enqueue all parsed messages to emit them one by one
725                    self.message_queue.extend(nautilus_messages);
726                }
727            }
728        }
729    }
730
731    fn parse_raw_message(msg: Message) -> Option<BybitWsMessage> {
732        use serde_json::Value;
733
734        match msg {
735            Message::Text(text) => {
736                if text == nautilus_network::RECONNECTED {
737                    log::info!("Received WebSocket reconnected signal");
738                    return Some(BybitWsMessage::Reconnected);
739                }
740
741                if text.trim().eq_ignore_ascii_case("pong") {
742                    return None;
743                }
744
745                log::trace!("Raw websocket message: {text}");
746
747                let value: Value = match serde_json::from_str(&text) {
748                    Ok(v) => v,
749                    Err(e) => {
750                        log::error!("Failed to parse WebSocket message: {e}: {text}");
751                        return None;
752                    }
753                };
754
755                Some(classify_bybit_message(value))
756            }
757            Message::Binary(msg) => {
758                log::debug!("Raw binary: {msg:?}");
759                None
760            }
761            Message::Close(_) => {
762                log::debug!("Received close message, waiting for reconnection");
763                None
764            }
765            _ => None,
766        }
767    }
768
769    #[allow(clippy::too_many_arguments)]
770    async fn parse_to_nautilus_messages(
771        &mut self,
772        msg: BybitWsMessage,
773        instruments: &AHashMap<Ustr, InstrumentAny>,
774        account_id: Option<AccountId>,
775        product_type: Option<BybitProductType>,
776        funding_cache: &FundingCache,
777        ts_init: UnixNanos,
778    ) -> Vec<NautilusWsMessage> {
779        let mut result = Vec::new();
780
781        match msg {
782            BybitWsMessage::Orderbook(msg) => {
783                let raw_symbol = msg.data.s;
784                let symbol =
785                    product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
786
787                if let Some(instrument) = instruments.get(&symbol) {
788                    match parse_orderbook_deltas(&msg, instrument, ts_init) {
789                        Ok(deltas) => result.push(NautilusWsMessage::Deltas(deltas)),
790                        Err(e) => log::error!("Error parsing orderbook deltas: {e}"),
791                    }
792
793                    // For depth=1 subscriptions, also emit QuoteTick from top-of-book
794                    if let Some(depth_str) = msg.topic.as_str().split('.').nth(1)
795                        && depth_str == "1"
796                    {
797                        let instrument_id = instrument.id();
798                        let last_quote = self.quote_cache.get(&instrument_id);
799
800                        match parse_orderbook_quote(&msg, instrument, last_quote, ts_init) {
801                            Ok(quote) => {
802                                self.quote_cache.insert(instrument_id, quote);
803                                result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
804                            }
805                            Err(e) => log::debug!("Skipping orderbook quote: {e}"),
806                        }
807                    }
808                } else {
809                    log::debug!(
810                        "No instrument found for symbol in Orderbook message: raw_symbol={raw_symbol}, full_symbol={symbol}"
811                    );
812                }
813            }
814            BybitWsMessage::Trade(msg) => {
815                let mut data_vec = Vec::new();
816                for trade in &msg.data {
817                    let raw_symbol = trade.s;
818                    let symbol =
819                        product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
820
821                    if let Some(instrument) = instruments.get(&symbol) {
822                        match parse_ws_trade_tick(trade, instrument, ts_init) {
823                            Ok(tick) => data_vec.push(Data::Trade(tick)),
824                            Err(e) => log::error!("Error parsing trade tick: {e}"),
825                        }
826                    } else {
827                        log::debug!(
828                            "No instrument found for symbol in Trade message: raw_symbol={raw_symbol}, full_symbol={symbol}"
829                        );
830                    }
831                }
832
833                if !data_vec.is_empty() {
834                    result.push(NautilusWsMessage::Data(data_vec));
835                }
836            }
837            BybitWsMessage::Kline(msg) => {
838                let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
839                    Ok(parts) => parts,
840                    Err(e) => {
841                        log::warn!("Failed to parse kline topic: {e}");
842                        return result;
843                    }
844                };
845
846                let symbol = product_type
847                    .map_or_else(|| raw_symbol.into(), |pt| make_bybit_symbol(raw_symbol, pt));
848
849                if let Some(instrument) = instruments.get(&symbol) {
850                    let (step, aggregation) = match interval_str.parse::<usize>() {
851                        Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
852                        _ => {
853                            log::warn!("Unsupported kline interval: {interval_str}");
854                            return result;
855                        }
856                    };
857
858                    if let Some(non_zero_step) = NonZero::new(step) {
859                        let bar_spec = BarSpecification {
860                            step: non_zero_step,
861                            aggregation,
862                            price_type: PriceType::Last,
863                        };
864                        let bar_type =
865                            BarType::new(instrument.id(), bar_spec, AggregationSource::External);
866
867                        let mut data_vec = Vec::new();
868                        for kline in &msg.data {
869                            // Only process confirmed bars (not partial/building bars)
870                            if !kline.confirm {
871                                continue;
872                            }
873                            match parse_ws_kline_bar(
874                                kline,
875                                instrument,
876                                bar_type,
877                                self.bars_timestamp_on_close,
878                                ts_init,
879                            ) {
880                                Ok(bar) => data_vec.push(Data::Bar(bar)),
881                                Err(e) => log::error!("Error parsing kline to bar: {e}"),
882                            }
883                        }
884                        if !data_vec.is_empty() {
885                            result.push(NautilusWsMessage::Data(data_vec));
886                        }
887                    } else {
888                        log::error!("Invalid step value: {step}");
889                    }
890                } else {
891                    log::debug!(
892                        "No instrument found for symbol in Kline message: raw_symbol={raw_symbol}, full_symbol={symbol}"
893                    );
894                }
895            }
896            BybitWsMessage::TickerLinear(msg) => {
897                let raw_symbol = msg.data.symbol;
898                let symbol =
899                    product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
900
901                if let Some(instrument) = instruments.get(&symbol) {
902                    let instrument_id = instrument.id();
903                    let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
904                    let price_precision = instrument.price_precision();
905                    let size_precision = instrument.size_precision();
906
907                    // Parse Bybit linear ticker fields, propagate errors
908                    let bid_price = msg
909                        .data
910                        .bid1_price
911                        .as_deref()
912                        .map(|s| parse_price_with_precision(s, price_precision, "bid1Price"))
913                        .transpose();
914                    let ask_price = msg
915                        .data
916                        .ask1_price
917                        .as_deref()
918                        .map(|s| parse_price_with_precision(s, price_precision, "ask1Price"))
919                        .transpose();
920                    let bid_size = msg
921                        .data
922                        .bid1_size
923                        .as_deref()
924                        .map(|s| parse_quantity_with_precision(s, size_precision, "bid1Size"))
925                        .transpose();
926                    let ask_size = msg
927                        .data
928                        .ask1_size
929                        .as_deref()
930                        .map(|s| parse_quantity_with_precision(s, size_precision, "ask1Size"))
931                        .transpose();
932
933                    match (bid_price, ask_price, bid_size, ask_size) {
934                        (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
935                            match self.quote_cache.process(
936                                instrument_id,
937                                bp,
938                                ap,
939                                bs,
940                                as_,
941                                ts_event,
942                                ts_init,
943                            ) {
944                                Ok(quote) => {
945                                    result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
946                                }
947                                Err(e) => {
948                                    let raw_data = serde_json::to_string(&msg.data)
949                                        .unwrap_or_else(|_| "<failed to serialize>".to_string());
950                                    log::debug!(
951                                        "Skipping partial ticker update: {e}, raw_data: {raw_data}"
952                                    );
953                                }
954                            }
955                        }
956                        _ => {
957                            let raw_data = serde_json::to_string(&msg.data)
958                                .unwrap_or_else(|_| "<failed to serialize>".to_string());
959                            log::warn!(
960                                "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
961                            );
962                        }
963                    }
964
965                    // Extract funding rate if available
966                    if msg.data.funding_rate.is_some() && msg.data.next_funding_time.is_some() {
967                        let should_publish = {
968                            let cache = funding_cache.read().await;
969                            match cache.get(&symbol) {
970                                Some((cached_rate, cached_time)) => {
971                                    cached_rate.as_ref() != msg.data.funding_rate.as_ref()
972                                        || cached_time.as_ref()
973                                            != msg.data.next_funding_time.as_ref()
974                                }
975                                None => true,
976                            }
977                        };
978
979                        if should_publish {
980                            match parse_ticker_linear_funding(
981                                &msg.data,
982                                instrument_id,
983                                ts_event,
984                                ts_init,
985                            ) {
986                                Ok(funding) => {
987                                    let cache_key = (
988                                        msg.data.funding_rate.clone(),
989                                        msg.data.next_funding_time.clone(),
990                                    );
991                                    funding_cache.write().await.insert(symbol, cache_key);
992                                    result.push(NautilusWsMessage::FundingRates(vec![funding]));
993                                }
994                                Err(e) => {
995                                    log::debug!("Skipping funding rate update: {e}");
996                                }
997                            }
998                        }
999                    }
1000                } else {
1001                    log::debug!(
1002                        "No instrument found for symbol in TickerLinear message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1003                    );
1004                }
1005            }
1006            BybitWsMessage::TickerOption(msg) => {
1007                let raw_symbol = &msg.data.symbol;
1008                let symbol = product_type.map_or_else(
1009                    || raw_symbol.as_str().into(),
1010                    |pt| make_bybit_symbol(raw_symbol, pt),
1011                );
1012
1013                if let Some(instrument) = instruments.get(&symbol) {
1014                    let instrument_id = instrument.id();
1015                    let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
1016                    let price_precision = instrument.price_precision();
1017                    let size_precision = instrument.size_precision();
1018
1019                    // Parse Bybit option ticker fields (always complete), propagate errors
1020                    let bid_price = parse_price_with_precision(
1021                        &msg.data.bid_price,
1022                        price_precision,
1023                        "bidPrice",
1024                    );
1025                    let ask_price = parse_price_with_precision(
1026                        &msg.data.ask_price,
1027                        price_precision,
1028                        "askPrice",
1029                    );
1030                    let bid_size = parse_quantity_with_precision(
1031                        &msg.data.bid_size,
1032                        size_precision,
1033                        "bidSize",
1034                    );
1035                    let ask_size = parse_quantity_with_precision(
1036                        &msg.data.ask_size,
1037                        size_precision,
1038                        "askSize",
1039                    );
1040
1041                    match (bid_price, ask_price, bid_size, ask_size) {
1042                        (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1043                            match self.quote_cache.process(
1044                                instrument_id,
1045                                Some(bp),
1046                                Some(ap),
1047                                Some(bs),
1048                                Some(as_),
1049                                ts_event,
1050                                ts_init,
1051                            ) {
1052                                Ok(quote) => {
1053                                    result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1054                                }
1055                                Err(e) => {
1056                                    let raw_data = serde_json::to_string(&msg.data)
1057                                        .unwrap_or_else(|_| "<failed to serialize>".to_string());
1058                                    log::debug!(
1059                                        "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1060                                    );
1061                                }
1062                            }
1063                        }
1064                        _ => {
1065                            let raw_data = serde_json::to_string(&msg.data)
1066                                .unwrap_or_else(|_| "<failed to serialize>".to_string());
1067                            log::warn!(
1068                                "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1069                            );
1070                        }
1071                    }
1072                } else {
1073                    log::debug!(
1074                        "No instrument found for symbol in TickerOption message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1075                    );
1076                }
1077            }
1078            BybitWsMessage::AccountOrder(msg) => {
1079                if let Some(account_id) = account_id {
1080                    let mut reports = Vec::new();
1081                    for order in &msg.data {
1082                        let raw_symbol = order.symbol;
1083                        let symbol = make_bybit_symbol(raw_symbol, order.category);
1084
1085                        if let Some(instrument) = instruments.get(&symbol) {
1086                            match parse_ws_order_status_report(
1087                                order, instrument, account_id, ts_init,
1088                            ) {
1089                                Ok(report) => reports.push(report),
1090                                Err(e) => log::error!("Error parsing order status report: {e}"),
1091                            }
1092                        } else {
1093                            log::debug!(
1094                                "No instrument found for symbol in AccountOrder message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1095                            );
1096                        }
1097                    }
1098                    if !reports.is_empty() {
1099                        result.push(NautilusWsMessage::OrderStatusReports(reports));
1100                    }
1101                }
1102            }
1103            BybitWsMessage::AccountExecution(msg) => {
1104                if let Some(account_id) = account_id {
1105                    let mut reports = Vec::new();
1106                    for execution in &msg.data {
1107                        let raw_symbol = execution.symbol;
1108                        let symbol = make_bybit_symbol(raw_symbol, execution.category);
1109
1110                        if let Some(instrument) = instruments.get(&symbol) {
1111                            match parse_ws_fill_report(execution, account_id, instrument, ts_init) {
1112                                Ok(report) => reports.push(report),
1113                                Err(e) => log::error!("Error parsing fill report: {e}"),
1114                            }
1115                        } else {
1116                            log::debug!(
1117                                "No instrument found for symbol in AccountExecution message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1118                            );
1119                        }
1120                    }
1121                    if !reports.is_empty() {
1122                        result.push(NautilusWsMessage::FillReports(reports));
1123                    }
1124                }
1125            }
1126            BybitWsMessage::AccountPosition(msg) => {
1127                if let Some(account_id) = account_id {
1128                    for position in &msg.data {
1129                        let raw_symbol = position.symbol;
1130                        let symbol = make_bybit_symbol(raw_symbol, position.category);
1131
1132                        if let Some(instrument) = instruments.get(&symbol) {
1133                            match parse_ws_position_status_report(
1134                                position, account_id, instrument, ts_init,
1135                            ) {
1136                                Ok(report) => {
1137                                    result.push(NautilusWsMessage::PositionStatusReport(report));
1138                                }
1139                                Err(e) => {
1140                                    log::error!("Error parsing position status report: {e}");
1141                                }
1142                            }
1143                        } else {
1144                            log::debug!(
1145                                "No instrument found for symbol in AccountPosition message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1146                            );
1147                        }
1148                    }
1149                }
1150            }
1151            BybitWsMessage::AccountWallet(msg) => {
1152                if let Some(account_id) = account_id {
1153                    for wallet in &msg.data {
1154                        let ts_event = UnixNanos::from(msg.creation_time as u64 * 1_000_000);
1155
1156                        match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1157                            Ok(state) => result.push(NautilusWsMessage::AccountState(state)),
1158                            Err(e) => log::error!("Error parsing account state: {e}"),
1159                        }
1160                    }
1161                }
1162            }
1163            BybitWsMessage::OrderResponse(resp) => {
1164                if resp.ret_code == 0 {
1165                    log::debug!(
1166                        "Order operation successful: op={}, ret_msg={}",
1167                        resp.op,
1168                        resp.ret_msg
1169                    );
1170
1171                    if resp.op.contains("batch") {
1172                        self.handle_batch_response(&resp, &mut result);
1173                    } else if let Some(req_id) = &resp.req_id {
1174                        if resp.op.contains("create") {
1175                            self.pending_place_requests.remove(req_id);
1176                        } else if resp.op.contains("cancel") {
1177                            self.pending_cancel_requests.remove(req_id);
1178                        } else if resp.op.contains("amend") {
1179                            self.pending_amend_requests.remove(req_id);
1180                        }
1181                    } else if let Some(order_link_id) =
1182                        resp.data.get("orderLinkId").and_then(|v| v.as_str())
1183                    {
1184                        // Bybit sometimes omits req_id, search by client_order_id instead
1185                        let client_order_id = ClientOrderId::from(order_link_id);
1186                        if resp.op.contains("create") {
1187                            self.find_and_remove_place_request_by_client_order_id(&client_order_id);
1188                        } else if resp.op.contains("cancel") {
1189                            self.find_and_remove_cancel_request_by_client_order_id(
1190                                &client_order_id,
1191                            );
1192                        } else if resp.op.contains("amend") {
1193                            self.find_and_remove_amend_request_by_client_order_id(&client_order_id);
1194                        }
1195                    }
1196                } else if let Some(req_id) = &resp.req_id {
1197                    let clock = get_atomic_clock_realtime();
1198                    let ts_init = clock.get_time_ns();
1199
1200                    if resp.op.contains("batch") {
1201                        self.handle_batch_failure(
1202                            req_id,
1203                            &resp.ret_msg,
1204                            &resp.op,
1205                            ts_init,
1206                            &mut result,
1207                        );
1208                    } else if resp.op.contains("create")
1209                        && let Some((_, (client_order_id, trader_id, strategy_id, instrument_id))) =
1210                            self.pending_place_requests.remove(req_id)
1211                    {
1212                        let Some(account_id) = self.account_id else {
1213                            log::error!(
1214                                "Cannot create OrderRejected event: account_id is None: request_id={req_id}, reason={}",
1215                                resp.ret_msg
1216                            );
1217                            return result;
1218                        };
1219
1220                        let rejected = OrderRejected::new(
1221                            trader_id,
1222                            strategy_id,
1223                            instrument_id,
1224                            client_order_id,
1225                            account_id,
1226                            Ustr::from(&resp.ret_msg),
1227                            UUID4::new(),
1228                            ts_init,
1229                            ts_init,
1230                            false,
1231                            false,
1232                        );
1233                        result.push(NautilusWsMessage::OrderRejected(rejected));
1234                    } else if resp.op.contains("cancel")
1235                        && let Some((
1236                            _,
1237                            (
1238                                client_order_id,
1239                                trader_id,
1240                                strategy_id,
1241                                instrument_id,
1242                                venue_order_id,
1243                            ),
1244                        )) = self.pending_cancel_requests.remove(req_id)
1245                    {
1246                        let rejected = OrderCancelRejected::new(
1247                            trader_id,
1248                            strategy_id,
1249                            instrument_id,
1250                            client_order_id,
1251                            Ustr::from(&resp.ret_msg),
1252                            UUID4::new(),
1253                            ts_init,
1254                            ts_init,
1255                            false,
1256                            venue_order_id,
1257                            self.account_id,
1258                        );
1259                        result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1260                    } else if resp.op.contains("amend")
1261                        && let Some((
1262                            _,
1263                            (
1264                                client_order_id,
1265                                trader_id,
1266                                strategy_id,
1267                                instrument_id,
1268                                venue_order_id,
1269                            ),
1270                        )) = self.pending_amend_requests.remove(req_id)
1271                    {
1272                        let rejected = OrderModifyRejected::new(
1273                            trader_id,
1274                            strategy_id,
1275                            instrument_id,
1276                            client_order_id,
1277                            Ustr::from(&resp.ret_msg),
1278                            UUID4::new(),
1279                            ts_init,
1280                            ts_init,
1281                            false,
1282                            venue_order_id,
1283                            self.account_id,
1284                        );
1285                        result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1286                    }
1287                } else if let Some(order_link_id) =
1288                    resp.data.get("orderLinkId").and_then(|v| v.as_str())
1289                {
1290                    // Bybit sometimes omits req_id, search by client_order_id instead
1291                    let clock = get_atomic_clock_realtime();
1292                    let ts_init = clock.get_time_ns();
1293                    let client_order_id = ClientOrderId::from(order_link_id);
1294
1295                    if resp.op.contains("create") {
1296                        if let Some((_, (_, trader_id, strategy_id, instrument_id))) =
1297                            self.find_and_remove_place_request_by_client_order_id(&client_order_id)
1298                        {
1299                            let Some(account_id) = self.account_id else {
1300                                log::error!(
1301                                    "Cannot create OrderRejected event: account_id is None: client_order_id={client_order_id}, reason={}",
1302                                    resp.ret_msg
1303                                );
1304                                return result;
1305                            };
1306
1307                            let rejected = OrderRejected::new(
1308                                trader_id,
1309                                strategy_id,
1310                                instrument_id,
1311                                client_order_id,
1312                                account_id,
1313                                Ustr::from(&resp.ret_msg),
1314                                UUID4::new(),
1315                                ts_init,
1316                                ts_init,
1317                                false,
1318                                false,
1319                            );
1320                            result.push(NautilusWsMessage::OrderRejected(rejected));
1321                        }
1322                    } else if resp.op.contains("cancel") {
1323                        if let Some((
1324                            _,
1325                            (_, trader_id, strategy_id, instrument_id, venue_order_id),
1326                        )) =
1327                            self.find_and_remove_cancel_request_by_client_order_id(&client_order_id)
1328                        {
1329                            let rejected = OrderCancelRejected::new(
1330                                trader_id,
1331                                strategy_id,
1332                                instrument_id,
1333                                client_order_id,
1334                                Ustr::from(&resp.ret_msg),
1335                                UUID4::new(),
1336                                ts_init,
1337                                ts_init,
1338                                false,
1339                                venue_order_id,
1340                                self.account_id,
1341                            );
1342                            result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1343                        }
1344                    } else if resp.op.contains("amend")
1345                        && let Some((_, (_, trader_id, strategy_id, instrument_id, venue_order_id))) =
1346                            self.find_and_remove_amend_request_by_client_order_id(&client_order_id)
1347                    {
1348                        let rejected = OrderModifyRejected::new(
1349                            trader_id,
1350                            strategy_id,
1351                            instrument_id,
1352                            client_order_id,
1353                            Ustr::from(&resp.ret_msg),
1354                            UUID4::new(),
1355                            ts_init,
1356                            ts_init,
1357                            false,
1358                            venue_order_id,
1359                            self.account_id,
1360                        );
1361                        result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1362                    }
1363                } else {
1364                    log::warn!(
1365                        "Order operation failed but request_id could not be extracted from response: op={}, ret_code={}, ret_msg={}",
1366                        resp.op,
1367                        resp.ret_code,
1368                        resp.ret_msg
1369                    );
1370                }
1371            }
1372            BybitWsMessage::Auth(auth_response) => {
1373                let is_success =
1374                    auth_response.success.unwrap_or(false) || (auth_response.ret_code == Some(0));
1375
1376                if is_success {
1377                    self.auth_tracker.succeed();
1378                    log::info!("WebSocket authenticated");
1379                    result.push(NautilusWsMessage::Authenticated);
1380                } else {
1381                    let error_msg = auth_response
1382                        .ret_msg
1383                        .as_deref()
1384                        .unwrap_or("Authentication rejected");
1385                    self.auth_tracker.fail(error_msg);
1386                    log::error!("WebSocket authentication failed: error={error_msg}");
1387                    result.push(NautilusWsMessage::Error(BybitWebSocketError::from_message(
1388                        error_msg.to_string(),
1389                    )));
1390                }
1391            }
1392            BybitWsMessage::Error(err) => {
1393                result.push(NautilusWsMessage::Error(err));
1394            }
1395            BybitWsMessage::Reconnected => {
1396                self.quote_cache.clear();
1397                result.push(NautilusWsMessage::Reconnected);
1398            }
1399            BybitWsMessage::Subscription(sub_msg) => {
1400                let pending_topics = self.subscriptions.pending_subscribe_topics();
1401                match sub_msg.op {
1402                    BybitWsOperation::Subscribe => {
1403                        if sub_msg.success {
1404                            for topic in pending_topics {
1405                                self.subscriptions.confirm_subscribe(&topic);
1406                                log::debug!("Subscription confirmed: topic={topic}");
1407                            }
1408                        } else {
1409                            for topic in pending_topics {
1410                                self.subscriptions.mark_failure(&topic);
1411                                log::warn!(
1412                                    "Subscription failed, will retry on reconnect: topic={topic}, error={:?}",
1413                                    sub_msg.ret_msg
1414                                );
1415                            }
1416                        }
1417                    }
1418                    BybitWsOperation::Unsubscribe => {
1419                        let pending_unsub = self.subscriptions.pending_unsubscribe_topics();
1420                        if sub_msg.success {
1421                            for topic in pending_unsub {
1422                                self.subscriptions.confirm_unsubscribe(&topic);
1423                                log::debug!("Unsubscription confirmed: topic={topic}");
1424                            }
1425                        } else {
1426                            for topic in pending_unsub {
1427                                log::warn!(
1428                                    "Unsubscription failed: topic={topic}, error={:?}",
1429                                    sub_msg.ret_msg
1430                                );
1431                            }
1432                        }
1433                    }
1434                    _ => {}
1435                }
1436            }
1437            _ => {}
1438        }
1439
1440        result
1441    }
1442}
1443
1444/// Classifies a parsed JSON value into a typed Bybit WebSocket message.
1445///
1446/// Returns `Raw(value)` if no specific type matches.
1447pub fn classify_bybit_message(value: serde_json::Value) -> BybitWsMessage {
1448    if let Some(op_val) = value.get("op") {
1449        if let Ok(op) = serde_json::from_value::<BybitWsOperation>(op_val.clone())
1450            && op == BybitWsOperation::Auth
1451            && let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
1452        {
1453            let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1454            if is_success {
1455                return BybitWsMessage::Auth(auth);
1456            }
1457            let resp = BybitWsResponse {
1458                op: Some(auth.op.clone()),
1459                topic: None,
1460                success: auth.success,
1461                conn_id: auth.conn_id.clone(),
1462                req_id: None,
1463                ret_code: auth.ret_code,
1464                ret_msg: auth.ret_msg,
1465            };
1466            let error = BybitWebSocketError::from_response(&resp);
1467            return BybitWsMessage::Error(error);
1468        }
1469
1470        if let Some(op_str) = op_val.as_str()
1471            && op_str.starts_with("order.")
1472        {
1473            return serde_json::from_value::<BybitWsOrderResponse>(value.clone()).map_or_else(
1474                |_| BybitWsMessage::Raw(value),
1475                BybitWsMessage::OrderResponse,
1476            );
1477        }
1478    }
1479
1480    if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
1481        if success {
1482            return serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone())
1483                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Subscription);
1484        }
1485        return serde_json::from_value::<BybitWsResponse>(value.clone()).map_or_else(
1486            |_| BybitWsMessage::Raw(value),
1487            |resp| {
1488                let error = BybitWebSocketError::from_response(&resp);
1489                BybitWsMessage::Error(error)
1490            },
1491        );
1492    }
1493
1494    // Most common path for market data
1495    if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
1496        if topic.starts_with(BYBIT_TOPIC_ORDERBOOK) {
1497            return serde_json::from_value(value.clone())
1498                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Orderbook);
1499        }
1500
1501        if topic.contains(BYBIT_TOPIC_PUBLIC_TRADE) || topic.starts_with(BYBIT_TOPIC_TRADE) {
1502            return serde_json::from_value(value.clone())
1503                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Trade);
1504        }
1505
1506        if topic.starts_with(BYBIT_TOPIC_KLINE) {
1507            return serde_json::from_value(value.clone())
1508                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Kline);
1509        }
1510
1511        if topic.starts_with(BYBIT_TOPIC_TICKERS) {
1512            // Option symbols: BTC-6JAN23-17500-C (date, strike, C/P)
1513            let is_option = value
1514                .get("data")
1515                .and_then(|d| d.get("symbol"))
1516                .and_then(|s| s.as_str())
1517                .is_some_and(|symbol| symbol.contains('-') && symbol.matches('-').count() >= 3);
1518
1519            if is_option {
1520                return serde_json::from_value(value.clone())
1521                    .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerOption);
1522            }
1523            return serde_json::from_value(value.clone())
1524                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerLinear);
1525        }
1526
1527        if topic.starts_with(BYBIT_TOPIC_ORDER) {
1528            return serde_json::from_value(value.clone())
1529                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::AccountOrder);
1530        }
1531
1532        if topic.starts_with(BYBIT_TOPIC_EXECUTION) {
1533            return serde_json::from_value(value.clone()).map_or_else(
1534                |_| BybitWsMessage::Raw(value),
1535                BybitWsMessage::AccountExecution,
1536            );
1537        }
1538
1539        if topic.starts_with(BYBIT_TOPIC_WALLET) {
1540            return serde_json::from_value(value.clone()).map_or_else(
1541                |_| BybitWsMessage::Raw(value),
1542                BybitWsMessage::AccountWallet,
1543            );
1544        }
1545
1546        if topic.starts_with(BYBIT_TOPIC_POSITION) {
1547            return serde_json::from_value(value.clone()).map_or_else(
1548                |_| BybitWsMessage::Raw(value),
1549                BybitWsMessage::AccountPosition,
1550            );
1551        }
1552    }
1553
1554    BybitWsMessage::Raw(value)
1555}
1556
#[cfg(test)]
mod tests {
    //! Unit tests for request ID generation on the feed handler.

    use rstest::rstest;

    use super::*;
    use crate::common::consts::BYBIT_WS_TOPIC_DELIMITER;

    /// Builds a [`FeedHandler`] backed by fresh unbounded channels.
    ///
    /// The opposite channel ends are kept alive only until this function returns.
    fn create_test_handler() -> FeedHandler {
        let (_command_sender, command_receiver) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_sender, raw_receiver) = tokio::sync::mpsc::unbounded_channel();
        let (output_sender, _output_receiver) = tokio::sync::mpsc::unbounded_channel();

        FeedHandler::new(
            Arc::new(AtomicBool::new(false)),
            command_receiver,
            raw_receiver,
            output_sender,
            None,
            None,
            true,
            Arc::new(AtomicU8::new(0)),
            AuthTracker::new(),
            SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
            Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
        )
    }

    /// Three consecutive request IDs must all differ from each other.
    #[rstest]
    fn test_generate_unique_request_id_returns_different_ids() {
        let handler = create_test_handler();

        let first = handler.generate_unique_request_id();
        let second = handler.generate_unique_request_id();
        let third = handler.generate_unique_request_id();

        // Same pairings as a manual three-way comparison
        for (left, right) in [(&first, &second), (&second, &third), (&first, &third)] {
            assert_ne!(left, right);
        }
    }

    /// A generated request ID must round-trip through [`UUID4`] unchanged.
    #[rstest]
    fn test_generate_unique_request_id_produces_valid_uuids() {
        let handler = create_test_handler();

        // Both IDs are generated before either one is checked
        let generated = [
            handler.generate_unique_request_id(),
            handler.generate_unique_request_id(),
        ];

        for request_id in generated {
            assert!(UUID4::from(request_id.as_str()).to_string() == request_id);
        }
    }

    /// Request IDs used for successive order placements must be pairwise distinct.
    #[rstest]
    fn test_multiple_place_orders_use_different_request_ids() {
        let handler = create_test_handler();

        let place_a = handler.generate_unique_request_id();
        let place_b = handler.generate_unique_request_id();
        let place_c = handler.generate_unique_request_id();

        for (left, right) in [(&place_a, &place_b), (&place_b, &place_c), (&place_a, &place_c)] {
            assert_ne!(left, right);
        }
    }

    /// Successive amend request IDs must differ from their immediate predecessor.
    #[rstest]
    fn test_multiple_amends_use_different_request_ids() {
        let handler = create_test_handler();

        // Verifies fix for "Duplicate reqId" errors when amending same order multiple times
        let amend_ids = [
            handler.generate_unique_request_id(),
            handler.generate_unique_request_id(),
            handler.generate_unique_request_id(),
        ];

        // Adjacent pairs only: (first, second) then (second, third)
        for pair in amend_ids.windows(2) {
            assert_ne!(
                pair[0], pair[1],
                "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
            );
        }
    }

    /// Two successive cancel request IDs must differ.
    #[rstest]
    fn test_multiple_cancels_use_different_request_ids() {
        let handler = create_test_handler();

        let earlier = handler.generate_unique_request_id();
        let later = handler.generate_unique_request_id();

        assert_ne!(
            earlier, later,
            "Multiple cancels should generate different request IDs"
        );
    }

    /// One hundred IDs generated back to back on a single thread must contain no duplicates.
    #[rstest]
    fn test_concurrent_request_id_generation() {
        let handler = create_test_handler();

        let mut seen = std::collections::HashSet::new();
        for _ in 0..100 {
            let id = handler.generate_unique_request_id();
            let newly_inserted = seen.insert(id.clone());
            assert!(newly_inserted, "Generated duplicate request ID: {id}");
        }

        assert_eq!(seen.len(), 100);
    }
}