1use std::{
19 collections::VecDeque,
20 num::NonZero,
21 sync::{
22 Arc,
23 atomic::{AtomicBool, AtomicU8, Ordering},
24 },
25};
26
27use ahash::AHashMap;
28use dashmap::DashMap;
29use nautilus_common::cache::quote::QuoteCache;
30use nautilus_core::{UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
31use nautilus_model::{
32 data::{BarSpecification, BarType, Data},
33 enums::{AggregationSource, BarAggregation, PriceType},
34 events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
35 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
36 instruments::{Instrument, InstrumentAny},
37};
38use nautilus_network::{
39 retry::{RetryManager, create_websocket_retry_manager},
40 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
41};
42use tokio::sync::RwLock;
43use tokio_tungstenite::tungstenite::Message;
44use ustr::Ustr;
45
46use super::{
47 enums::BybitWsOperation,
48 error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
49 messages::{
50 BybitWebSocketError, BybitWsHeader, BybitWsMessage, BybitWsRequest, NautilusWsMessage,
51 },
52 parse::{
53 parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
54 parse_ticker_linear_funding, parse_ws_account_state, parse_ws_fill_report,
55 parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
56 parse_ws_trade_tick,
57 },
58};
59use crate::{
60 common::{
61 consts::BYBIT_NAUTILUS_BROKER_ID,
62 enums::{BybitProductType, BybitTimeInForce, BybitWsOrderRequestOp},
63 parse::{make_bybit_symbol, parse_price_with_precision, parse_quantity_with_precision},
64 },
65 websocket::messages::{
66 BybitBatchOrderError, BybitWsAmendOrderParams, BybitWsCancelOrderParams,
67 BybitWsOrderResponse, BybitWsPlaceOrderParams,
68 },
69};
70
71#[derive(Debug)]
73#[allow(
74 clippy::large_enum_variant,
75 reason = "Commands are ephemeral and immediately consumed"
76)]
77pub enum HandlerCommand {
78 SetClient(WebSocketClient),
79 Disconnect,
80 Authenticate {
81 payload: String,
82 },
83 Subscribe {
84 topics: Vec<String>,
85 },
86 Unsubscribe {
87 topics: Vec<String>,
88 },
89 SendText {
90 payload: String,
91 },
92 PlaceOrder {
93 params: BybitWsPlaceOrderParams,
94 client_order_id: ClientOrderId,
95 trader_id: TraderId,
96 strategy_id: StrategyId,
97 instrument_id: InstrumentId,
98 },
99 AmendOrder {
100 params: BybitWsAmendOrderParams,
101 client_order_id: ClientOrderId,
102 trader_id: TraderId,
103 strategy_id: StrategyId,
104 instrument_id: InstrumentId,
105 venue_order_id: Option<VenueOrderId>,
106 },
107 CancelOrder {
108 params: BybitWsCancelOrderParams,
109 client_order_id: ClientOrderId,
110 trader_id: TraderId,
111 strategy_id: StrategyId,
112 instrument_id: InstrumentId,
113 venue_order_id: Option<VenueOrderId>,
114 },
115 RegisterBatchPlace {
116 req_id: String,
117 orders: Vec<BatchOrderData>,
118 },
119 RegisterBatchCancel {
120 req_id: String,
121 cancels: Vec<BatchCancelData>,
122 },
123 InitializeInstruments(Vec<InstrumentAny>),
124 UpdateInstrument(InstrumentAny),
125}
126
127type FundingCache = Arc<RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
129
130type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);
132
133type CancelRequestData = (
135 ClientOrderId,
136 TraderId,
137 StrategyId,
138 InstrumentId,
139 Option<VenueOrderId>,
140);
141
142type AmendRequestData = (
144 ClientOrderId,
145 TraderId,
146 StrategyId,
147 InstrumentId,
148 Option<VenueOrderId>,
149);
150
151type BatchOrderData = (ClientOrderId, PlaceRequestData);
153
154type BatchCancelData = (ClientOrderId, CancelRequestData);
156
157pub(super) struct FeedHandler {
158 signal: Arc<AtomicBool>,
159 client: Option<WebSocketClient>,
160 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
161 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
162 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
163 auth_tracker: AuthTracker,
164 subscriptions: SubscriptionState,
165 instruments_cache: AHashMap<Ustr, InstrumentAny>,
166 account_id: Option<AccountId>,
167 mm_level: Arc<AtomicU8>,
168 product_type: Option<BybitProductType>,
169 quote_cache: QuoteCache,
170 funding_cache: FundingCache,
171 retry_manager: RetryManager<BybitWsError>,
172 pending_place_requests: DashMap<String, PlaceRequestData>,
173 pending_cancel_requests: DashMap<String, CancelRequestData>,
174 pending_amend_requests: DashMap<String, AmendRequestData>,
175 pending_batch_place_requests: DashMap<String, Vec<BatchOrderData>>,
176 pending_batch_cancel_requests: DashMap<String, Vec<BatchCancelData>>,
177 message_queue: VecDeque<NautilusWsMessage>,
178}
179
180impl FeedHandler {
181 #[allow(clippy::too_many_arguments)]
183 pub(super) fn new(
184 signal: Arc<AtomicBool>,
185 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
186 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
187 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
188 account_id: Option<AccountId>,
189 product_type: Option<BybitProductType>,
190 mm_level: Arc<AtomicU8>,
191 auth_tracker: AuthTracker,
192 subscriptions: SubscriptionState,
193 funding_cache: FundingCache,
194 ) -> Self {
195 Self {
196 signal,
197 client: None,
198 cmd_rx,
199 raw_rx,
200 out_tx,
201 auth_tracker,
202 subscriptions,
203 instruments_cache: AHashMap::new(),
204 account_id,
205 mm_level,
206 product_type,
207 quote_cache: QuoteCache::new(),
208 funding_cache,
209 retry_manager: create_websocket_retry_manager(),
210 pending_place_requests: DashMap::new(),
211 pending_cancel_requests: DashMap::new(),
212 pending_amend_requests: DashMap::new(),
213 pending_batch_place_requests: DashMap::new(),
214 pending_batch_cancel_requests: DashMap::new(),
215 message_queue: VecDeque::new(),
216 }
217 }
218
219 pub(super) fn is_stopped(&self) -> bool {
220 self.signal.load(Ordering::Relaxed)
221 }
222
223 #[allow(dead_code)]
224 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
225 self.out_tx.send(msg).map_err(|_| ())
226 }
227
228 fn generate_unique_request_id(&self) -> String {
229 UUID4::new().to_string()
230 }
231
232 fn find_and_remove_place_request_by_client_order_id(
233 &self,
234 client_order_id: &ClientOrderId,
235 ) -> Option<(String, PlaceRequestData)> {
236 self.pending_place_requests
237 .iter()
238 .find(|entry| entry.value().0 == *client_order_id)
239 .and_then(|entry| {
240 let key = entry.key().clone();
241 drop(entry);
242 self.pending_place_requests.remove(&key)
243 })
244 }
245
246 fn find_and_remove_cancel_request_by_client_order_id(
247 &self,
248 client_order_id: &ClientOrderId,
249 ) -> Option<(String, CancelRequestData)> {
250 self.pending_cancel_requests
251 .iter()
252 .find(|entry| entry.value().0 == *client_order_id)
253 .and_then(|entry| {
254 let key = entry.key().clone();
255 drop(entry);
256 self.pending_cancel_requests.remove(&key)
257 })
258 }
259
260 fn find_and_remove_amend_request_by_client_order_id(
261 &self,
262 client_order_id: &ClientOrderId,
263 ) -> Option<(String, AmendRequestData)> {
264 self.pending_amend_requests
265 .iter()
266 .find(|entry| entry.value().0 == *client_order_id)
267 .and_then(|entry| {
268 let key = entry.key().clone();
269 drop(entry);
270 self.pending_amend_requests.remove(&key)
271 })
272 }
273
274 fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
275 let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
276 let mm_level = self.mm_level.load(Ordering::Relaxed);
277 !(is_post_only && mm_level > 0)
278 }
279
280 async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
282 if let Some(client) = &self.client {
283 self.retry_manager
284 .execute_with_retry(
285 "websocket_send",
286 || {
287 let payload = payload.clone();
288 async move {
289 client
290 .send_text(payload, None)
291 .await
292 .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
293 }
294 },
295 should_retry_bybit_error,
296 create_bybit_timeout_error,
297 )
298 .await
299 } else {
300 Err(BybitWsError::ClientError(
301 "No active WebSocket client".to_string(),
302 ))
303 }
304 }
305
306 fn handle_batch_failure(
311 &self,
312 req_id: &str,
313 ret_msg: &str,
314 op: &str,
315 ts_init: UnixNanos,
316 result: &mut Vec<NautilusWsMessage>,
317 ) {
318 if op.contains("create") {
319 if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
320 tracing::warn!(
321 req_id = %req_id,
322 ret_msg = %ret_msg,
323 num_orders = batch_data.len(),
324 "Batch place request failed"
325 );
326
327 let Some(account_id) = self.account_id else {
328 tracing::error!("Cannot create OrderRejected events: account_id is None");
329 return;
330 };
331
332 let reason = Ustr::from(ret_msg);
333 for (client_order_id, (_, trader_id, strategy_id, instrument_id)) in batch_data {
334 let rejected = OrderRejected::new(
335 trader_id,
336 strategy_id,
337 instrument_id,
338 client_order_id,
339 account_id,
340 reason,
341 UUID4::new(),
342 ts_init,
343 ts_init,
344 false,
345 false,
346 );
347 result.push(NautilusWsMessage::OrderRejected(rejected));
348 }
349 }
350 } else if op.contains("cancel")
351 && let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id)
352 {
353 tracing::warn!(
354 req_id = %req_id,
355 ret_msg = %ret_msg,
356 num_cancels = batch_data.len(),
357 "Batch cancel request failed"
358 );
359
360 let reason = Ustr::from(ret_msg);
361 for (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id)) in
362 batch_data
363 {
364 let rejected = OrderCancelRejected::new(
365 trader_id,
366 strategy_id,
367 instrument_id,
368 client_order_id,
369 reason,
370 UUID4::new(),
371 ts_init,
372 ts_init,
373 false,
374 venue_order_id,
375 self.account_id,
376 );
377 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
378 }
379 }
380 }
381
382 fn handle_batch_response(
384 &self,
385 resp: &BybitWsOrderResponse,
386 result: &mut Vec<NautilusWsMessage>,
387 ) {
388 let Some(req_id) = &resp.req_id else {
389 tracing::warn!(
390 op = %resp.op,
391 "Batch response missing req_id - cannot correlate with pending requests"
392 );
393 return;
394 };
395
396 let batch_errors = resp.extract_batch_errors();
397
398 if resp.op.contains("create") {
399 if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
400 self.process_batch_place_errors(batch_data, batch_errors, result);
401 } else {
402 tracing::debug!(
403 req_id = %req_id,
404 "Batch place response received but no pending request found"
405 );
406 }
407 } else if resp.op.contains("cancel") {
408 if let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id) {
409 self.process_batch_cancel_errors(batch_data, batch_errors, result);
410 } else {
411 tracing::debug!(
412 req_id = %req_id,
413 "Batch cancel response received but no pending request found"
414 );
415 }
416 }
417 }
418
419 fn process_batch_place_errors(
421 &self,
422 batch_data: Vec<BatchOrderData>,
423 errors: Vec<BybitBatchOrderError>,
424 result: &mut Vec<NautilusWsMessage>,
425 ) {
426 let Some(account_id) = self.account_id else {
427 tracing::error!("Cannot create OrderRejected events: account_id is None");
428 return;
429 };
430
431 let clock = get_atomic_clock_realtime();
432 let ts_init = clock.get_time_ns();
433
434 for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id))) in
435 batch_data.into_iter().enumerate()
436 {
437 if let Some(error) = errors.get(idx)
438 && error.code != 0
439 {
440 tracing::warn!(
441 client_order_id = %client_order_id,
442 error_code = error.code,
443 error_msg = %error.msg,
444 "Batch order rejected"
445 );
446
447 let rejected = OrderRejected::new(
448 trader_id,
449 strategy_id,
450 instrument_id,
451 client_order_id,
452 account_id,
453 Ustr::from(&error.msg),
454 UUID4::new(),
455 ts_init,
456 ts_init,
457 false,
458 false,
459 );
460 result.push(NautilusWsMessage::OrderRejected(rejected));
461 }
462 }
463 }
464
465 fn process_batch_cancel_errors(
467 &self,
468 batch_data: Vec<BatchCancelData>,
469 errors: Vec<BybitBatchOrderError>,
470 result: &mut Vec<NautilusWsMessage>,
471 ) {
472 let clock = get_atomic_clock_realtime();
473 let ts_init = clock.get_time_ns();
474
475 for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id))) in
476 batch_data.into_iter().enumerate()
477 {
478 if let Some(error) = errors.get(idx)
479 && error.code != 0
480 {
481 tracing::warn!(
482 client_order_id = %client_order_id,
483 error_code = error.code,
484 error_msg = %error.msg,
485 "Batch cancel rejected"
486 );
487
488 let rejected = OrderCancelRejected::new(
489 trader_id,
490 strategy_id,
491 instrument_id,
492 client_order_id,
493 Ustr::from(&error.msg),
494 UUID4::new(),
495 ts_init,
496 ts_init,
497 false,
498 venue_order_id,
499 self.account_id,
500 );
501 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
502 }
503 }
504 }
505
506 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
507 let clock = get_atomic_clock_realtime();
508
509 loop {
510 if let Some(msg) = self.message_queue.pop_front() {
511 return Some(msg);
512 }
513
514 tokio::select! {
515 Some(cmd) = self.cmd_rx.recv() => {
516 match cmd {
517 HandlerCommand::SetClient(client) => {
518 tracing::debug!("WebSocketClient received by handler");
519 self.client = Some(client);
520 }
521 HandlerCommand::Disconnect => {
522 tracing::debug!("Disconnect command received");
523
524 if let Some(client) = self.client.take() {
525 client.disconnect().await;
526 }
527 }
528 HandlerCommand::Authenticate { payload } => {
529 tracing::debug!("Authenticate command received");
530 if let Err(e) = self.send_with_retry(payload).await {
531 tracing::error!("Failed to send authentication after retries: {e}");
532 }
533 }
534 HandlerCommand::Subscribe { topics } => {
535 for topic in topics {
536 tracing::debug!(topic = %topic, "Subscribing to topic");
537 if let Err(e) = self.send_with_retry(topic.clone()).await {
538 tracing::error!(topic = %topic, error = %e, "Failed to send subscription after retries");
539 }
540 }
541 }
542 HandlerCommand::Unsubscribe { topics } => {
543 for topic in topics {
544 tracing::debug!(topic = %topic, "Unsubscribing from topic");
545 if let Err(e) = self.send_with_retry(topic.clone()).await {
546 tracing::error!(topic = %topic, error = %e, "Failed to send unsubscription after retries");
547 }
548 }
549 }
550 HandlerCommand::SendText { payload } => {
551 if let Err(e) = self.send_with_retry(payload).await {
552 tracing::error!("Error sending text with retry: {e}");
553 }
554 }
555 HandlerCommand::InitializeInstruments(instruments) => {
556 for inst in instruments {
557 self.instruments_cache.insert(inst.symbol().inner(), inst);
558 }
559 }
560 HandlerCommand::UpdateInstrument(inst) => {
561 self.instruments_cache.insert(inst.symbol().inner(), inst);
562 }
563 HandlerCommand::RegisterBatchPlace { req_id, orders } => {
564 tracing::debug!(
565 req_id = %req_id,
566 num_orders = orders.len(),
567 "Registering batch place request"
568 );
569 self.pending_batch_place_requests.insert(req_id, orders);
570 }
571 HandlerCommand::RegisterBatchCancel { req_id, cancels } => {
572 tracing::debug!(
573 req_id = %req_id,
574 num_cancels = cancels.len(),
575 "Registering batch cancel request"
576 );
577 self.pending_batch_cancel_requests.insert(req_id, cancels);
578 }
579 HandlerCommand::PlaceOrder {
580 params,
581 client_order_id,
582 trader_id,
583 strategy_id,
584 instrument_id,
585 } => {
586 let request_id = self.generate_unique_request_id();
587
588 self.pending_place_requests.insert(
589 request_id.clone(),
590 (client_order_id, trader_id, strategy_id, instrument_id),
591 );
592
593 let referer = if self.include_referer_header(params.time_in_force) {
594 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
595 } else {
596 None
597 };
598
599 let request = BybitWsRequest {
600 req_id: Some(request_id.clone()),
601 op: BybitWsOrderRequestOp::Create,
602 header: BybitWsHeader::with_referer(referer),
603 args: vec![params],
604 };
605
606 if let Ok(payload) = serde_json::to_string(&request)
607 && let Err(e) = self.send_with_retry(payload).await
608 {
609 tracing::error!("Failed to send place order after retries: {e}");
610 self.pending_place_requests.remove(&request_id);
611 }
612 }
613 HandlerCommand::AmendOrder {
614 params,
615 client_order_id,
616 trader_id,
617 strategy_id,
618 instrument_id,
619 venue_order_id,
620 } => {
621 let request_id = self.generate_unique_request_id();
622
623 self.pending_amend_requests.insert(
624 request_id.clone(),
625 (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
626 );
627
628 let request = BybitWsRequest {
629 req_id: Some(request_id.clone()),
630 op: BybitWsOrderRequestOp::Amend,
631 header: BybitWsHeader::now(),
632 args: vec![params],
633 };
634
635 if let Ok(payload) = serde_json::to_string(&request)
636 && let Err(e) = self.send_with_retry(payload).await
637 {
638 tracing::error!("Failed to send amend order after retries: {e}");
639 self.pending_amend_requests.remove(&request_id);
640 }
641 }
642 HandlerCommand::CancelOrder {
643 params,
644 client_order_id,
645 trader_id,
646 strategy_id,
647 instrument_id,
648 venue_order_id,
649 } => {
650 let request_id = self.generate_unique_request_id();
651
652 self.pending_cancel_requests.insert(
653 request_id.clone(),
654 (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
655 );
656
657 let request = BybitWsRequest {
658 req_id: Some(request_id.clone()),
659 op: BybitWsOrderRequestOp::Cancel,
660 header: BybitWsHeader::now(),
661 args: vec![params],
662 };
663
664 if let Ok(payload) = serde_json::to_string(&request)
665 && let Err(e) = self.send_with_retry(payload).await
666 {
667 tracing::error!("Failed to send cancel order after retries: {e}");
668 self.pending_cancel_requests.remove(&request_id);
669 }
670 }
671 }
672
673 continue;
674 }
675
676 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
677 if self.signal.load(Ordering::Relaxed) {
678 tracing::debug!("Stop signal received during idle period");
679 return None;
680 }
681 continue;
682 }
683
684 msg = self.raw_rx.recv() => {
685 let msg = match msg {
686 Some(msg) => msg,
687 None => {
688 tracing::debug!("WebSocket stream closed");
689 return None;
690 }
691 };
692
693 if let Message::Ping(data) = &msg {
694 tracing::trace!("Received ping frame with {} bytes", data.len());
695
696 if let Some(client) = &self.client
697 && let Err(e) = client.send_pong(data.to_vec()).await
698 {
699 tracing::warn!(error = %e, "Failed to send pong frame");
700 }
701 continue;
702 }
703
704 let event = match Self::parse_raw_message(msg) {
705 Some(event) => event,
706 None => continue,
707 };
708
709 if self.signal.load(Ordering::Relaxed) {
710 tracing::debug!("Stop signal received");
711 return None;
712 }
713
714 let ts_init = clock.get_time_ns();
715 let instruments = self.instruments_cache.clone();
716 let funding_cache = Arc::clone(&self.funding_cache);
717 let nautilus_messages = self.parse_to_nautilus_messages(
718 event,
719 &instruments,
720 self.account_id,
721 self.product_type,
722 &funding_cache,
723 ts_init,
724 )
725 .await;
726
727 self.message_queue.extend(nautilus_messages);
729 }
730 }
731 }
732 }
733
734 fn parse_raw_message(msg: Message) -> Option<BybitWsMessage> {
735 use serde_json::Value;
736
737 match msg {
738 Message::Text(text) => {
739 if text == nautilus_network::RECONNECTED {
740 tracing::info!("Received WebSocket reconnected signal");
741 return Some(BybitWsMessage::Reconnected);
742 }
743
744 if text.trim().eq_ignore_ascii_case("pong") {
745 return None;
746 }
747
748 tracing::trace!("Raw websocket message: {text}");
749
750 let value: Value = match serde_json::from_str(&text) {
751 Ok(v) => v,
752 Err(e) => {
753 tracing::error!("Failed to parse WebSocket message: {e}: {text}");
754 return None;
755 }
756 };
757
758 Self::classify_bybit_message(&value).or(Some(BybitWsMessage::Raw(value)))
759 }
760 Message::Binary(msg) => {
761 tracing::debug!("Raw binary: {msg:?}");
762 None
763 }
764 Message::Close(_) => {
765 tracing::debug!("Received close message, waiting for reconnection");
766 None
767 }
768 _ => None,
769 }
770 }
771
772 pub(crate) fn classify_bybit_message(value: &serde_json::Value) -> Option<BybitWsMessage> {
773 use super::{
774 enums::BybitWsOperation,
775 messages::{
776 BybitWsAuthResponse, BybitWsOrderResponse, BybitWsResponse, BybitWsSubscriptionMsg,
777 },
778 };
779
780 if let Ok(op) = serde_json::from_value::<BybitWsOperation>(
781 value.get("op").cloned().unwrap_or(serde_json::Value::Null),
782 ) && op == BybitWsOperation::Auth
783 && let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
784 {
785 let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
786 if is_success {
787 return Some(BybitWsMessage::Auth(auth));
788 }
789 let resp = BybitWsResponse {
790 op: Some(auth.op.clone()),
791 topic: None,
792 success: auth.success,
793 conn_id: auth.conn_id.clone(),
794 req_id: None,
795 ret_code: auth.ret_code,
796 ret_msg: auth.ret_msg,
797 };
798 let error = BybitWebSocketError::from_response(&resp);
799 return Some(BybitWsMessage::Error(error));
800 }
801
802 if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
803 if success {
804 if let Ok(msg) = serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone()) {
805 return Some(BybitWsMessage::Subscription(msg));
806 }
807 } else if let Ok(resp) = serde_json::from_value::<BybitWsResponse>(value.clone()) {
808 let error = BybitWebSocketError::from_response(&resp);
809 return Some(BybitWsMessage::Error(error));
810 }
811 }
812
813 if let Some(op) = value.get("op").and_then(serde_json::Value::as_str)
814 && op.starts_with("order.")
815 && let Ok(order_resp) = serde_json::from_value::<BybitWsOrderResponse>(value.clone())
816 {
817 return Some(BybitWsMessage::OrderResponse(order_resp));
818 }
819
820 if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
821 if topic.starts_with("orderbook")
822 && let Ok(msg) = serde_json::from_value(value.clone())
823 {
824 return Some(BybitWsMessage::Orderbook(msg));
825 } else if (topic.contains("publicTrade") || topic.starts_with("trade"))
826 && let Ok(msg) = serde_json::from_value(value.clone())
827 {
828 return Some(BybitWsMessage::Trade(msg));
829 } else if topic.starts_with("kline")
830 && let Ok(msg) = serde_json::from_value(value.clone())
831 {
832 return Some(BybitWsMessage::Kline(msg));
833 } else if topic.starts_with("tickers") {
834 if let Some(symbol) = value
836 .get("data")
837 .and_then(|d| d.get("symbol"))
838 .and_then(|s| s.as_str())
839 && symbol.contains('-')
840 && symbol.matches('-').count() >= 3
841 && let Ok(msg) = serde_json::from_value(value.clone())
842 {
843 return Some(BybitWsMessage::TickerOption(msg));
844 }
845 if let Ok(msg) = serde_json::from_value(value.clone()) {
846 return Some(BybitWsMessage::TickerLinear(msg));
847 }
848 } else if topic.starts_with("order")
849 && let Ok(msg) = serde_json::from_value(value.clone())
850 {
851 return Some(BybitWsMessage::AccountOrder(msg));
852 } else if topic.starts_with("execution")
853 && let Ok(msg) = serde_json::from_value(value.clone())
854 {
855 return Some(BybitWsMessage::AccountExecution(msg));
856 } else if topic.starts_with("wallet")
857 && let Ok(msg) = serde_json::from_value(value.clone())
858 {
859 return Some(BybitWsMessage::AccountWallet(msg));
860 } else if topic.starts_with("position")
861 && let Ok(msg) = serde_json::from_value(value.clone())
862 {
863 return Some(BybitWsMessage::AccountPosition(msg));
864 }
865 }
866
867 None
868 }
869
870 #[allow(clippy::too_many_arguments)]
871 async fn parse_to_nautilus_messages(
872 &mut self,
873 msg: BybitWsMessage,
874 instruments: &AHashMap<Ustr, InstrumentAny>,
875 account_id: Option<AccountId>,
876 product_type: Option<BybitProductType>,
877 funding_cache: &FundingCache,
878 ts_init: UnixNanos,
879 ) -> Vec<NautilusWsMessage> {
880 let mut result = Vec::new();
881
882 match msg {
883 BybitWsMessage::Orderbook(msg) => {
884 let raw_symbol = msg.data.s;
885 let symbol =
886 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
887
888 if let Some(instrument) = instruments.get(&symbol) {
889 match parse_orderbook_deltas(&msg, instrument, ts_init) {
890 Ok(deltas) => result.push(NautilusWsMessage::Deltas(deltas)),
891 Err(e) => tracing::error!("Error parsing orderbook deltas: {e}"),
892 }
893
894 if let Some(depth_str) = msg.topic.as_str().split('.').nth(1)
896 && depth_str == "1"
897 {
898 let instrument_id = instrument.id();
899 let last_quote = self.quote_cache.get(&instrument_id);
900
901 match parse_orderbook_quote(&msg, instrument, last_quote, ts_init) {
902 Ok(quote) => {
903 self.quote_cache.insert(instrument_id, quote);
904 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
905 }
906 Err(e) => tracing::debug!("Skipping orderbook quote: {e}"),
907 }
908 }
909 } else {
910 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Orderbook message");
911 }
912 }
913 BybitWsMessage::Trade(msg) => {
914 let mut data_vec = Vec::new();
915 for trade in &msg.data {
916 let raw_symbol = trade.s;
917 let symbol =
918 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
919
920 if let Some(instrument) = instruments.get(&symbol) {
921 match parse_ws_trade_tick(trade, instrument, ts_init) {
922 Ok(tick) => data_vec.push(Data::Trade(tick)),
923 Err(e) => tracing::error!("Error parsing trade tick: {e}"),
924 }
925 } else {
926 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Trade message");
927 }
928 }
929
930 if !data_vec.is_empty() {
931 result.push(NautilusWsMessage::Data(data_vec));
932 }
933 }
934 BybitWsMessage::Kline(msg) => {
935 let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
936 Ok(parts) => parts,
937 Err(e) => {
938 tracing::warn!("Failed to parse kline topic: {e}");
939 return result;
940 }
941 };
942
943 let symbol = product_type
944 .map_or_else(|| raw_symbol.into(), |pt| make_bybit_symbol(raw_symbol, pt));
945
946 if let Some(instrument) = instruments.get(&symbol) {
947 let (step, aggregation) = match interval_str.parse::<usize>() {
948 Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
949 _ => {
950 tracing::warn!("Unsupported kline interval: {}", interval_str);
951 return result;
952 }
953 };
954
955 if let Some(non_zero_step) = NonZero::new(step) {
956 let bar_spec = BarSpecification {
957 step: non_zero_step,
958 aggregation,
959 price_type: PriceType::Last,
960 };
961 let bar_type =
962 BarType::new(instrument.id(), bar_spec, AggregationSource::External);
963
964 let mut data_vec = Vec::new();
965 for kline in &msg.data {
966 if !kline.confirm {
968 continue;
969 }
970 match parse_ws_kline_bar(kline, instrument, bar_type, false, ts_init) {
971 Ok(bar) => data_vec.push(Data::Bar(bar)),
972 Err(e) => tracing::error!("Error parsing kline to bar: {e}"),
973 }
974 }
975 if !data_vec.is_empty() {
976 result.push(NautilusWsMessage::Data(data_vec));
977 }
978 } else {
979 tracing::error!("Invalid step value: {}", step);
980 }
981 } else {
982 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Kline message");
983 }
984 }
985 BybitWsMessage::TickerLinear(msg) => {
986 let raw_symbol = msg.data.symbol;
987 let symbol =
988 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
989
990 if let Some(instrument) = instruments.get(&symbol) {
991 let instrument_id = instrument.id();
992 let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
993 let price_precision = instrument.price_precision();
994 let size_precision = instrument.size_precision();
995
996 let bid_price = msg
998 .data
999 .bid1_price
1000 .as_deref()
1001 .map(|s| parse_price_with_precision(s, price_precision, "bid1Price"))
1002 .transpose();
1003 let ask_price = msg
1004 .data
1005 .ask1_price
1006 .as_deref()
1007 .map(|s| parse_price_with_precision(s, price_precision, "ask1Price"))
1008 .transpose();
1009 let bid_size = msg
1010 .data
1011 .bid1_size
1012 .as_deref()
1013 .map(|s| parse_quantity_with_precision(s, size_precision, "bid1Size"))
1014 .transpose();
1015 let ask_size = msg
1016 .data
1017 .ask1_size
1018 .as_deref()
1019 .map(|s| parse_quantity_with_precision(s, size_precision, "ask1Size"))
1020 .transpose();
1021
1022 match (bid_price, ask_price, bid_size, ask_size) {
1023 (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1024 match self.quote_cache.process(
1025 instrument_id,
1026 bp,
1027 ap,
1028 bs,
1029 as_,
1030 ts_event,
1031 ts_init,
1032 ) {
1033 Ok(quote) => {
1034 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1035 }
1036 Err(e) => {
1037 let raw_data = serde_json::to_string(&msg.data)
1038 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1039 tracing::debug!(
1040 "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1041 );
1042 }
1043 }
1044 }
1045 _ => {
1046 let raw_data = serde_json::to_string(&msg.data)
1047 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1048 tracing::warn!(
1049 "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1050 );
1051 }
1052 }
1053
1054 if msg.data.funding_rate.is_some() && msg.data.next_funding_time.is_some() {
1056 let cache_key = (
1057 msg.data.funding_rate.clone(),
1058 msg.data.next_funding_time.clone(),
1059 );
1060
1061 let should_publish = {
1062 let cache = funding_cache.read().await;
1063 cache.get(&symbol) != Some(&cache_key)
1064 };
1065
1066 if should_publish {
1067 match parse_ticker_linear_funding(
1068 &msg.data,
1069 instrument_id,
1070 ts_event,
1071 ts_init,
1072 ) {
1073 Ok(funding) => {
1074 funding_cache.write().await.insert(symbol, cache_key);
1075 result.push(NautilusWsMessage::FundingRates(vec![funding]));
1076 }
1077 Err(e) => {
1078 tracing::debug!("Skipping funding rate update: {e}");
1079 }
1080 }
1081 }
1082 }
1083 } else {
1084 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerLinear message");
1085 }
1086 }
1087 BybitWsMessage::TickerOption(msg) => {
1088 let raw_symbol = &msg.data.symbol;
1089 let symbol = product_type.map_or_else(
1090 || raw_symbol.as_str().into(),
1091 |pt| make_bybit_symbol(raw_symbol, pt),
1092 );
1093
1094 if let Some(instrument) = instruments.get(&symbol) {
1095 let instrument_id = instrument.id();
1096 let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
1097 let price_precision = instrument.price_precision();
1098 let size_precision = instrument.size_precision();
1099
1100 let bid_price = parse_price_with_precision(
1102 &msg.data.bid_price,
1103 price_precision,
1104 "bidPrice",
1105 );
1106 let ask_price = parse_price_with_precision(
1107 &msg.data.ask_price,
1108 price_precision,
1109 "askPrice",
1110 );
1111 let bid_size = parse_quantity_with_precision(
1112 &msg.data.bid_size,
1113 size_precision,
1114 "bidSize",
1115 );
1116 let ask_size = parse_quantity_with_precision(
1117 &msg.data.ask_size,
1118 size_precision,
1119 "askSize",
1120 );
1121
1122 match (bid_price, ask_price, bid_size, ask_size) {
1123 (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1124 match self.quote_cache.process(
1125 instrument_id,
1126 Some(bp),
1127 Some(ap),
1128 Some(bs),
1129 Some(as_),
1130 ts_event,
1131 ts_init,
1132 ) {
1133 Ok(quote) => {
1134 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1135 }
1136 Err(e) => {
1137 let raw_data = serde_json::to_string(&msg.data)
1138 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1139 tracing::debug!(
1140 "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1141 );
1142 }
1143 }
1144 }
1145 _ => {
1146 let raw_data = serde_json::to_string(&msg.data)
1147 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1148 tracing::warn!(
1149 "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1150 );
1151 }
1152 }
1153 } else {
1154 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerOption message");
1155 }
1156 }
1157 BybitWsMessage::AccountOrder(msg) => {
1158 if let Some(account_id) = account_id {
1159 let mut reports = Vec::new();
1160 for order in &msg.data {
1161 let raw_symbol = order.symbol;
1162 let symbol = make_bybit_symbol(raw_symbol, order.category);
1163
1164 if let Some(instrument) = instruments.get(&symbol) {
1165 match parse_ws_order_status_report(
1166 order, instrument, account_id, ts_init,
1167 ) {
1168 Ok(report) => reports.push(report),
1169 Err(e) => tracing::error!("Error parsing order status report: {e}"),
1170 }
1171 } else {
1172 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountOrder message");
1173 }
1174 }
1175 if !reports.is_empty() {
1176 result.push(NautilusWsMessage::OrderStatusReports(reports));
1177 }
1178 }
1179 }
1180 BybitWsMessage::AccountExecution(msg) => {
1181 if let Some(account_id) = account_id {
1182 let mut reports = Vec::new();
1183 for execution in &msg.data {
1184 let raw_symbol = execution.symbol;
1185 let symbol = make_bybit_symbol(raw_symbol, execution.category);
1186
1187 if let Some(instrument) = instruments.get(&symbol) {
1188 match parse_ws_fill_report(execution, account_id, instrument, ts_init) {
1189 Ok(report) => reports.push(report),
1190 Err(e) => tracing::error!("Error parsing fill report: {e}"),
1191 }
1192 } else {
1193 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountExecution message");
1194 }
1195 }
1196 if !reports.is_empty() {
1197 result.push(NautilusWsMessage::FillReports(reports));
1198 }
1199 }
1200 }
1201 BybitWsMessage::AccountPosition(msg) => {
1202 if let Some(account_id) = account_id {
1203 for position in &msg.data {
1204 let raw_symbol = position.symbol;
1205 let symbol = make_bybit_symbol(raw_symbol, position.category);
1206
1207 if let Some(instrument) = instruments.get(&symbol) {
1208 match parse_ws_position_status_report(
1209 position, account_id, instrument, ts_init,
1210 ) {
1211 Ok(report) => {
1212 result.push(NautilusWsMessage::PositionStatusReport(report));
1213 }
1214 Err(e) => {
1215 tracing::error!("Error parsing position status report: {e}");
1216 }
1217 }
1218 } else {
1219 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountPosition message");
1220 }
1221 }
1222 }
1223 }
1224 BybitWsMessage::AccountWallet(msg) => {
1225 if let Some(account_id) = account_id {
1226 for wallet in &msg.data {
1227 let ts_event = UnixNanos::from(msg.creation_time as u64 * 1_000_000);
1228
1229 match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1230 Ok(state) => result.push(NautilusWsMessage::AccountState(state)),
1231 Err(e) => tracing::error!("Error parsing account state: {e}"),
1232 }
1233 }
1234 }
1235 }
1236 BybitWsMessage::OrderResponse(resp) => {
1237 if resp.ret_code == 0 {
1238 tracing::debug!(op = %resp.op, ret_msg = %resp.ret_msg, "Order operation successful");
1239
1240 if resp.op.contains("batch") {
1241 self.handle_batch_response(&resp, &mut result);
1242 } else if let Some(req_id) = &resp.req_id {
1243 if resp.op.contains("create") {
1244 self.pending_place_requests.remove(req_id);
1245 } else if resp.op.contains("cancel") {
1246 self.pending_cancel_requests.remove(req_id);
1247 } else if resp.op.contains("amend") {
1248 self.pending_amend_requests.remove(req_id);
1249 }
1250 } else if let Some(order_link_id) =
1251 resp.data.get("orderLinkId").and_then(|v| v.as_str())
1252 {
1253 let client_order_id = ClientOrderId::from(order_link_id);
1255 if resp.op.contains("create") {
1256 self.find_and_remove_place_request_by_client_order_id(&client_order_id);
1257 } else if resp.op.contains("cancel") {
1258 self.find_and_remove_cancel_request_by_client_order_id(
1259 &client_order_id,
1260 );
1261 } else if resp.op.contains("amend") {
1262 self.find_and_remove_amend_request_by_client_order_id(&client_order_id);
1263 }
1264 }
1265 } else if let Some(req_id) = &resp.req_id {
1266 let clock = get_atomic_clock_realtime();
1267 let ts_init = clock.get_time_ns();
1268
1269 if resp.op.contains("batch") {
1270 self.handle_batch_failure(
1271 req_id,
1272 &resp.ret_msg,
1273 &resp.op,
1274 ts_init,
1275 &mut result,
1276 );
1277 } else if resp.op.contains("create")
1278 && let Some((_, (client_order_id, trader_id, strategy_id, instrument_id))) =
1279 self.pending_place_requests.remove(req_id)
1280 {
1281 let Some(account_id) = self.account_id else {
1282 tracing::error!(
1283 request_id = %req_id,
1284 reason = %resp.ret_msg,
1285 "Cannot create OrderRejected event: account_id is None"
1286 );
1287 return result;
1288 };
1289
1290 let rejected = OrderRejected::new(
1291 trader_id,
1292 strategy_id,
1293 instrument_id,
1294 client_order_id,
1295 account_id,
1296 Ustr::from(&resp.ret_msg),
1297 UUID4::new(),
1298 ts_init,
1299 ts_init,
1300 false,
1301 false,
1302 );
1303 result.push(NautilusWsMessage::OrderRejected(rejected));
1304 } else if resp.op.contains("cancel")
1305 && let Some((
1306 _,
1307 (
1308 client_order_id,
1309 trader_id,
1310 strategy_id,
1311 instrument_id,
1312 venue_order_id,
1313 ),
1314 )) = self.pending_cancel_requests.remove(req_id)
1315 {
1316 let rejected = OrderCancelRejected::new(
1317 trader_id,
1318 strategy_id,
1319 instrument_id,
1320 client_order_id,
1321 Ustr::from(&resp.ret_msg),
1322 UUID4::new(),
1323 ts_init,
1324 ts_init,
1325 false,
1326 venue_order_id,
1327 self.account_id,
1328 );
1329 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1330 } else if resp.op.contains("amend")
1331 && let Some((
1332 _,
1333 (
1334 client_order_id,
1335 trader_id,
1336 strategy_id,
1337 instrument_id,
1338 venue_order_id,
1339 ),
1340 )) = self.pending_amend_requests.remove(req_id)
1341 {
1342 let rejected = OrderModifyRejected::new(
1343 trader_id,
1344 strategy_id,
1345 instrument_id,
1346 client_order_id,
1347 Ustr::from(&resp.ret_msg),
1348 UUID4::new(),
1349 ts_init,
1350 ts_init,
1351 false,
1352 venue_order_id,
1353 self.account_id,
1354 );
1355 result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1356 }
1357 } else if let Some(order_link_id) =
1358 resp.data.get("orderLinkId").and_then(|v| v.as_str())
1359 {
1360 let clock = get_atomic_clock_realtime();
1362 let ts_init = clock.get_time_ns();
1363 let client_order_id = ClientOrderId::from(order_link_id);
1364
1365 if resp.op.contains("create") {
1366 if let Some((_, (_, trader_id, strategy_id, instrument_id))) =
1367 self.find_and_remove_place_request_by_client_order_id(&client_order_id)
1368 {
1369 let Some(account_id) = self.account_id else {
1370 tracing::error!(
1371 client_order_id = %client_order_id,
1372 reason = %resp.ret_msg,
1373 "Cannot create OrderRejected event: account_id is None"
1374 );
1375 return result;
1376 };
1377
1378 let rejected = OrderRejected::new(
1379 trader_id,
1380 strategy_id,
1381 instrument_id,
1382 client_order_id,
1383 account_id,
1384 Ustr::from(&resp.ret_msg),
1385 UUID4::new(),
1386 ts_init,
1387 ts_init,
1388 false,
1389 false,
1390 );
1391 result.push(NautilusWsMessage::OrderRejected(rejected));
1392 }
1393 } else if resp.op.contains("cancel") {
1394 if let Some((
1395 _,
1396 (_, trader_id, strategy_id, instrument_id, venue_order_id),
1397 )) =
1398 self.find_and_remove_cancel_request_by_client_order_id(&client_order_id)
1399 {
1400 let rejected = OrderCancelRejected::new(
1401 trader_id,
1402 strategy_id,
1403 instrument_id,
1404 client_order_id,
1405 Ustr::from(&resp.ret_msg),
1406 UUID4::new(),
1407 ts_init,
1408 ts_init,
1409 false,
1410 venue_order_id,
1411 self.account_id,
1412 );
1413 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1414 }
1415 } else if resp.op.contains("amend")
1416 && let Some((_, (_, trader_id, strategy_id, instrument_id, venue_order_id))) =
1417 self.find_and_remove_amend_request_by_client_order_id(&client_order_id)
1418 {
1419 let rejected = OrderModifyRejected::new(
1420 trader_id,
1421 strategy_id,
1422 instrument_id,
1423 client_order_id,
1424 Ustr::from(&resp.ret_msg),
1425 UUID4::new(),
1426 ts_init,
1427 ts_init,
1428 false,
1429 venue_order_id,
1430 self.account_id,
1431 );
1432 result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1433 }
1434 } else {
1435 tracing::warn!(
1436 op = %resp.op,
1437 ret_code = resp.ret_code,
1438 ret_msg = %resp.ret_msg,
1439 "Order operation failed but request_id could not be extracted from response"
1440 );
1441 }
1442 }
1443 BybitWsMessage::Auth(auth_response) => {
1444 let is_success =
1445 auth_response.success.unwrap_or(false) || (auth_response.ret_code == Some(0));
1446
1447 if is_success {
1448 self.auth_tracker.succeed();
1449 tracing::info!("WebSocket authenticated");
1450 result.push(NautilusWsMessage::Authenticated);
1451 } else {
1452 let error_msg = auth_response
1453 .ret_msg
1454 .as_deref()
1455 .unwrap_or("Authentication rejected");
1456 self.auth_tracker.fail(error_msg);
1457 tracing::error!(error = error_msg, "WebSocket authentication failed");
1458 result.push(NautilusWsMessage::Error(BybitWebSocketError::from_message(
1459 error_msg.to_string(),
1460 )));
1461 }
1462 }
1463 BybitWsMessage::Error(err) => {
1464 result.push(NautilusWsMessage::Error(err));
1465 }
1466 BybitWsMessage::Reconnected => {
1467 self.quote_cache.clear();
1468 result.push(NautilusWsMessage::Reconnected);
1469 }
1470 BybitWsMessage::Subscription(sub_msg) => {
1471 let pending_topics = self.subscriptions.pending_subscribe_topics();
1472 match sub_msg.op {
1473 BybitWsOperation::Subscribe => {
1474 if sub_msg.success {
1475 for topic in pending_topics {
1476 self.subscriptions.confirm_subscribe(&topic);
1477 tracing::debug!(topic = topic, "Subscription confirmed");
1478 }
1479 } else {
1480 for topic in pending_topics {
1481 self.subscriptions.mark_failure(&topic);
1482 tracing::warn!(
1483 topic = topic,
1484 error = ?sub_msg.ret_msg,
1485 "Subscription failed, will retry on reconnect"
1486 );
1487 }
1488 }
1489 }
1490 BybitWsOperation::Unsubscribe => {
1491 let pending_unsub = self.subscriptions.pending_unsubscribe_topics();
1492 if sub_msg.success {
1493 for topic in pending_unsub {
1494 self.subscriptions.confirm_unsubscribe(&topic);
1495 tracing::debug!(topic = topic, "Unsubscription confirmed");
1496 }
1497 } else {
1498 for topic in pending_unsub {
1499 tracing::warn!(
1500 topic = topic,
1501 error = ?sub_msg.ret_msg,
1502 "Unsubscription failed"
1503 );
1504 }
1505 }
1506 }
1507 _ => {}
1508 }
1509 }
1510 _ => {}
1511 }
1512
1513 result
1514 }
1515}
1516
1517#[cfg(test)]
1522mod tests {
1523 use rstest::rstest;
1524
1525 use super::*;
1526 use crate::common::consts::BYBIT_WS_TOPIC_DELIMITER;
1527
1528 fn create_test_handler() -> FeedHandler {
1529 let signal = Arc::new(AtomicBool::new(false));
1530 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
1531 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
1532 let (out_tx, _out_rx) = tokio::sync::mpsc::unbounded_channel();
1533 let auth_tracker = AuthTracker::new();
1534 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
1535 let funding_cache = Arc::new(RwLock::new(AHashMap::new()));
1536
1537 FeedHandler::new(
1538 signal,
1539 cmd_rx,
1540 raw_rx,
1541 out_tx,
1542 None,
1543 None,
1544 Arc::new(AtomicU8::new(0)),
1545 auth_tracker,
1546 subscriptions,
1547 funding_cache,
1548 )
1549 }
1550
1551 #[rstest]
1552 fn test_generate_unique_request_id_returns_different_ids() {
1553 let handler = create_test_handler();
1554
1555 let id1 = handler.generate_unique_request_id();
1556 let id2 = handler.generate_unique_request_id();
1557 let id3 = handler.generate_unique_request_id();
1558
1559 assert_ne!(id1, id2);
1560 assert_ne!(id2, id3);
1561 assert_ne!(id1, id3);
1562 }
1563
1564 #[rstest]
1565 fn test_generate_unique_request_id_produces_valid_uuids() {
1566 let handler = create_test_handler();
1567
1568 let id1 = handler.generate_unique_request_id();
1569 let id2 = handler.generate_unique_request_id();
1570
1571 assert!(UUID4::from(id1.as_str()).to_string() == id1);
1572 assert!(UUID4::from(id2.as_str()).to_string() == id2);
1573 }
1574
1575 #[rstest]
1576 fn test_multiple_place_orders_use_different_request_ids() {
1577 let handler = create_test_handler();
1578
1579 let req_id_1 = handler.generate_unique_request_id();
1580 let req_id_2 = handler.generate_unique_request_id();
1581 let req_id_3 = handler.generate_unique_request_id();
1582
1583 assert_ne!(req_id_1, req_id_2);
1584 assert_ne!(req_id_2, req_id_3);
1585 assert_ne!(req_id_1, req_id_3);
1586 }
1587
1588 #[rstest]
1589 fn test_multiple_amends_use_different_request_ids() {
1590 let handler = create_test_handler();
1591
1592 let req_id_1 = handler.generate_unique_request_id();
1594 let req_id_2 = handler.generate_unique_request_id();
1595 let req_id_3 = handler.generate_unique_request_id();
1596
1597 assert_ne!(
1598 req_id_1, req_id_2,
1599 "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
1600 );
1601 assert_ne!(
1602 req_id_2, req_id_3,
1603 "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
1604 );
1605 }
1606
1607 #[rstest]
1608 fn test_multiple_cancels_use_different_request_ids() {
1609 let handler = create_test_handler();
1610
1611 let req_id_1 = handler.generate_unique_request_id();
1612 let req_id_2 = handler.generate_unique_request_id();
1613
1614 assert_ne!(
1615 req_id_1, req_id_2,
1616 "Multiple cancels should generate different request IDs"
1617 );
1618 }
1619
1620 #[rstest]
1621 fn test_concurrent_request_id_generation() {
1622 let handler = create_test_handler();
1623
1624 let mut ids = std::collections::HashSet::new();
1625 for _ in 0..100 {
1626 let id = handler.generate_unique_request_id();
1627 assert!(
1628 ids.insert(id.clone()),
1629 "Generated duplicate request ID: {}",
1630 id
1631 );
1632 }
1633 assert_eq!(ids.len(), 100);
1634 }
1635}