nautilus_bybit/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! WebSocket message handler for Bybit.
17
18use std::{
19    collections::VecDeque,
20    num::NonZero,
21    sync::{
22        Arc,
23        atomic::{AtomicBool, AtomicU8, Ordering},
24    },
25};
26
27use ahash::AHashMap;
28use dashmap::DashMap;
29use nautilus_common::cache::quote::QuoteCache;
30use nautilus_core::{UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
31use nautilus_model::{
32    data::{BarSpecification, BarType, Data},
33    enums::{AggregationSource, BarAggregation, PriceType},
34    events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
35    identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
36    instruments::{Instrument, InstrumentAny},
37};
38use nautilus_network::{
39    retry::{RetryManager, create_websocket_retry_manager},
40    websocket::{AuthTracker, SubscriptionState, WebSocketClient},
41};
42use tokio::sync::RwLock;
43use tokio_tungstenite::tungstenite::Message;
44use ustr::Ustr;
45
46use super::{
47    enums::BybitWsOperation,
48    error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
49    messages::{
50        BybitWebSocketError, BybitWsHeader, BybitWsMessage, BybitWsRequest, NautilusWsMessage,
51    },
52    parse::{
53        parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
54        parse_ticker_linear_funding, parse_ws_account_state, parse_ws_fill_report,
55        parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
56        parse_ws_trade_tick,
57    },
58};
59use crate::{
60    common::{
61        consts::BYBIT_NAUTILUS_BROKER_ID,
62        enums::{BybitProductType, BybitTimeInForce, BybitWsOrderRequestOp},
63        parse::{make_bybit_symbol, parse_price_with_precision, parse_quantity_with_precision},
64    },
65    websocket::messages::{
66        BybitBatchOrderError, BybitWsAmendOrderParams, BybitWsCancelOrderParams,
67        BybitWsOrderResponse, BybitWsPlaceOrderParams,
68    },
69};
70
71/// Commands sent from the outer client to the inner message handler.
72#[derive(Debug)]
73#[allow(
74    clippy::large_enum_variant,
75    reason = "Commands are ephemeral and immediately consumed"
76)]
77pub enum HandlerCommand {
78    SetClient(WebSocketClient),
79    Disconnect,
80    Authenticate {
81        payload: String,
82    },
83    Subscribe {
84        topics: Vec<String>,
85    },
86    Unsubscribe {
87        topics: Vec<String>,
88    },
89    SendText {
90        payload: String,
91    },
92    PlaceOrder {
93        params: BybitWsPlaceOrderParams,
94        client_order_id: ClientOrderId,
95        trader_id: TraderId,
96        strategy_id: StrategyId,
97        instrument_id: InstrumentId,
98    },
99    AmendOrder {
100        params: BybitWsAmendOrderParams,
101        client_order_id: ClientOrderId,
102        trader_id: TraderId,
103        strategy_id: StrategyId,
104        instrument_id: InstrumentId,
105        venue_order_id: Option<VenueOrderId>,
106    },
107    CancelOrder {
108        params: BybitWsCancelOrderParams,
109        client_order_id: ClientOrderId,
110        trader_id: TraderId,
111        strategy_id: StrategyId,
112        instrument_id: InstrumentId,
113        venue_order_id: Option<VenueOrderId>,
114    },
115    RegisterBatchPlace {
116        req_id: String,
117        orders: Vec<BatchOrderData>,
118    },
119    RegisterBatchCancel {
120        req_id: String,
121        cancels: Vec<BatchCancelData>,
122    },
123    InitializeInstruments(Vec<InstrumentAny>),
124    UpdateInstrument(InstrumentAny),
125}
126
127/// Type alias for the funding rate cache.
128type FundingCache = Arc<RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
129
130/// Data cached for pending place requests to correlate with responses.
131type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);
132
133/// Data cached for pending cancel requests to correlate with responses.
134type CancelRequestData = (
135    ClientOrderId,
136    TraderId,
137    StrategyId,
138    InstrumentId,
139    Option<VenueOrderId>,
140);
141
142/// Data cached for pending amend requests to correlate with responses.
143type AmendRequestData = (
144    ClientOrderId,
145    TraderId,
146    StrategyId,
147    InstrumentId,
148    Option<VenueOrderId>,
149);
150
151/// Data for a single order in a batch request.
152type BatchOrderData = (ClientOrderId, PlaceRequestData);
153
154/// Data for a single cancel in a batch request.
155type BatchCancelData = (ClientOrderId, CancelRequestData);
156
157pub(super) struct FeedHandler {
158    signal: Arc<AtomicBool>,
159    client: Option<WebSocketClient>,
160    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
161    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
162    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
163    auth_tracker: AuthTracker,
164    subscriptions: SubscriptionState,
165    instruments_cache: AHashMap<Ustr, InstrumentAny>,
166    account_id: Option<AccountId>,
167    mm_level: Arc<AtomicU8>,
168    product_type: Option<BybitProductType>,
169    quote_cache: QuoteCache,
170    funding_cache: FundingCache,
171    retry_manager: RetryManager<BybitWsError>,
172    pending_place_requests: DashMap<String, PlaceRequestData>,
173    pending_cancel_requests: DashMap<String, CancelRequestData>,
174    pending_amend_requests: DashMap<String, AmendRequestData>,
175    pending_batch_place_requests: DashMap<String, Vec<BatchOrderData>>,
176    pending_batch_cancel_requests: DashMap<String, Vec<BatchCancelData>>,
177    message_queue: VecDeque<NautilusWsMessage>,
178}
179
180impl FeedHandler {
181    /// Creates a new [`FeedHandler`] instance.
182    #[allow(clippy::too_many_arguments)]
183    pub(super) fn new(
184        signal: Arc<AtomicBool>,
185        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
186        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
187        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
188        account_id: Option<AccountId>,
189        product_type: Option<BybitProductType>,
190        mm_level: Arc<AtomicU8>,
191        auth_tracker: AuthTracker,
192        subscriptions: SubscriptionState,
193        funding_cache: FundingCache,
194    ) -> Self {
195        Self {
196            signal,
197            client: None,
198            cmd_rx,
199            raw_rx,
200            out_tx,
201            auth_tracker,
202            subscriptions,
203            instruments_cache: AHashMap::new(),
204            account_id,
205            mm_level,
206            product_type,
207            quote_cache: QuoteCache::new(),
208            funding_cache,
209            retry_manager: create_websocket_retry_manager(),
210            pending_place_requests: DashMap::new(),
211            pending_cancel_requests: DashMap::new(),
212            pending_amend_requests: DashMap::new(),
213            pending_batch_place_requests: DashMap::new(),
214            pending_batch_cancel_requests: DashMap::new(),
215            message_queue: VecDeque::new(),
216        }
217    }
218
219    pub(super) fn is_stopped(&self) -> bool {
220        self.signal.load(Ordering::Relaxed)
221    }
222
223    #[allow(dead_code)]
224    pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
225        self.out_tx.send(msg).map_err(|_| ())
226    }
227
228    fn generate_unique_request_id(&self) -> String {
229        UUID4::new().to_string()
230    }
231
232    fn find_and_remove_place_request_by_client_order_id(
233        &self,
234        client_order_id: &ClientOrderId,
235    ) -> Option<(String, PlaceRequestData)> {
236        self.pending_place_requests
237            .iter()
238            .find(|entry| entry.value().0 == *client_order_id)
239            .and_then(|entry| {
240                let key = entry.key().clone();
241                drop(entry);
242                self.pending_place_requests.remove(&key)
243            })
244    }
245
246    fn find_and_remove_cancel_request_by_client_order_id(
247        &self,
248        client_order_id: &ClientOrderId,
249    ) -> Option<(String, CancelRequestData)> {
250        self.pending_cancel_requests
251            .iter()
252            .find(|entry| entry.value().0 == *client_order_id)
253            .and_then(|entry| {
254                let key = entry.key().clone();
255                drop(entry);
256                self.pending_cancel_requests.remove(&key)
257            })
258    }
259
260    fn find_and_remove_amend_request_by_client_order_id(
261        &self,
262        client_order_id: &ClientOrderId,
263    ) -> Option<(String, AmendRequestData)> {
264        self.pending_amend_requests
265            .iter()
266            .find(|entry| entry.value().0 == *client_order_id)
267            .and_then(|entry| {
268                let key = entry.key().clone();
269                drop(entry);
270                self.pending_amend_requests.remove(&key)
271            })
272    }
273
274    fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
275        let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
276        let mm_level = self.mm_level.load(Ordering::Relaxed);
277        !(is_post_only && mm_level > 0)
278    }
279
280    /// Sends a WebSocket message with retry logic.
281    async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
282        if let Some(client) = &self.client {
283            self.retry_manager
284                .execute_with_retry(
285                    "websocket_send",
286                    || {
287                        let payload = payload.clone();
288                        async move {
289                            client
290                                .send_text(payload, None)
291                                .await
292                                .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
293                        }
294                    },
295                    should_retry_bybit_error,
296                    create_bybit_timeout_error,
297                )
298                .await
299        } else {
300            Err(BybitWsError::ClientError(
301                "No active WebSocket client".to_string(),
302            ))
303        }
304    }
305
306    /// Handles batch operation request-level failures (ret_code != 0).
307    ///
308    /// When a batch request fails entirely, generate rejection events for all orders
309    /// in the batch and clean up tracking data.
310    fn handle_batch_failure(
311        &self,
312        req_id: &str,
313        ret_msg: &str,
314        op: &str,
315        ts_init: UnixNanos,
316        result: &mut Vec<NautilusWsMessage>,
317    ) {
318        if op.contains("create") {
319            if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
320                tracing::warn!(
321                    req_id = %req_id,
322                    ret_msg = %ret_msg,
323                    num_orders = batch_data.len(),
324                    "Batch place request failed"
325                );
326
327                let Some(account_id) = self.account_id else {
328                    tracing::error!("Cannot create OrderRejected events: account_id is None");
329                    return;
330                };
331
332                let reason = Ustr::from(ret_msg);
333                for (client_order_id, (_, trader_id, strategy_id, instrument_id)) in batch_data {
334                    let rejected = OrderRejected::new(
335                        trader_id,
336                        strategy_id,
337                        instrument_id,
338                        client_order_id,
339                        account_id,
340                        reason,
341                        UUID4::new(),
342                        ts_init,
343                        ts_init,
344                        false,
345                        false,
346                    );
347                    result.push(NautilusWsMessage::OrderRejected(rejected));
348                }
349            }
350        } else if op.contains("cancel")
351            && let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id)
352        {
353            tracing::warn!(
354                req_id = %req_id,
355                ret_msg = %ret_msg,
356                num_cancels = batch_data.len(),
357                "Batch cancel request failed"
358            );
359
360            let reason = Ustr::from(ret_msg);
361            for (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id)) in
362                batch_data
363            {
364                let rejected = OrderCancelRejected::new(
365                    trader_id,
366                    strategy_id,
367                    instrument_id,
368                    client_order_id,
369                    reason,
370                    UUID4::new(),
371                    ts_init,
372                    ts_init,
373                    false,
374                    venue_order_id,
375                    self.account_id,
376                );
377                result.push(NautilusWsMessage::OrderCancelRejected(rejected));
378            }
379        }
380    }
381
382    /// Handles batch operation responses, checking for individual order failures.
383    fn handle_batch_response(
384        &self,
385        resp: &BybitWsOrderResponse,
386        result: &mut Vec<NautilusWsMessage>,
387    ) {
388        let Some(req_id) = &resp.req_id else {
389            tracing::warn!(
390                op = %resp.op,
391                "Batch response missing req_id - cannot correlate with pending requests"
392            );
393            return;
394        };
395
396        let batch_errors = resp.extract_batch_errors();
397
398        if resp.op.contains("create") {
399            if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
400                self.process_batch_place_errors(batch_data, batch_errors, result);
401            } else {
402                tracing::debug!(
403                    req_id = %req_id,
404                    "Batch place response received but no pending request found"
405                );
406            }
407        } else if resp.op.contains("cancel") {
408            if let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id) {
409                self.process_batch_cancel_errors(batch_data, batch_errors, result);
410            } else {
411                tracing::debug!(
412                    req_id = %req_id,
413                    "Batch cancel response received but no pending request found"
414                );
415            }
416        }
417    }
418
419    /// Processes individual order errors from a batch place operation.
420    fn process_batch_place_errors(
421        &self,
422        batch_data: Vec<BatchOrderData>,
423        errors: Vec<BybitBatchOrderError>,
424        result: &mut Vec<NautilusWsMessage>,
425    ) {
426        let Some(account_id) = self.account_id else {
427            tracing::error!("Cannot create OrderRejected events: account_id is None");
428            return;
429        };
430
431        let clock = get_atomic_clock_realtime();
432        let ts_init = clock.get_time_ns();
433
434        for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id))) in
435            batch_data.into_iter().enumerate()
436        {
437            if let Some(error) = errors.get(idx)
438                && error.code != 0
439            {
440                tracing::warn!(
441                    client_order_id = %client_order_id,
442                    error_code = error.code,
443                    error_msg = %error.msg,
444                    "Batch order rejected"
445                );
446
447                let rejected = OrderRejected::new(
448                    trader_id,
449                    strategy_id,
450                    instrument_id,
451                    client_order_id,
452                    account_id,
453                    Ustr::from(&error.msg),
454                    UUID4::new(),
455                    ts_init,
456                    ts_init,
457                    false,
458                    false,
459                );
460                result.push(NautilusWsMessage::OrderRejected(rejected));
461            }
462        }
463    }
464
465    /// Processes individual order errors from a batch cancel operation.
466    fn process_batch_cancel_errors(
467        &self,
468        batch_data: Vec<BatchCancelData>,
469        errors: Vec<BybitBatchOrderError>,
470        result: &mut Vec<NautilusWsMessage>,
471    ) {
472        let clock = get_atomic_clock_realtime();
473        let ts_init = clock.get_time_ns();
474
475        for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id))) in
476            batch_data.into_iter().enumerate()
477        {
478            if let Some(error) = errors.get(idx)
479                && error.code != 0
480            {
481                tracing::warn!(
482                    client_order_id = %client_order_id,
483                    error_code = error.code,
484                    error_msg = %error.msg,
485                    "Batch cancel rejected"
486                );
487
488                let rejected = OrderCancelRejected::new(
489                    trader_id,
490                    strategy_id,
491                    instrument_id,
492                    client_order_id,
493                    Ustr::from(&error.msg),
494                    UUID4::new(),
495                    ts_init,
496                    ts_init,
497                    false,
498                    venue_order_id,
499                    self.account_id,
500                );
501                result.push(NautilusWsMessage::OrderCancelRejected(rejected));
502            }
503        }
504    }
505
506    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
507        let clock = get_atomic_clock_realtime();
508
509        loop {
510            if let Some(msg) = self.message_queue.pop_front() {
511                return Some(msg);
512            }
513
514            tokio::select! {
515                Some(cmd) = self.cmd_rx.recv() => {
516                    match cmd {
517                        HandlerCommand::SetClient(client) => {
518                            tracing::debug!("WebSocketClient received by handler");
519                            self.client = Some(client);
520                        }
521                        HandlerCommand::Disconnect => {
522                            tracing::debug!("Disconnect command received");
523
524                            if let Some(client) = self.client.take() {
525                                client.disconnect().await;
526                            }
527                        }
528                        HandlerCommand::Authenticate { payload } => {
529                            tracing::debug!("Authenticate command received");
530                            if let Err(e) = self.send_with_retry(payload).await {
531                                tracing::error!("Failed to send authentication after retries: {e}");
532                            }
533                        }
534                        HandlerCommand::Subscribe { topics } => {
535                            for topic in topics {
536                                tracing::debug!(topic = %topic, "Subscribing to topic");
537                                if let Err(e) = self.send_with_retry(topic.clone()).await {
538                                    tracing::error!(topic = %topic, error = %e, "Failed to send subscription after retries");
539                                }
540                            }
541                        }
542                        HandlerCommand::Unsubscribe { topics } => {
543                            for topic in topics {
544                                tracing::debug!(topic = %topic, "Unsubscribing from topic");
545                                if let Err(e) = self.send_with_retry(topic.clone()).await {
546                                    tracing::error!(topic = %topic, error = %e, "Failed to send unsubscription after retries");
547                                }
548                            }
549                        }
550                        HandlerCommand::SendText { payload } => {
551                            if let Err(e) = self.send_with_retry(payload).await {
552                                tracing::error!("Error sending text with retry: {e}");
553                            }
554                        }
555                        HandlerCommand::InitializeInstruments(instruments) => {
556                            for inst in instruments {
557                                self.instruments_cache.insert(inst.symbol().inner(), inst);
558                            }
559                        }
560                        HandlerCommand::UpdateInstrument(inst) => {
561                            self.instruments_cache.insert(inst.symbol().inner(), inst);
562                        }
563                        HandlerCommand::RegisterBatchPlace { req_id, orders } => {
564                            tracing::debug!(
565                                req_id = %req_id,
566                                num_orders = orders.len(),
567                                "Registering batch place request"
568                            );
569                            self.pending_batch_place_requests.insert(req_id, orders);
570                        }
571                        HandlerCommand::RegisterBatchCancel { req_id, cancels } => {
572                            tracing::debug!(
573                                req_id = %req_id,
574                                num_cancels = cancels.len(),
575                                "Registering batch cancel request"
576                            );
577                            self.pending_batch_cancel_requests.insert(req_id, cancels);
578                        }
579                        HandlerCommand::PlaceOrder {
580                            params,
581                            client_order_id,
582                            trader_id,
583                            strategy_id,
584                            instrument_id,
585                        } => {
586                            let request_id = self.generate_unique_request_id();
587
588                            self.pending_place_requests.insert(
589                                request_id.clone(),
590                                (client_order_id, trader_id, strategy_id, instrument_id),
591                            );
592
593                            let referer = if self.include_referer_header(params.time_in_force) {
594                                Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
595                            } else {
596                                None
597                            };
598
599                            let request = BybitWsRequest {
600                                req_id: Some(request_id.clone()),
601                                op: BybitWsOrderRequestOp::Create,
602                                header: BybitWsHeader::with_referer(referer),
603                                args: vec![params],
604                            };
605
606                            if let Ok(payload) = serde_json::to_string(&request)
607                                && let Err(e) = self.send_with_retry(payload).await
608                            {
609                                tracing::error!("Failed to send place order after retries: {e}");
610                                self.pending_place_requests.remove(&request_id);
611                            }
612                        }
613                        HandlerCommand::AmendOrder {
614                            params,
615                            client_order_id,
616                            trader_id,
617                            strategy_id,
618                            instrument_id,
619                            venue_order_id,
620                        } => {
621                            let request_id = self.generate_unique_request_id();
622
623                            self.pending_amend_requests.insert(
624                                request_id.clone(),
625                                (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
626                            );
627
628                            let request = BybitWsRequest {
629                                req_id: Some(request_id.clone()),
630                                op: BybitWsOrderRequestOp::Amend,
631                                header: BybitWsHeader::now(),
632                                args: vec![params],
633                            };
634
635                            if let Ok(payload) = serde_json::to_string(&request)
636                                && let Err(e) = self.send_with_retry(payload).await
637                            {
638                                tracing::error!("Failed to send amend order after retries: {e}");
639                                self.pending_amend_requests.remove(&request_id);
640                            }
641                        }
642                        HandlerCommand::CancelOrder {
643                            params,
644                            client_order_id,
645                            trader_id,
646                            strategy_id,
647                            instrument_id,
648                            venue_order_id,
649                        } => {
650                            let request_id = self.generate_unique_request_id();
651
652                            self.pending_cancel_requests.insert(
653                                request_id.clone(),
654                                (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
655                            );
656
657                            let request = BybitWsRequest {
658                                req_id: Some(request_id.clone()),
659                                op: BybitWsOrderRequestOp::Cancel,
660                                header: BybitWsHeader::now(),
661                                args: vec![params],
662                            };
663
664                            if let Ok(payload) = serde_json::to_string(&request)
665                                && let Err(e) = self.send_with_retry(payload).await
666                            {
667                                tracing::error!("Failed to send cancel order after retries: {e}");
668                                self.pending_cancel_requests.remove(&request_id);
669                            }
670                        }
671                    }
672
673                    continue;
674                }
675
676                _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
677                    if self.signal.load(Ordering::Relaxed) {
678                        tracing::debug!("Stop signal received during idle period");
679                        return None;
680                    }
681                    continue;
682                }
683
684                msg = self.raw_rx.recv() => {
685                    let msg = match msg {
686                        Some(msg) => msg,
687                        None => {
688                            tracing::debug!("WebSocket stream closed");
689                            return None;
690                        }
691                    };
692
693                    if let Message::Ping(data) = &msg {
694                        tracing::trace!("Received ping frame with {} bytes", data.len());
695
696                        if let Some(client) = &self.client
697                            && let Err(e) = client.send_pong(data.to_vec()).await
698                        {
699                            tracing::warn!(error = %e, "Failed to send pong frame");
700                        }
701                        continue;
702                    }
703
704                    let event = match Self::parse_raw_message(msg) {
705                        Some(event) => event,
706                        None => continue,
707                    };
708
709                    if self.signal.load(Ordering::Relaxed) {
710                        tracing::debug!("Stop signal received");
711                        return None;
712                    }
713
714                    let ts_init = clock.get_time_ns();
715                    let instruments = self.instruments_cache.clone();
716                    let funding_cache = Arc::clone(&self.funding_cache);
717                    let nautilus_messages = self.parse_to_nautilus_messages(
718                        event,
719                        &instruments,
720                        self.account_id,
721                        self.product_type,
722                        &funding_cache,
723                        ts_init,
724                    )
725                    .await;
726
727                    // Enqueue all parsed messages to emit them one by one
728                    self.message_queue.extend(nautilus_messages);
729                }
730            }
731        }
732    }
733
734    fn parse_raw_message(msg: Message) -> Option<BybitWsMessage> {
735        use serde_json::Value;
736
737        match msg {
738            Message::Text(text) => {
739                if text == nautilus_network::RECONNECTED {
740                    tracing::info!("Received WebSocket reconnected signal");
741                    return Some(BybitWsMessage::Reconnected);
742                }
743
744                if text.trim().eq_ignore_ascii_case("pong") {
745                    return None;
746                }
747
748                tracing::trace!("Raw websocket message: {text}");
749
750                let value: Value = match serde_json::from_str(&text) {
751                    Ok(v) => v,
752                    Err(e) => {
753                        tracing::error!("Failed to parse WebSocket message: {e}: {text}");
754                        return None;
755                    }
756                };
757
758                Self::classify_bybit_message(&value).or(Some(BybitWsMessage::Raw(value)))
759            }
760            Message::Binary(msg) => {
761                tracing::debug!("Raw binary: {msg:?}");
762                None
763            }
764            Message::Close(_) => {
765                tracing::debug!("Received close message, waiting for reconnection");
766                None
767            }
768            _ => None,
769        }
770    }
771
772    pub(crate) fn classify_bybit_message(value: &serde_json::Value) -> Option<BybitWsMessage> {
773        use super::{
774            enums::BybitWsOperation,
775            messages::{
776                BybitWsAuthResponse, BybitWsOrderResponse, BybitWsResponse, BybitWsSubscriptionMsg,
777            },
778        };
779
780        if let Ok(op) = serde_json::from_value::<BybitWsOperation>(
781            value.get("op").cloned().unwrap_or(serde_json::Value::Null),
782        ) && op == BybitWsOperation::Auth
783            && let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
784        {
785            let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
786            if is_success {
787                return Some(BybitWsMessage::Auth(auth));
788            }
789            let resp = BybitWsResponse {
790                op: Some(auth.op.clone()),
791                topic: None,
792                success: auth.success,
793                conn_id: auth.conn_id.clone(),
794                req_id: None,
795                ret_code: auth.ret_code,
796                ret_msg: auth.ret_msg,
797            };
798            let error = BybitWebSocketError::from_response(&resp);
799            return Some(BybitWsMessage::Error(error));
800        }
801
802        if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
803            if success {
804                if let Ok(msg) = serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone()) {
805                    return Some(BybitWsMessage::Subscription(msg));
806                }
807            } else if let Ok(resp) = serde_json::from_value::<BybitWsResponse>(value.clone()) {
808                let error = BybitWebSocketError::from_response(&resp);
809                return Some(BybitWsMessage::Error(error));
810            }
811        }
812
813        if let Some(op) = value.get("op").and_then(serde_json::Value::as_str)
814            && op.starts_with("order.")
815            && let Ok(order_resp) = serde_json::from_value::<BybitWsOrderResponse>(value.clone())
816        {
817            return Some(BybitWsMessage::OrderResponse(order_resp));
818        }
819
820        if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
821            if topic.starts_with("orderbook")
822                && let Ok(msg) = serde_json::from_value(value.clone())
823            {
824                return Some(BybitWsMessage::Orderbook(msg));
825            } else if (topic.contains("publicTrade") || topic.starts_with("trade"))
826                && let Ok(msg) = serde_json::from_value(value.clone())
827            {
828                return Some(BybitWsMessage::Trade(msg));
829            } else if topic.starts_with("kline")
830                && let Ok(msg) = serde_json::from_value(value.clone())
831            {
832                return Some(BybitWsMessage::Kline(msg));
833            } else if topic.starts_with("tickers") {
834                // Option symbols have format: BTC-6JAN23-17500-C (with hyphens, date, strike, and C/P)
835                if let Some(symbol) = value
836                    .get("data")
837                    .and_then(|d| d.get("symbol"))
838                    .and_then(|s| s.as_str())
839                    && symbol.contains('-')
840                    && symbol.matches('-').count() >= 3
841                    && let Ok(msg) = serde_json::from_value(value.clone())
842                {
843                    return Some(BybitWsMessage::TickerOption(msg));
844                }
845                if let Ok(msg) = serde_json::from_value(value.clone()) {
846                    return Some(BybitWsMessage::TickerLinear(msg));
847                }
848            } else if topic.starts_with("order")
849                && let Ok(msg) = serde_json::from_value(value.clone())
850            {
851                return Some(BybitWsMessage::AccountOrder(msg));
852            } else if topic.starts_with("execution")
853                && let Ok(msg) = serde_json::from_value(value.clone())
854            {
855                return Some(BybitWsMessage::AccountExecution(msg));
856            } else if topic.starts_with("wallet")
857                && let Ok(msg) = serde_json::from_value(value.clone())
858            {
859                return Some(BybitWsMessage::AccountWallet(msg));
860            } else if topic.starts_with("position")
861                && let Ok(msg) = serde_json::from_value(value.clone())
862            {
863                return Some(BybitWsMessage::AccountPosition(msg));
864            }
865        }
866
867        None
868    }
869
870    #[allow(clippy::too_many_arguments)]
871    async fn parse_to_nautilus_messages(
872        &mut self,
873        msg: BybitWsMessage,
874        instruments: &AHashMap<Ustr, InstrumentAny>,
875        account_id: Option<AccountId>,
876        product_type: Option<BybitProductType>,
877        funding_cache: &FundingCache,
878        ts_init: UnixNanos,
879    ) -> Vec<NautilusWsMessage> {
880        let mut result = Vec::new();
881
882        match msg {
883            BybitWsMessage::Orderbook(msg) => {
884                let raw_symbol = msg.data.s;
885                let symbol =
886                    product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
887
888                if let Some(instrument) = instruments.get(&symbol) {
889                    match parse_orderbook_deltas(&msg, instrument, ts_init) {
890                        Ok(deltas) => result.push(NautilusWsMessage::Deltas(deltas)),
891                        Err(e) => tracing::error!("Error parsing orderbook deltas: {e}"),
892                    }
893
894                    // For depth=1 subscriptions, also emit QuoteTick from top-of-book
895                    if let Some(depth_str) = msg.topic.as_str().split('.').nth(1)
896                        && depth_str == "1"
897                    {
898                        let instrument_id = instrument.id();
899                        let last_quote = self.quote_cache.get(&instrument_id);
900
901                        match parse_orderbook_quote(&msg, instrument, last_quote, ts_init) {
902                            Ok(quote) => {
903                                self.quote_cache.insert(instrument_id, quote);
904                                result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
905                            }
906                            Err(e) => tracing::debug!("Skipping orderbook quote: {e}"),
907                        }
908                    }
909                } else {
910                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Orderbook message");
911                }
912            }
913            BybitWsMessage::Trade(msg) => {
914                let mut data_vec = Vec::new();
915                for trade in &msg.data {
916                    let raw_symbol = trade.s;
917                    let symbol =
918                        product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
919
920                    if let Some(instrument) = instruments.get(&symbol) {
921                        match parse_ws_trade_tick(trade, instrument, ts_init) {
922                            Ok(tick) => data_vec.push(Data::Trade(tick)),
923                            Err(e) => tracing::error!("Error parsing trade tick: {e}"),
924                        }
925                    } else {
926                        tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Trade message");
927                    }
928                }
929
930                if !data_vec.is_empty() {
931                    result.push(NautilusWsMessage::Data(data_vec));
932                }
933            }
934            BybitWsMessage::Kline(msg) => {
935                let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
936                    Ok(parts) => parts,
937                    Err(e) => {
938                        tracing::warn!("Failed to parse kline topic: {e}");
939                        return result;
940                    }
941                };
942
943                let symbol = product_type
944                    .map_or_else(|| raw_symbol.into(), |pt| make_bybit_symbol(raw_symbol, pt));
945
946                if let Some(instrument) = instruments.get(&symbol) {
947                    let (step, aggregation) = match interval_str.parse::<usize>() {
948                        Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
949                        _ => {
950                            tracing::warn!("Unsupported kline interval: {}", interval_str);
951                            return result;
952                        }
953                    };
954
955                    if let Some(non_zero_step) = NonZero::new(step) {
956                        let bar_spec = BarSpecification {
957                            step: non_zero_step,
958                            aggregation,
959                            price_type: PriceType::Last,
960                        };
961                        let bar_type =
962                            BarType::new(instrument.id(), bar_spec, AggregationSource::External);
963
964                        let mut data_vec = Vec::new();
965                        for kline in &msg.data {
966                            // Only process confirmed bars (not partial/building bars)
967                            if !kline.confirm {
968                                continue;
969                            }
970                            match parse_ws_kline_bar(kline, instrument, bar_type, false, ts_init) {
971                                Ok(bar) => data_vec.push(Data::Bar(bar)),
972                                Err(e) => tracing::error!("Error parsing kline to bar: {e}"),
973                            }
974                        }
975                        if !data_vec.is_empty() {
976                            result.push(NautilusWsMessage::Data(data_vec));
977                        }
978                    } else {
979                        tracing::error!("Invalid step value: {}", step);
980                    }
981                } else {
982                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Kline message");
983                }
984            }
985            BybitWsMessage::TickerLinear(msg) => {
986                let raw_symbol = msg.data.symbol;
987                let symbol =
988                    product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
989
990                if let Some(instrument) = instruments.get(&symbol) {
991                    let instrument_id = instrument.id();
992                    let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
993                    let price_precision = instrument.price_precision();
994                    let size_precision = instrument.size_precision();
995
996                    // Parse Bybit linear ticker fields, propagate errors
997                    let bid_price = msg
998                        .data
999                        .bid1_price
1000                        .as_deref()
1001                        .map(|s| parse_price_with_precision(s, price_precision, "bid1Price"))
1002                        .transpose();
1003                    let ask_price = msg
1004                        .data
1005                        .ask1_price
1006                        .as_deref()
1007                        .map(|s| parse_price_with_precision(s, price_precision, "ask1Price"))
1008                        .transpose();
1009                    let bid_size = msg
1010                        .data
1011                        .bid1_size
1012                        .as_deref()
1013                        .map(|s| parse_quantity_with_precision(s, size_precision, "bid1Size"))
1014                        .transpose();
1015                    let ask_size = msg
1016                        .data
1017                        .ask1_size
1018                        .as_deref()
1019                        .map(|s| parse_quantity_with_precision(s, size_precision, "ask1Size"))
1020                        .transpose();
1021
1022                    match (bid_price, ask_price, bid_size, ask_size) {
1023                        (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1024                            match self.quote_cache.process(
1025                                instrument_id,
1026                                bp,
1027                                ap,
1028                                bs,
1029                                as_,
1030                                ts_event,
1031                                ts_init,
1032                            ) {
1033                                Ok(quote) => {
1034                                    result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1035                                }
1036                                Err(e) => {
1037                                    let raw_data = serde_json::to_string(&msg.data)
1038                                        .unwrap_or_else(|_| "<failed to serialize>".to_string());
1039                                    tracing::debug!(
1040                                        "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1041                                    );
1042                                }
1043                            }
1044                        }
1045                        _ => {
1046                            let raw_data = serde_json::to_string(&msg.data)
1047                                .unwrap_or_else(|_| "<failed to serialize>".to_string());
1048                            tracing::warn!(
1049                                "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1050                            );
1051                        }
1052                    }
1053
1054                    // Extract funding rate if available
1055                    if msg.data.funding_rate.is_some() && msg.data.next_funding_time.is_some() {
1056                        let cache_key = (
1057                            msg.data.funding_rate.clone(),
1058                            msg.data.next_funding_time.clone(),
1059                        );
1060
1061                        let should_publish = {
1062                            let cache = funding_cache.read().await;
1063                            cache.get(&symbol) != Some(&cache_key)
1064                        };
1065
1066                        if should_publish {
1067                            match parse_ticker_linear_funding(
1068                                &msg.data,
1069                                instrument_id,
1070                                ts_event,
1071                                ts_init,
1072                            ) {
1073                                Ok(funding) => {
1074                                    funding_cache.write().await.insert(symbol, cache_key);
1075                                    result.push(NautilusWsMessage::FundingRates(vec![funding]));
1076                                }
1077                                Err(e) => {
1078                                    tracing::debug!("Skipping funding rate update: {e}");
1079                                }
1080                            }
1081                        }
1082                    }
1083                } else {
1084                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerLinear message");
1085                }
1086            }
1087            BybitWsMessage::TickerOption(msg) => {
1088                let raw_symbol = &msg.data.symbol;
1089                let symbol = product_type.map_or_else(
1090                    || raw_symbol.as_str().into(),
1091                    |pt| make_bybit_symbol(raw_symbol, pt),
1092                );
1093
1094                if let Some(instrument) = instruments.get(&symbol) {
1095                    let instrument_id = instrument.id();
1096                    let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
1097                    let price_precision = instrument.price_precision();
1098                    let size_precision = instrument.size_precision();
1099
1100                    // Parse Bybit option ticker fields (always complete), propagate errors
1101                    let bid_price = parse_price_with_precision(
1102                        &msg.data.bid_price,
1103                        price_precision,
1104                        "bidPrice",
1105                    );
1106                    let ask_price = parse_price_with_precision(
1107                        &msg.data.ask_price,
1108                        price_precision,
1109                        "askPrice",
1110                    );
1111                    let bid_size = parse_quantity_with_precision(
1112                        &msg.data.bid_size,
1113                        size_precision,
1114                        "bidSize",
1115                    );
1116                    let ask_size = parse_quantity_with_precision(
1117                        &msg.data.ask_size,
1118                        size_precision,
1119                        "askSize",
1120                    );
1121
1122                    match (bid_price, ask_price, bid_size, ask_size) {
1123                        (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1124                            match self.quote_cache.process(
1125                                instrument_id,
1126                                Some(bp),
1127                                Some(ap),
1128                                Some(bs),
1129                                Some(as_),
1130                                ts_event,
1131                                ts_init,
1132                            ) {
1133                                Ok(quote) => {
1134                                    result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1135                                }
1136                                Err(e) => {
1137                                    let raw_data = serde_json::to_string(&msg.data)
1138                                        .unwrap_or_else(|_| "<failed to serialize>".to_string());
1139                                    tracing::debug!(
1140                                        "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1141                                    );
1142                                }
1143                            }
1144                        }
1145                        _ => {
1146                            let raw_data = serde_json::to_string(&msg.data)
1147                                .unwrap_or_else(|_| "<failed to serialize>".to_string());
1148                            tracing::warn!(
1149                                "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1150                            );
1151                        }
1152                    }
1153                } else {
1154                    tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerOption message");
1155                }
1156            }
1157            BybitWsMessage::AccountOrder(msg) => {
1158                if let Some(account_id) = account_id {
1159                    let mut reports = Vec::new();
1160                    for order in &msg.data {
1161                        let raw_symbol = order.symbol;
1162                        let symbol = make_bybit_symbol(raw_symbol, order.category);
1163
1164                        if let Some(instrument) = instruments.get(&symbol) {
1165                            match parse_ws_order_status_report(
1166                                order, instrument, account_id, ts_init,
1167                            ) {
1168                                Ok(report) => reports.push(report),
1169                                Err(e) => tracing::error!("Error parsing order status report: {e}"),
1170                            }
1171                        } else {
1172                            tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountOrder message");
1173                        }
1174                    }
1175                    if !reports.is_empty() {
1176                        result.push(NautilusWsMessage::OrderStatusReports(reports));
1177                    }
1178                }
1179            }
1180            BybitWsMessage::AccountExecution(msg) => {
1181                if let Some(account_id) = account_id {
1182                    let mut reports = Vec::new();
1183                    for execution in &msg.data {
1184                        let raw_symbol = execution.symbol;
1185                        let symbol = make_bybit_symbol(raw_symbol, execution.category);
1186
1187                        if let Some(instrument) = instruments.get(&symbol) {
1188                            match parse_ws_fill_report(execution, account_id, instrument, ts_init) {
1189                                Ok(report) => reports.push(report),
1190                                Err(e) => tracing::error!("Error parsing fill report: {e}"),
1191                            }
1192                        } else {
1193                            tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountExecution message");
1194                        }
1195                    }
1196                    if !reports.is_empty() {
1197                        result.push(NautilusWsMessage::FillReports(reports));
1198                    }
1199                }
1200            }
1201            BybitWsMessage::AccountPosition(msg) => {
1202                if let Some(account_id) = account_id {
1203                    for position in &msg.data {
1204                        let raw_symbol = position.symbol;
1205                        let symbol = make_bybit_symbol(raw_symbol, position.category);
1206
1207                        if let Some(instrument) = instruments.get(&symbol) {
1208                            match parse_ws_position_status_report(
1209                                position, account_id, instrument, ts_init,
1210                            ) {
1211                                Ok(report) => {
1212                                    result.push(NautilusWsMessage::PositionStatusReport(report));
1213                                }
1214                                Err(e) => {
1215                                    tracing::error!("Error parsing position status report: {e}");
1216                                }
1217                            }
1218                        } else {
1219                            tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountPosition message");
1220                        }
1221                    }
1222                }
1223            }
1224            BybitWsMessage::AccountWallet(msg) => {
1225                if let Some(account_id) = account_id {
1226                    for wallet in &msg.data {
1227                        let ts_event = UnixNanos::from(msg.creation_time as u64 * 1_000_000);
1228
1229                        match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1230                            Ok(state) => result.push(NautilusWsMessage::AccountState(state)),
1231                            Err(e) => tracing::error!("Error parsing account state: {e}"),
1232                        }
1233                    }
1234                }
1235            }
1236            BybitWsMessage::OrderResponse(resp) => {
1237                if resp.ret_code == 0 {
1238                    tracing::debug!(op = %resp.op, ret_msg = %resp.ret_msg, "Order operation successful");
1239
1240                    if resp.op.contains("batch") {
1241                        self.handle_batch_response(&resp, &mut result);
1242                    } else if let Some(req_id) = &resp.req_id {
1243                        if resp.op.contains("create") {
1244                            self.pending_place_requests.remove(req_id);
1245                        } else if resp.op.contains("cancel") {
1246                            self.pending_cancel_requests.remove(req_id);
1247                        } else if resp.op.contains("amend") {
1248                            self.pending_amend_requests.remove(req_id);
1249                        }
1250                    } else if let Some(order_link_id) =
1251                        resp.data.get("orderLinkId").and_then(|v| v.as_str())
1252                    {
1253                        // Bybit sometimes omits req_id, search by client_order_id instead
1254                        let client_order_id = ClientOrderId::from(order_link_id);
1255                        if resp.op.contains("create") {
1256                            self.find_and_remove_place_request_by_client_order_id(&client_order_id);
1257                        } else if resp.op.contains("cancel") {
1258                            self.find_and_remove_cancel_request_by_client_order_id(
1259                                &client_order_id,
1260                            );
1261                        } else if resp.op.contains("amend") {
1262                            self.find_and_remove_amend_request_by_client_order_id(&client_order_id);
1263                        }
1264                    }
1265                } else if let Some(req_id) = &resp.req_id {
1266                    let clock = get_atomic_clock_realtime();
1267                    let ts_init = clock.get_time_ns();
1268
1269                    if resp.op.contains("batch") {
1270                        self.handle_batch_failure(
1271                            req_id,
1272                            &resp.ret_msg,
1273                            &resp.op,
1274                            ts_init,
1275                            &mut result,
1276                        );
1277                    } else if resp.op.contains("create")
1278                        && let Some((_, (client_order_id, trader_id, strategy_id, instrument_id))) =
1279                            self.pending_place_requests.remove(req_id)
1280                    {
1281                        let Some(account_id) = self.account_id else {
1282                            tracing::error!(
1283                                request_id = %req_id,
1284                                reason = %resp.ret_msg,
1285                                "Cannot create OrderRejected event: account_id is None"
1286                            );
1287                            return result;
1288                        };
1289
1290                        let rejected = OrderRejected::new(
1291                            trader_id,
1292                            strategy_id,
1293                            instrument_id,
1294                            client_order_id,
1295                            account_id,
1296                            Ustr::from(&resp.ret_msg),
1297                            UUID4::new(),
1298                            ts_init,
1299                            ts_init,
1300                            false,
1301                            false,
1302                        );
1303                        result.push(NautilusWsMessage::OrderRejected(rejected));
1304                    } else if resp.op.contains("cancel")
1305                        && let Some((
1306                            _,
1307                            (
1308                                client_order_id,
1309                                trader_id,
1310                                strategy_id,
1311                                instrument_id,
1312                                venue_order_id,
1313                            ),
1314                        )) = self.pending_cancel_requests.remove(req_id)
1315                    {
1316                        let rejected = OrderCancelRejected::new(
1317                            trader_id,
1318                            strategy_id,
1319                            instrument_id,
1320                            client_order_id,
1321                            Ustr::from(&resp.ret_msg),
1322                            UUID4::new(),
1323                            ts_init,
1324                            ts_init,
1325                            false,
1326                            venue_order_id,
1327                            self.account_id,
1328                        );
1329                        result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1330                    } else if resp.op.contains("amend")
1331                        && let Some((
1332                            _,
1333                            (
1334                                client_order_id,
1335                                trader_id,
1336                                strategy_id,
1337                                instrument_id,
1338                                venue_order_id,
1339                            ),
1340                        )) = self.pending_amend_requests.remove(req_id)
1341                    {
1342                        let rejected = OrderModifyRejected::new(
1343                            trader_id,
1344                            strategy_id,
1345                            instrument_id,
1346                            client_order_id,
1347                            Ustr::from(&resp.ret_msg),
1348                            UUID4::new(),
1349                            ts_init,
1350                            ts_init,
1351                            false,
1352                            venue_order_id,
1353                            self.account_id,
1354                        );
1355                        result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1356                    }
1357                } else if let Some(order_link_id) =
1358                    resp.data.get("orderLinkId").and_then(|v| v.as_str())
1359                {
1360                    // Bybit sometimes omits req_id, search by client_order_id instead
1361                    let clock = get_atomic_clock_realtime();
1362                    let ts_init = clock.get_time_ns();
1363                    let client_order_id = ClientOrderId::from(order_link_id);
1364
1365                    if resp.op.contains("create") {
1366                        if let Some((_, (_, trader_id, strategy_id, instrument_id))) =
1367                            self.find_and_remove_place_request_by_client_order_id(&client_order_id)
1368                        {
1369                            let Some(account_id) = self.account_id else {
1370                                tracing::error!(
1371                                    client_order_id = %client_order_id,
1372                                    reason = %resp.ret_msg,
1373                                    "Cannot create OrderRejected event: account_id is None"
1374                                );
1375                                return result;
1376                            };
1377
1378                            let rejected = OrderRejected::new(
1379                                trader_id,
1380                                strategy_id,
1381                                instrument_id,
1382                                client_order_id,
1383                                account_id,
1384                                Ustr::from(&resp.ret_msg),
1385                                UUID4::new(),
1386                                ts_init,
1387                                ts_init,
1388                                false,
1389                                false,
1390                            );
1391                            result.push(NautilusWsMessage::OrderRejected(rejected));
1392                        }
1393                    } else if resp.op.contains("cancel") {
1394                        if let Some((
1395                            _,
1396                            (_, trader_id, strategy_id, instrument_id, venue_order_id),
1397                        )) =
1398                            self.find_and_remove_cancel_request_by_client_order_id(&client_order_id)
1399                        {
1400                            let rejected = OrderCancelRejected::new(
1401                                trader_id,
1402                                strategy_id,
1403                                instrument_id,
1404                                client_order_id,
1405                                Ustr::from(&resp.ret_msg),
1406                                UUID4::new(),
1407                                ts_init,
1408                                ts_init,
1409                                false,
1410                                venue_order_id,
1411                                self.account_id,
1412                            );
1413                            result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1414                        }
1415                    } else if resp.op.contains("amend")
1416                        && let Some((_, (_, trader_id, strategy_id, instrument_id, venue_order_id))) =
1417                            self.find_and_remove_amend_request_by_client_order_id(&client_order_id)
1418                    {
1419                        let rejected = OrderModifyRejected::new(
1420                            trader_id,
1421                            strategy_id,
1422                            instrument_id,
1423                            client_order_id,
1424                            Ustr::from(&resp.ret_msg),
1425                            UUID4::new(),
1426                            ts_init,
1427                            ts_init,
1428                            false,
1429                            venue_order_id,
1430                            self.account_id,
1431                        );
1432                        result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1433                    }
1434                } else {
1435                    tracing::warn!(
1436                        op = %resp.op,
1437                        ret_code = resp.ret_code,
1438                        ret_msg = %resp.ret_msg,
1439                        "Order operation failed but request_id could not be extracted from response"
1440                    );
1441                }
1442            }
1443            BybitWsMessage::Auth(auth_response) => {
1444                let is_success =
1445                    auth_response.success.unwrap_or(false) || (auth_response.ret_code == Some(0));
1446
1447                if is_success {
1448                    self.auth_tracker.succeed();
1449                    tracing::info!("WebSocket authenticated");
1450                    result.push(NautilusWsMessage::Authenticated);
1451                } else {
1452                    let error_msg = auth_response
1453                        .ret_msg
1454                        .as_deref()
1455                        .unwrap_or("Authentication rejected");
1456                    self.auth_tracker.fail(error_msg);
1457                    tracing::error!(error = error_msg, "WebSocket authentication failed");
1458                    result.push(NautilusWsMessage::Error(BybitWebSocketError::from_message(
1459                        error_msg.to_string(),
1460                    )));
1461                }
1462            }
1463            BybitWsMessage::Error(err) => {
1464                result.push(NautilusWsMessage::Error(err));
1465            }
1466            BybitWsMessage::Reconnected => {
1467                self.quote_cache.clear();
1468                result.push(NautilusWsMessage::Reconnected);
1469            }
1470            BybitWsMessage::Subscription(sub_msg) => {
1471                let pending_topics = self.subscriptions.pending_subscribe_topics();
1472                match sub_msg.op {
1473                    BybitWsOperation::Subscribe => {
1474                        if sub_msg.success {
1475                            for topic in pending_topics {
1476                                self.subscriptions.confirm_subscribe(&topic);
1477                                tracing::debug!(topic = topic, "Subscription confirmed");
1478                            }
1479                        } else {
1480                            for topic in pending_topics {
1481                                self.subscriptions.mark_failure(&topic);
1482                                tracing::warn!(
1483                                    topic = topic,
1484                                    error = ?sub_msg.ret_msg,
1485                                    "Subscription failed, will retry on reconnect"
1486                                );
1487                            }
1488                        }
1489                    }
1490                    BybitWsOperation::Unsubscribe => {
1491                        let pending_unsub = self.subscriptions.pending_unsubscribe_topics();
1492                        if sub_msg.success {
1493                            for topic in pending_unsub {
1494                                self.subscriptions.confirm_unsubscribe(&topic);
1495                                tracing::debug!(topic = topic, "Unsubscription confirmed");
1496                            }
1497                        } else {
1498                            for topic in pending_unsub {
1499                                tracing::warn!(
1500                                    topic = topic,
1501                                    error = ?sub_msg.ret_msg,
1502                                    "Unsubscription failed"
1503                                );
1504                            }
1505                        }
1506                    }
1507                    _ => {}
1508                }
1509            }
1510            _ => {}
1511        }
1512
1513        result
1514    }
1515}
1516
1517////////////////////////////////////////////////////////////////////////////////
1518// Tests
1519////////////////////////////////////////////////////////////////////////////////
1520
1521#[cfg(test)]
1522mod tests {
1523    use rstest::rstest;
1524
1525    use super::*;
1526    use crate::common::consts::BYBIT_WS_TOPIC_DELIMITER;
1527
1528    fn create_test_handler() -> FeedHandler {
1529        let signal = Arc::new(AtomicBool::new(false));
1530        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
1531        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
1532        let (out_tx, _out_rx) = tokio::sync::mpsc::unbounded_channel();
1533        let auth_tracker = AuthTracker::new();
1534        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
1535        let funding_cache = Arc::new(RwLock::new(AHashMap::new()));
1536
1537        FeedHandler::new(
1538            signal,
1539            cmd_rx,
1540            raw_rx,
1541            out_tx,
1542            None,
1543            None,
1544            Arc::new(AtomicU8::new(0)),
1545            auth_tracker,
1546            subscriptions,
1547            funding_cache,
1548        )
1549    }
1550
1551    #[rstest]
1552    fn test_generate_unique_request_id_returns_different_ids() {
1553        let handler = create_test_handler();
1554
1555        let id1 = handler.generate_unique_request_id();
1556        let id2 = handler.generate_unique_request_id();
1557        let id3 = handler.generate_unique_request_id();
1558
1559        assert_ne!(id1, id2);
1560        assert_ne!(id2, id3);
1561        assert_ne!(id1, id3);
1562    }
1563
1564    #[rstest]
1565    fn test_generate_unique_request_id_produces_valid_uuids() {
1566        let handler = create_test_handler();
1567
1568        let id1 = handler.generate_unique_request_id();
1569        let id2 = handler.generate_unique_request_id();
1570
1571        assert!(UUID4::from(id1.as_str()).to_string() == id1);
1572        assert!(UUID4::from(id2.as_str()).to_string() == id2);
1573    }
1574
1575    #[rstest]
1576    fn test_multiple_place_orders_use_different_request_ids() {
1577        let handler = create_test_handler();
1578
1579        let req_id_1 = handler.generate_unique_request_id();
1580        let req_id_2 = handler.generate_unique_request_id();
1581        let req_id_3 = handler.generate_unique_request_id();
1582
1583        assert_ne!(req_id_1, req_id_2);
1584        assert_ne!(req_id_2, req_id_3);
1585        assert_ne!(req_id_1, req_id_3);
1586    }
1587
1588    #[rstest]
1589    fn test_multiple_amends_use_different_request_ids() {
1590        let handler = create_test_handler();
1591
1592        // Verifies fix for "Duplicate reqId" errors when amending same order multiple times
1593        let req_id_1 = handler.generate_unique_request_id();
1594        let req_id_2 = handler.generate_unique_request_id();
1595        let req_id_3 = handler.generate_unique_request_id();
1596
1597        assert_ne!(
1598            req_id_1, req_id_2,
1599            "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
1600        );
1601        assert_ne!(
1602            req_id_2, req_id_3,
1603            "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
1604        );
1605    }
1606
1607    #[rstest]
1608    fn test_multiple_cancels_use_different_request_ids() {
1609        let handler = create_test_handler();
1610
1611        let req_id_1 = handler.generate_unique_request_id();
1612        let req_id_2 = handler.generate_unique_request_id();
1613
1614        assert_ne!(
1615            req_id_1, req_id_2,
1616            "Multiple cancels should generate different request IDs"
1617        );
1618    }
1619
1620    #[rstest]
1621    fn test_concurrent_request_id_generation() {
1622        let handler = create_test_handler();
1623
1624        let mut ids = std::collections::HashSet::new();
1625        for _ in 0..100 {
1626            let id = handler.generate_unique_request_id();
1627            assert!(
1628                ids.insert(id.clone()),
1629                "Generated duplicate request ID: {}",
1630                id
1631            );
1632        }
1633        assert_eq!(ids.len(), 100);
1634    }
1635}