nautilus_bybit/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Bybit.
17
18use std::{
19    collections::VecDeque,
20    num::NonZero,
21    sync::{
22        Arc,
23        atomic::{AtomicBool, AtomicU8, Ordering},
24    },
25};
26
27use ahash::AHashMap;
28use dashmap::DashMap;
29use nautilus_common::cache::quote::QuoteCache;
30use nautilus_core::{UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
31use nautilus_model::{
32    data::{BarSpecification, BarType, Data},
33    enums::{AggregationSource, BarAggregation, PriceType},
34    events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
35    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
36    instruments::{Instrument, InstrumentAny},
37};
38use nautilus_network::{
39    retry::{RetryManager, create_websocket_retry_manager},
40    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
41};
42use tokio_tungstenite::tungstenite::Message;
43use ustr::Ustr;
44
45use super::{
46    enums::BybitWsOperation,
47    error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
48    messages::{
49        BybitWebSocketError, BybitWsAuthResponse, BybitWsHeader, BybitWsMessage, BybitWsRequest,
50        BybitWsResponse, BybitWsSubscriptionMsg, NautilusWsMessage,
51    },
52    parse::{
53        parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
54        parse_ticker_linear_funding, parse_ws_account_state, parse_ws_fill_report,
55        parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
56        parse_ws_trade_tick,
57    },
58};
59use crate::{
60    common::{
61        consts::{
62            BYBIT_NAUTILUS_BROKER_ID, BYBIT_TOPIC_EXECUTION, BYBIT_TOPIC_KLINE, BYBIT_TOPIC_ORDER,
63            BYBIT_TOPIC_ORDERBOOK, BYBIT_TOPIC_POSITION, BYBIT_TOPIC_PUBLIC_TRADE,
64            BYBIT_TOPIC_TICKERS, BYBIT_TOPIC_TRADE, BYBIT_TOPIC_WALLET,
65        },
66        enums::{BybitProductType, BybitTimeInForce, BybitWsOrderRequestOp},
67        parse::{make_bybit_symbol, parse_price_with_precision, parse_quantity_with_precision},
68    },
69    websocket::messages::{
70        BybitBatchOrderError, BybitWsAmendOrderParams, BybitWsCancelOrderParams,
71        BybitWsOrderResponse, BybitWsPlaceOrderParams,
72    },
73};
74
/// Commands sent from the outer client to the inner message handler.
///
/// Commands are received on the handler's command channel and processed one at a time
/// inside `FeedHandler::next`.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
pub enum HandlerCommand {
    /// Hands the handler the WebSocket client it stores and uses for outbound sends.
    SetClient(WebSocketClient),
    /// Takes the stored client (if any) out of the handler and disconnects it.
    Disconnect,
    /// Sends the given authentication payload as text, with retry.
    Authenticate {
        payload: String,
    },
    /// Sends each entry of `topics` as its own text payload, with retry.
    Subscribe {
        topics: Vec<String>,
    },
    /// Sends each entry of `topics` as its own text payload, with retry.
    Unsubscribe {
        topics: Vec<String>,
    },
    /// Sends an arbitrary text payload, with retry.
    SendText {
        payload: String,
    },
    /// Submits a single order: the handler generates a request ID, caches the identifiers
    /// under it, and sends a `Create` request built from `params`.
    PlaceOrder {
        params: BybitWsPlaceOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Amends a single order: the handler generates a request ID, caches the identifiers
    /// under it, and sends an `Amend` request built from `params`.
    AmendOrder {
        params: BybitWsAmendOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
        venue_order_id: Option<VenueOrderId>,
    },
    /// Cancels a single order: the handler generates a request ID, caches the identifiers
    /// under it, and sends a `Cancel` request built from `params`.
    CancelOrder {
        params: BybitWsCancelOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
        venue_order_id: Option<VenueOrderId>,
    },
    /// Records the orders of a batch place request under `req_id` so that the batch
    /// response can later be matched to them. Nothing is sent by this command.
    RegisterBatchPlace {
        req_id: String,
        orders: Vec<BatchOrderData>,
    },
    /// Records the cancels of a batch cancel request under `req_id` so that the batch
    /// response can later be matched to them. Nothing is sent by this command.
    RegisterBatchCancel {
        req_id: String,
        cancels: Vec<BatchCancelData>,
    },
    /// Inserts every instrument into the handler's instrument cache, keyed by symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts (or replaces) a single instrument in the handler's instrument cache.
    UpdateInstrument(InstrumentAny),
}
130
/// Type alias for the funding rate cache.
///
/// A shared map behind an async `RwLock`, keyed by `Ustr`, holding a pair of optional
/// strings per key.
// NOTE(review): presumably keyed by symbol with the last seen funding values as the pair;
// confirm against the ticker funding parser.
type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
133
/// Data cached for pending place requests to correlate with responses.
///
/// Stored in `FeedHandler::pending_place_requests`, keyed by the generated request ID.
type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);
136
/// Data cached for pending cancel requests to correlate with responses.
///
/// Stored in `FeedHandler::pending_cancel_requests`, keyed by the generated request ID.
/// The trailing venue order ID is optional and is passed through to cancel rejections.
type CancelRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
145
/// Data cached for pending amend requests to correlate with responses.
///
/// Stored in `FeedHandler::pending_amend_requests`, keyed by the generated request ID.
/// Has the same shape as the data cached for cancel requests.
type AmendRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);
154
/// Data for a single order in a batch request.
///
/// The outer client order ID is the one used when building rejection events; the client
/// order ID inside the nested tuple is ignored by the batch handlers.
type BatchOrderData = (ClientOrderId, PlaceRequestData);
157
/// Data for a single cancel in a batch request.
///
/// The outer client order ID is the one used when building cancel rejection events; the
/// client order ID inside the nested tuple is ignored by the batch handlers.
type BatchCancelData = (ClientOrderId, CancelRequestData);
160
/// Inner message handler: consumes commands and raw WebSocket frames and produces
/// [`NautilusWsMessage`] values one at a time via `next`.
pub(super) struct FeedHandler {
    /// Stop flag; once set, `next` returns `None`.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client, set by `HandlerCommand::SetClient` and taken on disconnect.
    client: Option<WebSocketClient>,
    /// Commands from the outer client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Raw WebSocket frames to be parsed.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outbound channel used by `send`.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    /// Authentication tracker supplied at construction.
    auth_tracker: AuthTracker,
    /// Subscription state supplied at construction.
    subscriptions: SubscriptionState,
    /// Instruments keyed by symbol, used to resolve the instrument for incoming messages.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Account attached to generated events; order rejections cannot be built without it.
    account_id: Option<AccountId>,
    /// Shared level consulted when deciding whether to attach the referer header.
    // NOTE(review): presumably the account's market-maker level; confirm with the owner.
    mm_level: Arc<AtomicU8>,
    /// When set, used to derive the instrument symbol from the raw venue symbol.
    product_type: Option<BybitProductType>,
    /// Last quote per instrument, used when building quotes from order book updates.
    quote_cache: QuoteCache,
    /// Shared funding cache passed to message parsing.
    funding_cache: FundingCache,
    /// Retry policy applied to outbound sends.
    retry_manager: RetryManager<BybitWsError>,
    /// Single place requests awaiting a response, keyed by request ID.
    pending_place_requests: DashMap<String, PlaceRequestData>,
    /// Single cancel requests awaiting a response, keyed by request ID.
    pending_cancel_requests: DashMap<String, CancelRequestData>,
    /// Single amend requests awaiting a response, keyed by request ID.
    pending_amend_requests: DashMap<String, AmendRequestData>,
    /// Batch place requests awaiting a response, keyed by request ID.
    pending_batch_place_requests: DashMap<String, Vec<BatchOrderData>>,
    /// Batch cancel requests awaiting a response, keyed by request ID.
    pending_batch_cancel_requests: DashMap<String, Vec<BatchCancelData>>,
    /// Parsed messages waiting to be emitted one by one from `next`.
    message_queue: VecDeque<NautilusWsMessage>,
}
183
184impl FeedHandler {
185    /// Creates a new [`FeedHandler`] instance.
186    #[allow(clippy::too_many_arguments)]
187    pub(super) fn new(
188        signal: Arc<AtomicBool>,
189        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
190        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
191        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
192        account_id: Option<AccountId>,
193        product_type: Option<BybitProductType>,
194        mm_level: Arc<AtomicU8>,
195        auth_tracker: AuthTracker,
196        subscriptions: SubscriptionState,
197        funding_cache: FundingCache,
198    ) -> Self {
199        Self {
200            signal,
201            client: None,
202            cmd_rx,
203            raw_rx,
204            out_tx,
205            auth_tracker,
206            subscriptions,
207            instruments_cache: AHashMap::new(),
208            account_id,
209            mm_level,
210            product_type,
211            quote_cache: QuoteCache::new(),
212            funding_cache,
213            retry_manager: create_websocket_retry_manager(),
214            pending_place_requests: DashMap::new(),
215            pending_cancel_requests: DashMap::new(),
216            pending_amend_requests: DashMap::new(),
217            pending_batch_place_requests: DashMap::new(),
218            pending_batch_cancel_requests: DashMap::new(),
219            message_queue: VecDeque::new(),
220        }
221    }
222
223    pub(super) fn is_stopped(&self) -> bool {
224        self.signal.load(Ordering::Relaxed)
225    }
226
227    #[allow(dead_code)]
228    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
229        self.out_tx.send(msg).map_err(|_| ())
230    }
231
232    fn generate_unique_request_id(&self) -> String {
233        UUID4::new().to_string()
234    }
235
236    fn find_and_remove_place_request_by_client_order_id(
237        &self,
238        client_order_id: &ClientOrderId,
239    ) -> Option<(String, PlaceRequestData)> {
240        self.pending_place_requests
241            .iter()
242            .find(|entry| entry.value().0 == *client_order_id)
243            .and_then(|entry| {
244                let key = entry.key().clone();
245                drop(entry);
246                self.pending_place_requests.remove(&key)
247            })
248    }
249
250    fn find_and_remove_cancel_request_by_client_order_id(
251        &self,
252        client_order_id: &ClientOrderId,
253    ) -> Option<(String, CancelRequestData)> {
254        self.pending_cancel_requests
255            .iter()
256            .find(|entry| entry.value().0 == *client_order_id)
257            .and_then(|entry| {
258                let key = entry.key().clone();
259                drop(entry);
260                self.pending_cancel_requests.remove(&key)
261            })
262    }
263
264    fn find_and_remove_amend_request_by_client_order_id(
265        &self,
266        client_order_id: &ClientOrderId,
267    ) -> Option<(String, AmendRequestData)> {
268        self.pending_amend_requests
269            .iter()
270            .find(|entry| entry.value().0 == *client_order_id)
271            .and_then(|entry| {
272                let key = entry.key().clone();
273                drop(entry);
274                self.pending_amend_requests.remove(&key)
275            })
276    }
277
278    fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
279        let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
280        let mm_level = self.mm_level.load(Ordering::Relaxed);
281        !(is_post_only && mm_level > 0)
282    }
283
284    /// Sends a WebSocket message with retry logic.
285    async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
286        if let Some(client) = &self.client {
287            self.retry_manager
288                .execute_with_retry(
289                    "websocket_send",
290                    || {
291                        let payload = payload.clone();
292                        async move {
293                            client
294                                .send_text(payload, None)
295                                .await
296                                .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
297                        }
298                    },
299                    should_retry_bybit_error,
300                    create_bybit_timeout_error,
301                )
302                .await
303        } else {
304            Err(BybitWsError::ClientError(
305                "No active WebSocket client".to_string(),
306            ))
307        }
308    }
309
310    /// Handles batch operation request-level failures (ret_code != 0).
311    ///
312    /// When a batch request fails entirely, generate rejection events for all orders
313    /// in the batch and clean up tracking data.
314    fn handle_batch_failure(
315        &self,
316        req_id: &str,
317        ret_msg: &str,
318        op: &str,
319        ts_init: UnixNanos,
320        result: &mut Vec<NautilusWsMessage>,
321    ) {
322        if op.contains("create") {
323            if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
324                tracing::warn!(
325                    req_id = %req_id,
326                    ret_msg = %ret_msg,
327                    num_orders = batch_data.len(),
328                    "Batch place request failed"
329                );
330
331                let Some(account_id) = self.account_id else {
332                    tracing::error!("Cannot create OrderRejected events: account_id is None");
333                    return;
334                };
335
336                let reason = Ustr::from(ret_msg);
337                for (client_order_id, (_, trader_id, strategy_id, instrument_id)) in batch_data {
338                    let rejected = OrderRejected::new(
339                        trader_id,
340                        strategy_id,
341                        instrument_id,
342                        client_order_id,
343                        account_id,
344                        reason,
345                        UUID4::new(),
346                        ts_init,
347                        ts_init,
348                        false,
349                        false,
350                    );
351                    result.push(NautilusWsMessage::OrderRejected(rejected));
352                }
353            }
354        } else if op.contains("cancel")
355            && let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id)
356        {
357            tracing::warn!(
358                req_id = %req_id,
359                ret_msg = %ret_msg,
360                num_cancels = batch_data.len(),
361                "Batch cancel request failed"
362            );
363
364            let reason = Ustr::from(ret_msg);
365            for (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id)) in
366                batch_data
367            {
368                let rejected = OrderCancelRejected::new(
369                    trader_id,
370                    strategy_id,
371                    instrument_id,
372                    client_order_id,
373                    reason,
374                    UUID4::new(),
375                    ts_init,
376                    ts_init,
377                    false,
378                    venue_order_id,
379                    self.account_id,
380                );
381                result.push(NautilusWsMessage::OrderCancelRejected(rejected));
382            }
383        }
384    }
385
386    /// Handles batch operation responses, checking for individual order failures.
387    fn handle_batch_response(
388        &self,
389        resp: &BybitWsOrderResponse,
390        result: &mut Vec<NautilusWsMessage>,
391    ) {
392        let Some(req_id) = &resp.req_id else {
393            tracing::warn!(
394                op = %resp.op,
395                "Batch response missing req_id - cannot correlate with pending requests"
396            );
397            return;
398        };
399
400        let batch_errors = resp.extract_batch_errors();
401
402        if resp.op.contains("create") {
403            if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
404                self.process_batch_place_errors(batch_data, batch_errors, result);
405            } else {
406                tracing::debug!(
407                    req_id = %req_id,
408                    "Batch place response received but no pending request found"
409                );
410            }
411        } else if resp.op.contains("cancel") {
412            if let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id) {
413                self.process_batch_cancel_errors(batch_data, batch_errors, result);
414            } else {
415                tracing::debug!(
416                    req_id = %req_id,
417                    "Batch cancel response received but no pending request found"
418                );
419            }
420        }
421    }
422
423    /// Processes individual order errors from a batch place operation.
424    fn process_batch_place_errors(
425        &self,
426        batch_data: Vec<BatchOrderData>,
427        errors: Vec<BybitBatchOrderError>,
428        result: &mut Vec<NautilusWsMessage>,
429    ) {
430        let Some(account_id) = self.account_id else {
431            tracing::error!("Cannot create OrderRejected events: account_id is None");
432            return;
433        };
434
435        let clock = get_atomic_clock_realtime();
436        let ts_init = clock.get_time_ns();
437
438        for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id))) in
439            batch_data.into_iter().enumerate()
440        {
441            if let Some(error) = errors.get(idx)
442                && error.code != 0
443            {
444                tracing::warn!(
445                    client_order_id = %client_order_id,
446                    error_code = error.code,
447                    error_msg = %error.msg,
448                    "Batch order rejected"
449                );
450
451                let rejected = OrderRejected::new(
452                    trader_id,
453                    strategy_id,
454                    instrument_id,
455                    client_order_id,
456                    account_id,
457                    Ustr::from(&error.msg),
458                    UUID4::new(),
459                    ts_init,
460                    ts_init,
461                    false,
462                    false,
463                );
464                result.push(NautilusWsMessage::OrderRejected(rejected));
465            }
466        }
467    }
468
469    /// Processes individual order errors from a batch cancel operation.
470    fn process_batch_cancel_errors(
471        &self,
472        batch_data: Vec<BatchCancelData>,
473        errors: Vec<BybitBatchOrderError>,
474        result: &mut Vec<NautilusWsMessage>,
475    ) {
476        let clock = get_atomic_clock_realtime();
477        let ts_init = clock.get_time_ns();
478
479        for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id))) in
480            batch_data.into_iter().enumerate()
481        {
482            if let Some(error) = errors.get(idx)
483                && error.code != 0
484            {
485                tracing::warn!(
486                    client_order_id = %client_order_id,
487                    error_code = error.code,
488                    error_msg = %error.msg,
489                    "Batch cancel rejected"
490                );
491
492                let rejected = OrderCancelRejected::new(
493                    trader_id,
494                    strategy_id,
495                    instrument_id,
496                    client_order_id,
497                    Ustr::from(&error.msg),
498                    UUID4::new(),
499                    ts_init,
500                    ts_init,
501                    false,
502                    venue_order_id,
503                    self.account_id,
504                );
505                result.push(NautilusWsMessage::OrderCancelRejected(rejected));
506            }
507        }
508    }
509
510    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
511        let clock = get_atomic_clock_realtime();
512
513        loop {
514            if let Some(msg) = self.message_queue.pop_front() {
515                return Some(msg);
516            }
517
518            tokio::select! {
519                Some(cmd) = self.cmd_rx.recv() => {
520                    match cmd {
521                        HandlerCommand::SetClient(client) => {
522                            tracing::debug!("WebSocketClient received by handler");
523                            self.client = Some(client);
524                        }
525                        HandlerCommand::Disconnect => {
526                            tracing::debug!("Disconnect command received");
527
528                            if let Some(client) = self.client.take() {
529                                client.disconnect().await;
530                            }
531                        }
532                        HandlerCommand::Authenticate { payload } => {
533                            tracing::debug!("Authenticate command received");
534                            if let Err(e) = self.send_with_retry(payload).await {
535                                tracing::error!("Failed to send authentication after retries: {e}");
536                            }
537                        }
538                        HandlerCommand::Subscribe { topics } => {
539                            for topic in topics {
540                                tracing::debug!(topic = %topic, "Subscribing to topic");
541                                if let Err(e) = self.send_with_retry(topic.clone()).await {
542                                    tracing::error!(topic = %topic, error = %e, "Failed to send subscription after retries");
543                                }
544                            }
545                        }
546                        HandlerCommand::Unsubscribe { topics } => {
547                            for topic in topics {
548                                tracing::debug!(topic = %topic, "Unsubscribing from topic");
549                                if let Err(e) = self.send_with_retry(topic.clone()).await {
550                                    tracing::error!(topic = %topic, error = %e, "Failed to send unsubscription after retries");
551                                }
552                            }
553                        }
554                        HandlerCommand::SendText { payload } => {
555                            if let Err(e) = self.send_with_retry(payload).await {
556                                tracing::error!("Error sending text with retry: {e}");
557                            }
558                        }
559                        HandlerCommand::InitializeInstruments(instruments) => {
560                            for inst in instruments {
561                                self.instruments_cache.insert(inst.symbol().inner(), inst);
562                            }
563                        }
564                        HandlerCommand::UpdateInstrument(inst) => {
565                            self.instruments_cache.insert(inst.symbol().inner(), inst);
566                        }
567                        HandlerCommand::RegisterBatchPlace { req_id, orders } => {
568                            tracing::debug!(
569                                req_id = %req_id,
570                                num_orders = orders.len(),
571                                "Registering batch place request"
572                            );
573                            self.pending_batch_place_requests.insert(req_id, orders);
574                        }
575                        HandlerCommand::RegisterBatchCancel { req_id, cancels } => {
576                            tracing::debug!(
577                                req_id = %req_id,
578                                num_cancels = cancels.len(),
579                                "Registering batch cancel request"
580                            );
581                            self.pending_batch_cancel_requests.insert(req_id, cancels);
582                        }
583                        HandlerCommand::PlaceOrder {
584                            params,
585                            client_order_id,
586                            trader_id,
587                            strategy_id,
588                            instrument_id,
589                        } => {
590                            let request_id = self.generate_unique_request_id();
591
592                            self.pending_place_requests.insert(
593                                request_id.clone(),
594                                (client_order_id, trader_id, strategy_id, instrument_id),
595                            );
596
597                            let referer = if self.include_referer_header(params.time_in_force) {
598                                Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
599                            } else {
600                                None
601                            };
602
603                            let request = BybitWsRequest {
604                                req_id: Some(request_id.clone()),
605                                op: BybitWsOrderRequestOp::Create,
606                                header: BybitWsHeader::with_referer(referer),
607                                args: vec![params],
608                            };
609
610                            if let Ok(payload) = serde_json::to_string(&request)
611                                && let Err(e) = self.send_with_retry(payload).await
612                            {
613                                tracing::error!("Failed to send place order after retries: {e}");
614                                self.pending_place_requests.remove(&request_id);
615                            }
616                        }
617                        HandlerCommand::AmendOrder {
618                            params,
619                            client_order_id,
620                            trader_id,
621                            strategy_id,
622                            instrument_id,
623                            venue_order_id,
624                        } => {
625                            let request_id = self.generate_unique_request_id();
626
627                            self.pending_amend_requests.insert(
628                                request_id.clone(),
629                                (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
630                            );
631
632                            let request = BybitWsRequest {
633                                req_id: Some(request_id.clone()),
634                                op: BybitWsOrderRequestOp::Amend,
635                                header: BybitWsHeader::now(),
636                                args: vec![params],
637                            };
638
639                            if let Ok(payload) = serde_json::to_string(&request)
640                                && let Err(e) = self.send_with_retry(payload).await
641                            {
642                                tracing::error!("Failed to send amend order after retries: {e}");
643                                self.pending_amend_requests.remove(&request_id);
644                            }
645                        }
646                        HandlerCommand::CancelOrder {
647                            params,
648                            client_order_id,
649                            trader_id,
650                            strategy_id,
651                            instrument_id,
652                            venue_order_id,
653                        } => {
654                            let request_id = self.generate_unique_request_id();
655
656                            self.pending_cancel_requests.insert(
657                                request_id.clone(),
658                                (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
659                            );
660
661                            let request = BybitWsRequest {
662                                req_id: Some(request_id.clone()),
663                                op: BybitWsOrderRequestOp::Cancel,
664                                header: BybitWsHeader::now(),
665                                args: vec![params],
666                            };
667
668                            if let Ok(payload) = serde_json::to_string(&request)
669                                && let Err(e) = self.send_with_retry(payload).await
670                            {
671                                tracing::error!("Failed to send cancel order after retries: {e}");
672                                self.pending_cancel_requests.remove(&request_id);
673                            }
674                        }
675                    }
676
677                    continue;
678                }
679
680                _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
681                    if self.signal.load(Ordering::Relaxed) {
682                        tracing::debug!("Stop signal received during idle period");
683                        return None;
684                    }
685                    continue;
686                }
687
688                msg = self.raw_rx.recv() => {
689                    let msg = match msg {
690                        Some(msg) => msg,
691                        None => {
692                            tracing::debug!("WebSocket stream closed");
693                            return None;
694                        }
695                    };
696
697                    if let Message::Ping(data) = &msg {
698                        tracing::trace!("Received ping frame with {} bytes", data.len());
699
700                        if let Some(client) = &self.client
701                            && let Err(e) = client.send_pong(data.to_vec()).await
702                        {
703                            tracing::warn!(error = %e, "Failed to send pong frame");
704                        }
705                        continue;
706                    }
707
708                    let event = match Self::parse_raw_message(msg) {
709                        Some(event) => event,
710                        None => continue,
711                    };
712
713                    if self.signal.load(Ordering::Relaxed) {
714                        tracing::debug!("Stop signal received");
715                        return None;
716                    }
717
718                    let ts_init = clock.get_time_ns();
719                    let instruments = self.instruments_cache.clone();
720                    let funding_cache = Arc::clone(&self.funding_cache);
721                    let nautilus_messages = self.parse_to_nautilus_messages(
722                        event,
723                        &instruments,
724                        self.account_id,
725                        self.product_type,
726                        &funding_cache,
727                        ts_init,
728                    )
729                    .await;
730
731                    // Enqueue all parsed messages to emit them one by one
732                    self.message_queue.extend(nautilus_messages);
733                }
734            }
735        }
736    }
737
738    fn parse_raw_message(msg: Message) -> Option<BybitWsMessage> {
739        use serde_json::Value;
740
741        match msg {
742            Message::Text(text) => {
743                if text == nautilus_network::RECONNECTED {
744                    tracing::info!("Received WebSocket reconnected signal");
745                    return Some(BybitWsMessage::Reconnected);
746                }
747
748                if text.trim().eq_ignore_ascii_case("pong") {
749                    return None;
750                }
751
752                tracing::trace!("Raw websocket message: {text}");
753
754                let value: Value = match serde_json::from_str(&text) {
755                    Ok(v) => v,
756                    Err(e) => {
757                        tracing::error!("Failed to parse WebSocket message: {e}: {text}");
758                        return None;
759                    }
760                };
761
762                Some(classify_bybit_message(value))
763            }
764            Message::Binary(msg) => {
765                tracing::debug!("Raw binary: {msg:?}");
766                None
767            }
768            Message::Close(_) => {
769                tracing::debug!("Received close message, waiting for reconnection");
770                None
771            }
772            _ => None,
773        }
774    }
775
776    #[allow(clippy::too_many_arguments)]
777    async fn parse_to_nautilus_messages(
778        &mut self,
779        msg: BybitWsMessage,
780        instruments: &AHashMap<Ustr, InstrumentAny>,
781        account_id: Option<AccountId>,
782        product_type: Option<BybitProductType>,
783        funding_cache: &FundingCache,
784        ts_init: UnixNanos,
785    ) -> Vec<NautilusWsMessage> {
786        let mut result = Vec::new();
787
788        match msg {
789            BybitWsMessage::Orderbook(msg) => {
790                let raw_symbol = msg.data.s;
791                let symbol =
792                    product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
793
794                if let Some(instrument) = instruments.get(&symbol) {
795                    match parse_orderbook_deltas(&msg, instrument, ts_init) {
796                        Ok(deltas) => result.push(NautilusWsMessage::Deltas(deltas)),
797                        Err(e) => tracing::error!("Error parsing orderbook deltas: {e}"),
798                    }
799
800                    // For depth=1 subscriptions, also emit QuoteTick from top-of-book
801                    if let Some(depth_str) = msg.topic.as_str().split('.').nth(1)
802                        && depth_str == "1"
803                    {
804                        let instrument_id = instrument.id();
805                        let last_quote = self.quote_cache.get(&instrument_id);
806
807                        match parse_orderbook_quote(&msg, instrument, last_quote, ts_init) {
808                            Ok(quote) => {
809                                self.quote_cache.insert(instrument_id, quote);
810                                result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
811                            }
812                            Err(e) => tracing::debug!("Skipping orderbook quote: {e}"),
813                        }
814                    }
815                } else {
816                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Orderbook message");
817                }
818            }
819            BybitWsMessage::Trade(msg) => {
820                let mut data_vec = Vec::new();
821                for trade in &msg.data {
822                    let raw_symbol = trade.s;
823                    let symbol =
824                        product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
825
826                    if let Some(instrument) = instruments.get(&symbol) {
827                        match parse_ws_trade_tick(trade, instrument, ts_init) {
828                            Ok(tick) => data_vec.push(Data::Trade(tick)),
829                            Err(e) => tracing::error!("Error parsing trade tick: {e}"),
830                        }
831                    } else {
832                        tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Trade message");
833                    }
834                }
835
836                if !data_vec.is_empty() {
837                    result.push(NautilusWsMessage::Data(data_vec));
838                }
839            }
840            BybitWsMessage::Kline(msg) => {
841                let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
842                    Ok(parts) => parts,
843                    Err(e) => {
844                        tracing::warn!("Failed to parse kline topic: {e}");
845                        return result;
846                    }
847                };
848
849                let symbol = product_type
850                    .map_or_else(|| raw_symbol.into(), |pt| make_bybit_symbol(raw_symbol, pt));
851
852                if let Some(instrument) = instruments.get(&symbol) {
853                    let (step, aggregation) = match interval_str.parse::<usize>() {
854                        Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
855                        _ => {
856                            tracing::warn!("Unsupported kline interval: {}", interval_str);
857                            return result;
858                        }
859                    };
860
861                    if let Some(non_zero_step) = NonZero::new(step) {
862                        let bar_spec = BarSpecification {
863                            step: non_zero_step,
864                            aggregation,
865                            price_type: PriceType::Last,
866                        };
867                        let bar_type =
868                            BarType::new(instrument.id(), bar_spec, AggregationSource::External);
869
870                        let mut data_vec = Vec::new();
871                        for kline in &msg.data {
872                            // Only process confirmed bars (not partial/building bars)
873                            if !kline.confirm {
874                                continue;
875                            }
876                            match parse_ws_kline_bar(kline, instrument, bar_type, false, ts_init) {
877                                Ok(bar) => data_vec.push(Data::Bar(bar)),
878                                Err(e) => tracing::error!("Error parsing kline to bar: {e}"),
879                            }
880                        }
881                        if !data_vec.is_empty() {
882                            result.push(NautilusWsMessage::Data(data_vec));
883                        }
884                    } else {
885                        tracing::error!("Invalid step value: {}", step);
886                    }
887                } else {
888                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Kline message");
889                }
890            }
891            BybitWsMessage::TickerLinear(msg) => {
892                let raw_symbol = msg.data.symbol;
893                let symbol =
894                    product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
895
896                if let Some(instrument) = instruments.get(&symbol) {
897                    let instrument_id = instrument.id();
898                    let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
899                    let price_precision = instrument.price_precision();
900                    let size_precision = instrument.size_precision();
901
902                    // Parse Bybit linear ticker fields, propagate errors
903                    let bid_price = msg
904                        .data
905                        .bid1_price
906                        .as_deref()
907                        .map(|s| parse_price_with_precision(s, price_precision, "bid1Price"))
908                        .transpose();
909                    let ask_price = msg
910                        .data
911                        .ask1_price
912                        .as_deref()
913                        .map(|s| parse_price_with_precision(s, price_precision, "ask1Price"))
914                        .transpose();
915                    let bid_size = msg
916                        .data
917                        .bid1_size
918                        .as_deref()
919                        .map(|s| parse_quantity_with_precision(s, size_precision, "bid1Size"))
920                        .transpose();
921                    let ask_size = msg
922                        .data
923                        .ask1_size
924                        .as_deref()
925                        .map(|s| parse_quantity_with_precision(s, size_precision, "ask1Size"))
926                        .transpose();
927
928                    match (bid_price, ask_price, bid_size, ask_size) {
929                        (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
930                            match self.quote_cache.process(
931                                instrument_id,
932                                bp,
933                                ap,
934                                bs,
935                                as_,
936                                ts_event,
937                                ts_init,
938                            ) {
939                                Ok(quote) => {
940                                    result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
941                                }
942                                Err(e) => {
943                                    let raw_data = serde_json::to_string(&msg.data)
944                                        .unwrap_or_else(|_| "<failed to serialize>".to_string());
945                                    tracing::debug!(
946                                        "Skipping partial ticker update: {e}, raw_data: {raw_data}"
947                                    );
948                                }
949                            }
950                        }
951                        _ => {
952                            let raw_data = serde_json::to_string(&msg.data)
953                                .unwrap_or_else(|_| "<failed to serialize>".to_string());
954                            tracing::warn!(
955                                "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
956                            );
957                        }
958                    }
959
960                    // Extract funding rate if available
961                    if msg.data.funding_rate.is_some() && msg.data.next_funding_time.is_some() {
962                        let should_publish = {
963                            let cache = funding_cache.read().await;
964                            match cache.get(&symbol) {
965                                Some((cached_rate, cached_time)) => {
966                                    cached_rate.as_ref() != msg.data.funding_rate.as_ref()
967                                        || cached_time.as_ref()
968                                            != msg.data.next_funding_time.as_ref()
969                                }
970                                None => true,
971                            }
972                        };
973
974                        if should_publish {
975                            match parse_ticker_linear_funding(
976                                &msg.data,
977                                instrument_id,
978                                ts_event,
979                                ts_init,
980                            ) {
981                                Ok(funding) => {
982                                    let cache_key = (
983                                        msg.data.funding_rate.clone(),
984                                        msg.data.next_funding_time.clone(),
985                                    );
986                                    funding_cache.write().await.insert(symbol, cache_key);
987                                    result.push(NautilusWsMessage::FundingRates(vec![funding]));
988                                }
989                                Err(e) => {
990                                    tracing::debug!("Skipping funding rate update: {e}");
991                                }
992                            }
993                        }
994                    }
995                } else {
996                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerLinear message");
997                }
998            }
999            BybitWsMessage::TickerOption(msg) => {
1000                let raw_symbol = &msg.data.symbol;
1001                let symbol = product_type.map_or_else(
1002                    || raw_symbol.as_str().into(),
1003                    |pt| make_bybit_symbol(raw_symbol, pt),
1004                );
1005
1006                if let Some(instrument) = instruments.get(&symbol) {
1007                    let instrument_id = instrument.id();
1008                    let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
1009                    let price_precision = instrument.price_precision();
1010                    let size_precision = instrument.size_precision();
1011
1012                    // Parse Bybit option ticker fields (always complete), propagate errors
1013                    let bid_price = parse_price_with_precision(
1014                        &msg.data.bid_price,
1015                        price_precision,
1016                        "bidPrice",
1017                    );
1018                    let ask_price = parse_price_with_precision(
1019                        &msg.data.ask_price,
1020                        price_precision,
1021                        "askPrice",
1022                    );
1023                    let bid_size = parse_quantity_with_precision(
1024                        &msg.data.bid_size,
1025                        size_precision,
1026                        "bidSize",
1027                    );
1028                    let ask_size = parse_quantity_with_precision(
1029                        &msg.data.ask_size,
1030                        size_precision,
1031                        "askSize",
1032                    );
1033
1034                    match (bid_price, ask_price, bid_size, ask_size) {
1035                        (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1036                            match self.quote_cache.process(
1037                                instrument_id,
1038                                Some(bp),
1039                                Some(ap),
1040                                Some(bs),
1041                                Some(as_),
1042                                ts_event,
1043                                ts_init,
1044                            ) {
1045                                Ok(quote) => {
1046                                    result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1047                                }
1048                                Err(e) => {
1049                                    let raw_data = serde_json::to_string(&msg.data)
1050                                        .unwrap_or_else(|_| "<failed to serialize>".to_string());
1051                                    tracing::debug!(
1052                                        "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1053                                    );
1054                                }
1055                            }
1056                        }
1057                        _ => {
1058                            let raw_data = serde_json::to_string(&msg.data)
1059                                .unwrap_or_else(|_| "<failed to serialize>".to_string());
1060                            tracing::warn!(
1061                                "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1062                            );
1063                        }
1064                    }
1065                } else {
1066                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerOption message");
1067                }
1068            }
1069            BybitWsMessage::AccountOrder(msg) => {
1070                if let Some(account_id) = account_id {
1071                    let mut reports = Vec::new();
1072                    for order in &msg.data {
1073                        let raw_symbol = order.symbol;
1074                        let symbol = make_bybit_symbol(raw_symbol, order.category);
1075
1076                        if let Some(instrument) = instruments.get(&symbol) {
1077                            match parse_ws_order_status_report(
1078                                order, instrument, account_id, ts_init,
1079                            ) {
1080                                Ok(report) => reports.push(report),
1081                                Err(e) => tracing::error!("Error parsing order status report: {e}"),
1082                            }
1083                        } else {
1084                            tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountOrder message");
1085                        }
1086                    }
1087                    if !reports.is_empty() {
1088                        result.push(NautilusWsMessage::OrderStatusReports(reports));
1089                    }
1090                }
1091            }
1092            BybitWsMessage::AccountExecution(msg) => {
1093                if let Some(account_id) = account_id {
1094                    let mut reports = Vec::new();
1095                    for execution in &msg.data {
1096                        let raw_symbol = execution.symbol;
1097                        let symbol = make_bybit_symbol(raw_symbol, execution.category);
1098
1099                        if let Some(instrument) = instruments.get(&symbol) {
1100                            match parse_ws_fill_report(execution, account_id, instrument, ts_init) {
1101                                Ok(report) => reports.push(report),
1102                                Err(e) => tracing::error!("Error parsing fill report: {e}"),
1103                            }
1104                        } else {
1105                            tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountExecution message");
1106                        }
1107                    }
1108                    if !reports.is_empty() {
1109                        result.push(NautilusWsMessage::FillReports(reports));
1110                    }
1111                }
1112            }
1113            BybitWsMessage::AccountPosition(msg) => {
1114                if let Some(account_id) = account_id {
1115                    for position in &msg.data {
1116                        let raw_symbol = position.symbol;
1117                        let symbol = make_bybit_symbol(raw_symbol, position.category);
1118
1119                        if let Some(instrument) = instruments.get(&symbol) {
1120                            match parse_ws_position_status_report(
1121                                position, account_id, instrument, ts_init,
1122                            ) {
1123                                Ok(report) => {
1124                                    result.push(NautilusWsMessage::PositionStatusReport(report));
1125                                }
1126                                Err(e) => {
1127                                    tracing::error!("Error parsing position status report: {e}");
1128                                }
1129                            }
1130                        } else {
1131                            tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountPosition message");
1132                        }
1133                    }
1134                }
1135            }
1136            BybitWsMessage::AccountWallet(msg) => {
1137                if let Some(account_id) = account_id {
1138                    for wallet in &msg.data {
1139                        let ts_event = UnixNanos::from(msg.creation_time as u64 * 1_000_000);
1140
1141                        match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1142                            Ok(state) => result.push(NautilusWsMessage::AccountState(state)),
1143                            Err(e) => tracing::error!("Error parsing account state: {e}"),
1144                        }
1145                    }
1146                }
1147            }
1148            BybitWsMessage::OrderResponse(resp) => {
1149                if resp.ret_code == 0 {
1150                    tracing::debug!(op = %resp.op, ret_msg = %resp.ret_msg, "Order operation successful");
1151
1152                    if resp.op.contains("batch") {
1153                        self.handle_batch_response(&resp, &mut result);
1154                    } else if let Some(req_id) = &resp.req_id {
1155                        if resp.op.contains("create") {
1156                            self.pending_place_requests.remove(req_id);
1157                        } else if resp.op.contains("cancel") {
1158                            self.pending_cancel_requests.remove(req_id);
1159                        } else if resp.op.contains("amend") {
1160                            self.pending_amend_requests.remove(req_id);
1161                        }
1162                    } else if let Some(order_link_id) =
1163                        resp.data.get("orderLinkId").and_then(|v| v.as_str())
1164                    {
1165                        // Bybit sometimes omits req_id, search by client_order_id instead
1166                        let client_order_id = ClientOrderId::from(order_link_id);
1167                        if resp.op.contains("create") {
1168                            self.find_and_remove_place_request_by_client_order_id(&client_order_id);
1169                        } else if resp.op.contains("cancel") {
1170                            self.find_and_remove_cancel_request_by_client_order_id(
1171                                &client_order_id,
1172                            );
1173                        } else if resp.op.contains("amend") {
1174                            self.find_and_remove_amend_request_by_client_order_id(&client_order_id);
1175                        }
1176                    }
1177                } else if let Some(req_id) = &resp.req_id {
1178                    let clock = get_atomic_clock_realtime();
1179                    let ts_init = clock.get_time_ns();
1180
1181                    if resp.op.contains("batch") {
1182                        self.handle_batch_failure(
1183                            req_id,
1184                            &resp.ret_msg,
1185                            &resp.op,
1186                            ts_init,
1187                            &mut result,
1188                        );
1189                    } else if resp.op.contains("create")
1190                        && let Some((_, (client_order_id, trader_id, strategy_id, instrument_id))) =
1191                            self.pending_place_requests.remove(req_id)
1192                    {
1193                        let Some(account_id) = self.account_id else {
1194                            tracing::error!(
1195                                request_id = %req_id,
1196                                reason = %resp.ret_msg,
1197                                "Cannot create OrderRejected event: account_id is None"
1198                            );
1199                            return result;
1200                        };
1201
1202                        let rejected = OrderRejected::new(
1203                            trader_id,
1204                            strategy_id,
1205                            instrument_id,
1206                            client_order_id,
1207                            account_id,
1208                            Ustr::from(&resp.ret_msg),
1209                            UUID4::new(),
1210                            ts_init,
1211                            ts_init,
1212                            false,
1213                            false,
1214                        );
1215                        result.push(NautilusWsMessage::OrderRejected(rejected));
1216                    } else if resp.op.contains("cancel")
1217                        && let Some((
1218                            _,
1219                            (
1220                                client_order_id,
1221                                trader_id,
1222                                strategy_id,
1223                                instrument_id,
1224                                venue_order_id,
1225                            ),
1226                        )) = self.pending_cancel_requests.remove(req_id)
1227                    {
1228                        let rejected = OrderCancelRejected::new(
1229                            trader_id,
1230                            strategy_id,
1231                            instrument_id,
1232                            client_order_id,
1233                            Ustr::from(&resp.ret_msg),
1234                            UUID4::new(),
1235                            ts_init,
1236                            ts_init,
1237                            false,
1238                            venue_order_id,
1239                            self.account_id,
1240                        );
1241                        result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1242                    } else if resp.op.contains("amend")
1243                        && let Some((
1244                            _,
1245                            (
1246                                client_order_id,
1247                                trader_id,
1248                                strategy_id,
1249                                instrument_id,
1250                                venue_order_id,
1251                            ),
1252                        )) = self.pending_amend_requests.remove(req_id)
1253                    {
1254                        let rejected = OrderModifyRejected::new(
1255                            trader_id,
1256                            strategy_id,
1257                            instrument_id,
1258                            client_order_id,
1259                            Ustr::from(&resp.ret_msg),
1260                            UUID4::new(),
1261                            ts_init,
1262                            ts_init,
1263                            false,
1264                            venue_order_id,
1265                            self.account_id,
1266                        );
1267                        result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1268                    }
1269                } else if let Some(order_link_id) =
1270                    resp.data.get("orderLinkId").and_then(|v| v.as_str())
1271                {
1272                    // Bybit sometimes omits req_id, search by client_order_id instead
1273                    let clock = get_atomic_clock_realtime();
1274                    let ts_init = clock.get_time_ns();
1275                    let client_order_id = ClientOrderId::from(order_link_id);
1276
1277                    if resp.op.contains("create") {
1278                        if let Some((_, (_, trader_id, strategy_id, instrument_id))) =
1279                            self.find_and_remove_place_request_by_client_order_id(&client_order_id)
1280                        {
1281                            let Some(account_id) = self.account_id else {
1282                                tracing::error!(
1283                                    client_order_id = %client_order_id,
1284                                    reason = %resp.ret_msg,
1285                                    "Cannot create OrderRejected event: account_id is None"
1286                                );
1287                                return result;
1288                            };
1289
1290                            let rejected = OrderRejected::new(
1291                                trader_id,
1292                                strategy_id,
1293                                instrument_id,
1294                                client_order_id,
1295                                account_id,
1296                                Ustr::from(&resp.ret_msg),
1297                                UUID4::new(),
1298                                ts_init,
1299                                ts_init,
1300                                false,
1301                                false,
1302                            );
1303                            result.push(NautilusWsMessage::OrderRejected(rejected));
1304                        }
1305                    } else if resp.op.contains("cancel") {
1306                        if let Some((
1307                            _,
1308                            (_, trader_id, strategy_id, instrument_id, venue_order_id),
1309                        )) =
1310                            self.find_and_remove_cancel_request_by_client_order_id(&client_order_id)
1311                        {
1312                            let rejected = OrderCancelRejected::new(
1313                                trader_id,
1314                                strategy_id,
1315                                instrument_id,
1316                                client_order_id,
1317                                Ustr::from(&resp.ret_msg),
1318                                UUID4::new(),
1319                                ts_init,
1320                                ts_init,
1321                                false,
1322                                venue_order_id,
1323                                self.account_id,
1324                            );
1325                            result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1326                        }
1327                    } else if resp.op.contains("amend")
1328                        && let Some((_, (_, trader_id, strategy_id, instrument_id, venue_order_id))) =
1329                            self.find_and_remove_amend_request_by_client_order_id(&client_order_id)
1330                    {
1331                        let rejected = OrderModifyRejected::new(
1332                            trader_id,
1333                            strategy_id,
1334                            instrument_id,
1335                            client_order_id,
1336                            Ustr::from(&resp.ret_msg),
1337                            UUID4::new(),
1338                            ts_init,
1339                            ts_init,
1340                            false,
1341                            venue_order_id,
1342                            self.account_id,
1343                        );
1344                        result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1345                    }
1346                } else {
1347                    tracing::warn!(
1348                        op = %resp.op,
1349                        ret_code = resp.ret_code,
1350                        ret_msg = %resp.ret_msg,
1351                        "Order operation failed but request_id could not be extracted from response"
1352                    );
1353                }
1354            }
1355            BybitWsMessage::Auth(auth_response) => {
1356                let is_success =
1357                    auth_response.success.unwrap_or(false) || (auth_response.ret_code == Some(0));
1358
1359                if is_success {
1360                    self.auth_tracker.succeed();
1361                    tracing::info!("WebSocket authenticated");
1362                    result.push(NautilusWsMessage::Authenticated);
1363                } else {
1364                    let error_msg = auth_response
1365                        .ret_msg
1366                        .as_deref()
1367                        .unwrap_or("Authentication rejected");
1368                    self.auth_tracker.fail(error_msg);
1369                    tracing::error!(error = error_msg, "WebSocket authentication failed");
1370                    result.push(NautilusWsMessage::Error(BybitWebSocketError::from_message(
1371                        error_msg.to_string(),
1372                    )));
1373                }
1374            }
1375            BybitWsMessage::Error(err) => {
1376                result.push(NautilusWsMessage::Error(err));
1377            }
1378            BybitWsMessage::Reconnected => {
1379                self.quote_cache.clear();
1380                result.push(NautilusWsMessage::Reconnected);
1381            }
1382            BybitWsMessage::Subscription(sub_msg) => {
1383                let pending_topics = self.subscriptions.pending_subscribe_topics();
1384                match sub_msg.op {
1385                    BybitWsOperation::Subscribe => {
1386                        if sub_msg.success {
1387                            for topic in pending_topics {
1388                                self.subscriptions.confirm_subscribe(&topic);
1389                                tracing::debug!(topic = topic, "Subscription confirmed");
1390                            }
1391                        } else {
1392                            for topic in pending_topics {
1393                                self.subscriptions.mark_failure(&topic);
1394                                tracing::warn!(
1395                                    topic = topic,
1396                                    error = ?sub_msg.ret_msg,
1397                                    "Subscription failed, will retry on reconnect"
1398                                );
1399                            }
1400                        }
1401                    }
1402                    BybitWsOperation::Unsubscribe => {
1403                        let pending_unsub = self.subscriptions.pending_unsubscribe_topics();
1404                        if sub_msg.success {
1405                            for topic in pending_unsub {
1406                                self.subscriptions.confirm_unsubscribe(&topic);
1407                                tracing::debug!(topic = topic, "Unsubscription confirmed");
1408                            }
1409                        } else {
1410                            for topic in pending_unsub {
1411                                tracing::warn!(
1412                                    topic = topic,
1413                                    error = ?sub_msg.ret_msg,
1414                                    "Unsubscription failed"
1415                                );
1416                            }
1417                        }
1418                    }
1419                    _ => {}
1420                }
1421            }
1422            _ => {}
1423        }
1424
1425        result
1426    }
1427}
1428
1429/// Classifies a parsed JSON value into a typed Bybit WebSocket message.
1430///
1431/// Returns `Raw(value)` if no specific type matches.
1432pub fn classify_bybit_message(value: serde_json::Value) -> BybitWsMessage {
1433    if let Some(op_val) = value.get("op") {
1434        if let Ok(op) = serde_json::from_value::<BybitWsOperation>(op_val.clone())
1435            && op == BybitWsOperation::Auth
1436            && let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
1437        {
1438            let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1439            if is_success {
1440                return BybitWsMessage::Auth(auth);
1441            }
1442            let resp = BybitWsResponse {
1443                op: Some(auth.op.clone()),
1444                topic: None,
1445                success: auth.success,
1446                conn_id: auth.conn_id.clone(),
1447                req_id: None,
1448                ret_code: auth.ret_code,
1449                ret_msg: auth.ret_msg,
1450            };
1451            let error = BybitWebSocketError::from_response(&resp);
1452            return BybitWsMessage::Error(error);
1453        }
1454
1455        if let Some(op_str) = op_val.as_str()
1456            && op_str.starts_with("order.")
1457        {
1458            return serde_json::from_value::<BybitWsOrderResponse>(value.clone()).map_or_else(
1459                |_| BybitWsMessage::Raw(value),
1460                BybitWsMessage::OrderResponse,
1461            );
1462        }
1463    }
1464
1465    if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
1466        if success {
1467            return serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone())
1468                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Subscription);
1469        }
1470        return serde_json::from_value::<BybitWsResponse>(value.clone()).map_or_else(
1471            |_| BybitWsMessage::Raw(value),
1472            |resp| {
1473                let error = BybitWebSocketError::from_response(&resp);
1474                BybitWsMessage::Error(error)
1475            },
1476        );
1477    }
1478
1479    // Most common path for market data
1480    if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
1481        if topic.starts_with(BYBIT_TOPIC_ORDERBOOK) {
1482            return serde_json::from_value(value.clone())
1483                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Orderbook);
1484        }
1485
1486        if topic.contains(BYBIT_TOPIC_PUBLIC_TRADE) || topic.starts_with(BYBIT_TOPIC_TRADE) {
1487            return serde_json::from_value(value.clone())
1488                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Trade);
1489        }
1490
1491        if topic.starts_with(BYBIT_TOPIC_KLINE) {
1492            return serde_json::from_value(value.clone())
1493                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Kline);
1494        }
1495
1496        if topic.starts_with(BYBIT_TOPIC_TICKERS) {
1497            // Option symbols: BTC-6JAN23-17500-C (date, strike, C/P)
1498            let is_option = value
1499                .get("data")
1500                .and_then(|d| d.get("symbol"))
1501                .and_then(|s| s.as_str())
1502                .is_some_and(|symbol| symbol.contains('-') && symbol.matches('-').count() >= 3);
1503
1504            if is_option {
1505                return serde_json::from_value(value.clone())
1506                    .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerOption);
1507            }
1508            return serde_json::from_value(value.clone())
1509                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerLinear);
1510        }
1511
1512        if topic.starts_with(BYBIT_TOPIC_ORDER) {
1513            return serde_json::from_value(value.clone())
1514                .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::AccountOrder);
1515        }
1516
1517        if topic.starts_with(BYBIT_TOPIC_EXECUTION) {
1518            return serde_json::from_value(value.clone()).map_or_else(
1519                |_| BybitWsMessage::Raw(value),
1520                BybitWsMessage::AccountExecution,
1521            );
1522        }
1523
1524        if topic.starts_with(BYBIT_TOPIC_WALLET) {
1525            return serde_json::from_value(value.clone()).map_or_else(
1526                |_| BybitWsMessage::Raw(value),
1527                BybitWsMessage::AccountWallet,
1528            );
1529        }
1530
1531        if topic.starts_with(BYBIT_TOPIC_POSITION) {
1532            return serde_json::from_value(value.clone()).map_or_else(
1533                |_| BybitWsMessage::Raw(value),
1534                BybitWsMessage::AccountPosition,
1535            );
1536        }
1537    }
1538
1539    BybitWsMessage::Raw(value)
1540}
1541
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::consts::BYBIT_WS_TOPIC_DELIMITER;

    /// Builds a `FeedHandler` for tests.
    ///
    /// The opposite end of every channel is dropped when this function
    /// returns; the tests below only call `generate_unique_request_id` and
    /// never send or receive on those channels.
    fn create_test_handler() -> FeedHandler {
        let signal = Arc::new(AtomicBool::new(false));
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
        let (out_tx, _out_rx) = tokio::sync::mpsc::unbounded_channel();
        let auth_tracker = AuthTracker::new();
        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let funding_cache = Arc::new(tokio::sync::RwLock::new(AHashMap::new()));

        FeedHandler::new(
            signal,
            cmd_rx,
            raw_rx,
            out_tx,
            None,
            None,
            Arc::new(AtomicU8::new(0)),
            auth_tracker,
            subscriptions,
            funding_cache,
        )
    }

    /// Three consecutive ids must be pairwise distinct.
    #[rstest]
    fn test_generate_unique_request_id_returns_different_ids() {
        let handler = create_test_handler();

        let id1 = handler.generate_unique_request_id();
        let id2 = handler.generate_unique_request_id();
        let id3 = handler.generate_unique_request_id();

        assert_ne!(id1, id2);
        assert_ne!(id2, id3);
        assert_ne!(id1, id3);
    }

    /// Each id must survive a round trip through `UUID4` unchanged.
    #[rstest]
    fn test_generate_unique_request_id_produces_valid_uuids() {
        let handler = create_test_handler();

        let id1 = handler.generate_unique_request_id();
        let id2 = handler.generate_unique_request_id();

        // `assert_eq!` prints both sides on failure, unlike `assert!(a == b)`.
        assert_eq!(UUID4::from(id1.as_str()).to_string(), id1);
        assert_eq!(UUID4::from(id2.as_str()).to_string(), id2);
    }

    #[rstest]
    fn test_multiple_place_orders_use_different_request_ids() {
        let handler = create_test_handler();

        let req_id_1 = handler.generate_unique_request_id();
        let req_id_2 = handler.generate_unique_request_id();
        let req_id_3 = handler.generate_unique_request_id();

        assert_ne!(req_id_1, req_id_2);
        assert_ne!(req_id_2, req_id_3);
        assert_ne!(req_id_1, req_id_3);
    }

    #[rstest]
    fn test_multiple_amends_use_different_request_ids() {
        let handler = create_test_handler();

        // Verifies fix for "Duplicate reqId" errors when amending same order multiple times
        let req_id_1 = handler.generate_unique_request_id();
        let req_id_2 = handler.generate_unique_request_id();
        let req_id_3 = handler.generate_unique_request_id();

        assert_ne!(
            req_id_1, req_id_2,
            "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
        );
        assert_ne!(
            req_id_2, req_id_3,
            "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
        );
        assert_ne!(
            req_id_1, req_id_3,
            "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
        );
    }

    #[rstest]
    fn test_multiple_cancels_use_different_request_ids() {
        let handler = create_test_handler();

        let req_id_1 = handler.generate_unique_request_id();
        let req_id_2 = handler.generate_unique_request_id();

        assert_ne!(
            req_id_1, req_id_2,
            "Multiple cancels should generate different request IDs"
        );
    }

    // NOTE: despite the name, ids are generated sequentially on one thread;
    // the test checks uniqueness across 100 back-to-back calls.
    #[rstest]
    fn test_concurrent_request_id_generation() {
        let handler = create_test_handler();

        let mut ids = std::collections::HashSet::new();
        for _ in 0..100 {
            let id = handler.generate_unique_request_id();
            assert!(
                ids.insert(id.clone()),
                "Generated duplicate request ID: {id}"
            );
        }
        assert_eq!(ids.len(), 100);
    }
}