1use std::{
19 collections::VecDeque,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, AtomicU8, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use dashmap::DashMap;
28use nautilus_common::cache::quote::QuoteCache;
29use nautilus_core::{UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
30use nautilus_model::{
31 data::{BarType, Data},
32 events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
33 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
34 instruments::{Instrument, InstrumentAny},
35};
36use nautilus_network::{
37 retry::{RetryManager, create_websocket_retry_manager},
38 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
39};
40use tokio_tungstenite::tungstenite::Message;
41use ustr::Ustr;
42
43use super::{
44 enums::BybitWsOperation,
45 error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
46 messages::{
47 BybitWebSocketError, BybitWsAuthResponse, BybitWsHeader, BybitWsMessage, BybitWsRequest,
48 BybitWsResponse, BybitWsSubscriptionMsg, NautilusWsMessage,
49 },
50 parse::{
51 parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
52 parse_ticker_linear_funding, parse_ticker_linear_index_price,
53 parse_ticker_linear_mark_price, parse_ticker_option_index_price,
54 parse_ticker_option_mark_price, parse_ws_account_state, parse_ws_fill_report,
55 parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
56 parse_ws_trade_tick,
57 },
58};
59use crate::{
60 common::{
61 consts::{
62 BYBIT_NAUTILUS_BROKER_ID, BYBIT_TOPIC_EXECUTION, BYBIT_TOPIC_KLINE, BYBIT_TOPIC_ORDER,
63 BYBIT_TOPIC_ORDERBOOK, BYBIT_TOPIC_POSITION, BYBIT_TOPIC_PUBLIC_TRADE,
64 BYBIT_TOPIC_TICKERS, BYBIT_TOPIC_TRADE, BYBIT_TOPIC_WALLET,
65 },
66 enums::{BybitProductType, BybitTimeInForce, BybitWsOrderRequestOp},
67 parse::{make_bybit_symbol, parse_price_with_precision, parse_quantity_with_precision},
68 },
69 websocket::messages::{
70 BybitBatchOrderError, BybitWsAmendOrderParams, BybitWsCancelOrderParams,
71 BybitWsOrderResponse, BybitWsPlaceOrderParams,
72 },
73};
74
75#[derive(Debug)]
77#[allow(
78 clippy::large_enum_variant,
79 reason = "Commands are ephemeral and immediately consumed"
80)]
81pub enum HandlerCommand {
82 SetClient(WebSocketClient),
83 Disconnect,
84 Authenticate {
85 payload: String,
86 },
87 Subscribe {
88 topics: Vec<String>,
89 },
90 Unsubscribe {
91 topics: Vec<String>,
92 },
93 SendText {
94 payload: String,
95 },
96 PlaceOrder {
97 params: BybitWsPlaceOrderParams,
98 client_order_id: ClientOrderId,
99 trader_id: TraderId,
100 strategy_id: StrategyId,
101 instrument_id: InstrumentId,
102 },
103 AmendOrder {
104 params: BybitWsAmendOrderParams,
105 client_order_id: ClientOrderId,
106 trader_id: TraderId,
107 strategy_id: StrategyId,
108 instrument_id: InstrumentId,
109 venue_order_id: Option<VenueOrderId>,
110 },
111 CancelOrder {
112 params: BybitWsCancelOrderParams,
113 client_order_id: ClientOrderId,
114 trader_id: TraderId,
115 strategy_id: StrategyId,
116 instrument_id: InstrumentId,
117 venue_order_id: Option<VenueOrderId>,
118 },
119 RegisterBatchPlace {
120 req_id: String,
121 orders: Vec<BatchOrderData>,
122 },
123 RegisterBatchCancel {
124 req_id: String,
125 cancels: Vec<BatchCancelData>,
126 },
127 InitializeInstruments(Vec<InstrumentAny>),
128 UpdateInstrument(InstrumentAny),
129}
130
131type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
133
134type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);
136
137type CancelRequestData = (
139 ClientOrderId,
140 TraderId,
141 StrategyId,
142 InstrumentId,
143 Option<VenueOrderId>,
144);
145
146type AmendRequestData = (
148 ClientOrderId,
149 TraderId,
150 StrategyId,
151 InstrumentId,
152 Option<VenueOrderId>,
153);
154
155type BatchOrderData = (ClientOrderId, PlaceRequestData);
157
158type BatchCancelData = (ClientOrderId, CancelRequestData);
160
161pub(super) struct FeedHandler {
162 signal: Arc<AtomicBool>,
163 client: Option<WebSocketClient>,
164 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
165 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
166 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
167 auth_tracker: AuthTracker,
168 subscriptions: SubscriptionState,
169 instruments_cache: AHashMap<Ustr, InstrumentAny>,
170 account_id: Option<AccountId>,
171 mm_level: Arc<AtomicU8>,
172 product_type: Option<BybitProductType>,
173 bars_timestamp_on_close: bool,
174 quote_cache: QuoteCache,
175 funding_cache: FundingCache,
176 bar_types_cache: Arc<DashMap<String, BarType>>,
177 retry_manager: RetryManager<BybitWsError>,
178 pending_place_requests: DashMap<String, PlaceRequestData>,
179 pending_cancel_requests: DashMap<String, CancelRequestData>,
180 pending_amend_requests: DashMap<String, AmendRequestData>,
181 pending_batch_place_requests: DashMap<String, Vec<BatchOrderData>>,
182 pending_batch_cancel_requests: DashMap<String, Vec<BatchCancelData>>,
183 message_queue: VecDeque<NautilusWsMessage>,
184}
185
186impl FeedHandler {
187 #[allow(clippy::too_many_arguments)]
189 pub(super) fn new(
190 signal: Arc<AtomicBool>,
191 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
192 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
193 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
194 account_id: Option<AccountId>,
195 product_type: Option<BybitProductType>,
196 bars_timestamp_on_close: bool,
197 mm_level: Arc<AtomicU8>,
198 auth_tracker: AuthTracker,
199 subscriptions: SubscriptionState,
200 funding_cache: FundingCache,
201 bar_types_cache: Arc<DashMap<String, BarType>>,
202 ) -> Self {
203 Self {
204 signal,
205 client: None,
206 cmd_rx,
207 raw_rx,
208 out_tx,
209 auth_tracker,
210 subscriptions,
211 instruments_cache: AHashMap::new(),
212 account_id,
213 mm_level,
214 product_type,
215 bars_timestamp_on_close,
216 quote_cache: QuoteCache::new(),
217 funding_cache,
218 bar_types_cache,
219 retry_manager: create_websocket_retry_manager(),
220 pending_place_requests: DashMap::new(),
221 pending_cancel_requests: DashMap::new(),
222 pending_amend_requests: DashMap::new(),
223 pending_batch_place_requests: DashMap::new(),
224 pending_batch_cancel_requests: DashMap::new(),
225 message_queue: VecDeque::new(),
226 }
227 }
228
229 pub(super) fn is_stopped(&self) -> bool {
230 self.signal.load(Ordering::Relaxed)
231 }
232
233 #[allow(dead_code)]
234 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
235 self.out_tx.send(msg).map_err(|_| ())
236 }
237
238 fn generate_unique_request_id(&self) -> String {
239 UUID4::new().to_string()
240 }
241
242 fn find_and_remove_place_request_by_client_order_id(
243 &self,
244 client_order_id: &ClientOrderId,
245 ) -> Option<(String, PlaceRequestData)> {
246 self.pending_place_requests
247 .iter()
248 .find(|entry| entry.value().0 == *client_order_id)
249 .and_then(|entry| {
250 let key = entry.key().clone();
251 drop(entry);
252 self.pending_place_requests.remove(&key)
253 })
254 }
255
256 fn find_and_remove_cancel_request_by_client_order_id(
257 &self,
258 client_order_id: &ClientOrderId,
259 ) -> Option<(String, CancelRequestData)> {
260 self.pending_cancel_requests
261 .iter()
262 .find(|entry| entry.value().0 == *client_order_id)
263 .and_then(|entry| {
264 let key = entry.key().clone();
265 drop(entry);
266 self.pending_cancel_requests.remove(&key)
267 })
268 }
269
270 fn find_and_remove_amend_request_by_client_order_id(
271 &self,
272 client_order_id: &ClientOrderId,
273 ) -> Option<(String, AmendRequestData)> {
274 self.pending_amend_requests
275 .iter()
276 .find(|entry| entry.value().0 == *client_order_id)
277 .and_then(|entry| {
278 let key = entry.key().clone();
279 drop(entry);
280 self.pending_amend_requests.remove(&key)
281 })
282 }
283
284 fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
285 let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
286 let mm_level = self.mm_level.load(Ordering::Relaxed);
287 !(is_post_only && mm_level > 0)
288 }
289
290 async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
292 if let Some(client) = &self.client {
293 self.retry_manager
294 .execute_with_retry(
295 "websocket_send",
296 || {
297 let payload = payload.clone();
298 async move {
299 client
300 .send_text(payload, None)
301 .await
302 .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
303 }
304 },
305 should_retry_bybit_error,
306 create_bybit_timeout_error,
307 )
308 .await
309 } else {
310 Err(BybitWsError::ClientError(
311 "No active WebSocket client".to_string(),
312 ))
313 }
314 }
315
316 fn handle_batch_failure(
321 &self,
322 req_id: &str,
323 ret_msg: &str,
324 op: &str,
325 ts_init: UnixNanos,
326 result: &mut Vec<NautilusWsMessage>,
327 ) {
328 if op.contains("create") {
329 if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
330 log::warn!(
331 "Batch place request failed: req_id={req_id}, ret_msg={ret_msg}, num_orders={}",
332 batch_data.len()
333 );
334
335 let Some(account_id) = self.account_id else {
336 log::error!("Cannot create OrderRejected events: account_id is None");
337 return;
338 };
339
340 let reason = Ustr::from(ret_msg);
341 for (client_order_id, (_, trader_id, strategy_id, instrument_id)) in batch_data {
342 let rejected = OrderRejected::new(
343 trader_id,
344 strategy_id,
345 instrument_id,
346 client_order_id,
347 account_id,
348 reason,
349 UUID4::new(),
350 ts_init,
351 ts_init,
352 false,
353 false,
354 );
355 result.push(NautilusWsMessage::OrderRejected(rejected));
356 }
357 }
358 } else if op.contains("cancel")
359 && let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id)
360 {
361 log::warn!(
362 "Batch cancel request failed: req_id={req_id}, ret_msg={ret_msg}, num_cancels={}",
363 batch_data.len()
364 );
365
366 let reason = Ustr::from(ret_msg);
367 for (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id)) in
368 batch_data
369 {
370 let rejected = OrderCancelRejected::new(
371 trader_id,
372 strategy_id,
373 instrument_id,
374 client_order_id,
375 reason,
376 UUID4::new(),
377 ts_init,
378 ts_init,
379 false,
380 venue_order_id,
381 self.account_id,
382 );
383 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
384 }
385 }
386 }
387
388 fn handle_batch_response(
390 &self,
391 resp: &BybitWsOrderResponse,
392 result: &mut Vec<NautilusWsMessage>,
393 ) {
394 let Some(req_id) = &resp.req_id else {
395 log::warn!(
396 "Batch response missing req_id - cannot correlate with pending requests: op={}",
397 resp.op
398 );
399 return;
400 };
401
402 let batch_errors = resp.extract_batch_errors();
403
404 if resp.op.contains("create") {
405 if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
406 self.process_batch_place_errors(batch_data, batch_errors, result);
407 } else {
408 log::debug!(
409 "Batch place response received but no pending request found: req_id={req_id}"
410 );
411 }
412 } else if resp.op.contains("cancel") {
413 if let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id) {
414 self.process_batch_cancel_errors(batch_data, batch_errors, result);
415 } else {
416 log::debug!(
417 "Batch cancel response received but no pending request found: req_id={req_id}"
418 );
419 }
420 }
421 }
422
423 fn process_batch_place_errors(
425 &self,
426 batch_data: Vec<BatchOrderData>,
427 errors: Vec<BybitBatchOrderError>,
428 result: &mut Vec<NautilusWsMessage>,
429 ) {
430 let Some(account_id) = self.account_id else {
431 log::error!("Cannot create OrderRejected events: account_id is None");
432 return;
433 };
434
435 let clock = get_atomic_clock_realtime();
436 let ts_init = clock.get_time_ns();
437
438 for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id))) in
439 batch_data.into_iter().enumerate()
440 {
441 if let Some(error) = errors.get(idx)
442 && error.code != 0
443 {
444 log::warn!(
445 "Batch order rejected: client_order_id={client_order_id}, error_code={}, error_msg={}",
446 error.code,
447 error.msg
448 );
449
450 let rejected = OrderRejected::new(
451 trader_id,
452 strategy_id,
453 instrument_id,
454 client_order_id,
455 account_id,
456 Ustr::from(&error.msg),
457 UUID4::new(),
458 ts_init,
459 ts_init,
460 false,
461 false,
462 );
463 result.push(NautilusWsMessage::OrderRejected(rejected));
464 }
465 }
466 }
467
468 fn process_batch_cancel_errors(
470 &self,
471 batch_data: Vec<BatchCancelData>,
472 errors: Vec<BybitBatchOrderError>,
473 result: &mut Vec<NautilusWsMessage>,
474 ) {
475 let clock = get_atomic_clock_realtime();
476 let ts_init = clock.get_time_ns();
477
478 for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id))) in
479 batch_data.into_iter().enumerate()
480 {
481 if let Some(error) = errors.get(idx)
482 && error.code != 0
483 {
484 log::warn!(
485 "Batch cancel rejected: client_order_id={client_order_id}, error_code={}, error_msg={}",
486 error.code,
487 error.msg
488 );
489
490 let rejected = OrderCancelRejected::new(
491 trader_id,
492 strategy_id,
493 instrument_id,
494 client_order_id,
495 Ustr::from(&error.msg),
496 UUID4::new(),
497 ts_init,
498 ts_init,
499 false,
500 venue_order_id,
501 self.account_id,
502 );
503 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
504 }
505 }
506 }
507
508 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
509 let clock = get_atomic_clock_realtime();
510
511 loop {
512 if let Some(msg) = self.message_queue.pop_front() {
513 return Some(msg);
514 }
515
516 tokio::select! {
517 Some(cmd) = self.cmd_rx.recv() => {
518 match cmd {
519 HandlerCommand::SetClient(client) => {
520 log::debug!("WebSocketClient received by handler");
521 self.client = Some(client);
522 }
523 HandlerCommand::Disconnect => {
524 log::debug!("Disconnect command received");
525
526 if let Some(client) = self.client.take() {
527 client.disconnect().await;
528 }
529 }
530 HandlerCommand::Authenticate { payload } => {
531 log::debug!("Authenticate command received");
532 if let Err(e) = self.send_with_retry(payload).await {
533 log::error!("Failed to send authentication after retries: {e}");
534 }
535 }
536 HandlerCommand::Subscribe { topics } => {
537 for topic in topics {
538 log::debug!("Subscribing to topic: topic={topic}");
539 if let Err(e) = self.send_with_retry(topic.clone()).await {
540 log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
541 }
542 }
543 }
544 HandlerCommand::Unsubscribe { topics } => {
545 for topic in topics {
546 log::debug!("Unsubscribing from topic: topic={topic}");
547 if let Err(e) = self.send_with_retry(topic.clone()).await {
548 log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
549 }
550 }
551 }
552 HandlerCommand::SendText { payload } => {
553 if let Err(e) = self.send_with_retry(payload).await {
554 log::error!("Error sending text with retry: {e}");
555 }
556 }
557 HandlerCommand::InitializeInstruments(instruments) => {
558 for inst in instruments {
559 self.instruments_cache.insert(inst.symbol().inner(), inst);
560 }
561 }
562 HandlerCommand::UpdateInstrument(inst) => {
563 self.instruments_cache.insert(inst.symbol().inner(), inst);
564 }
565 HandlerCommand::RegisterBatchPlace { req_id, orders } => {
566 log::debug!(
567 "Registering batch place request: req_id={req_id}, num_orders={}",
568 orders.len()
569 );
570 self.pending_batch_place_requests.insert(req_id, orders);
571 }
572 HandlerCommand::RegisterBatchCancel { req_id, cancels } => {
573 log::debug!(
574 "Registering batch cancel request: req_id={req_id}, num_cancels={}",
575 cancels.len()
576 );
577 self.pending_batch_cancel_requests.insert(req_id, cancels);
578 }
579 HandlerCommand::PlaceOrder {
580 params,
581 client_order_id,
582 trader_id,
583 strategy_id,
584 instrument_id,
585 } => {
586 let request_id = self.generate_unique_request_id();
587
588 self.pending_place_requests.insert(
589 request_id.clone(),
590 (client_order_id, trader_id, strategy_id, instrument_id),
591 );
592
593 let referer = if self.include_referer_header(params.time_in_force) {
594 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
595 } else {
596 None
597 };
598
599 let request = BybitWsRequest {
600 req_id: Some(request_id.clone()),
601 op: BybitWsOrderRequestOp::Create,
602 header: BybitWsHeader::with_referer(referer),
603 args: vec![params],
604 };
605
606 if let Ok(payload) = serde_json::to_string(&request)
607 && let Err(e) = self.send_with_retry(payload).await
608 {
609 log::error!("Failed to send place order after retries: {e}");
610 self.pending_place_requests.remove(&request_id);
611 }
612 }
613 HandlerCommand::AmendOrder {
614 params,
615 client_order_id,
616 trader_id,
617 strategy_id,
618 instrument_id,
619 venue_order_id,
620 } => {
621 let request_id = self.generate_unique_request_id();
622
623 self.pending_amend_requests.insert(
624 request_id.clone(),
625 (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
626 );
627
628 let request = BybitWsRequest {
629 req_id: Some(request_id.clone()),
630 op: BybitWsOrderRequestOp::Amend,
631 header: BybitWsHeader::now(),
632 args: vec![params],
633 };
634
635 if let Ok(payload) = serde_json::to_string(&request)
636 && let Err(e) = self.send_with_retry(payload).await
637 {
638 log::error!("Failed to send amend order after retries: {e}");
639 self.pending_amend_requests.remove(&request_id);
640 }
641 }
642 HandlerCommand::CancelOrder {
643 params,
644 client_order_id,
645 trader_id,
646 strategy_id,
647 instrument_id,
648 venue_order_id,
649 } => {
650 let request_id = self.generate_unique_request_id();
651
652 self.pending_cancel_requests.insert(
653 request_id.clone(),
654 (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
655 );
656
657 let request = BybitWsRequest {
658 req_id: Some(request_id.clone()),
659 op: BybitWsOrderRequestOp::Cancel,
660 header: BybitWsHeader::now(),
661 args: vec![params],
662 };
663
664 if let Ok(payload) = serde_json::to_string(&request)
665 && let Err(e) = self.send_with_retry(payload).await
666 {
667 log::error!("Failed to send cancel order after retries: {e}");
668 self.pending_cancel_requests.remove(&request_id);
669 }
670 }
671 }
672
673 continue;
674 }
675
676 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
677 if self.signal.load(Ordering::Relaxed) {
678 log::debug!("Stop signal received during idle period");
679 return None;
680 }
681 continue;
682 }
683
684 msg = self.raw_rx.recv() => {
685 let msg = match msg {
686 Some(msg) => msg,
687 None => {
688 log::debug!("WebSocket stream closed");
689 return None;
690 }
691 };
692
693 if let Message::Ping(data) = &msg {
694 log::trace!("Received ping frame with {} bytes", data.len());
695
696 if let Some(client) = &self.client
697 && let Err(e) = client.send_pong(data.to_vec()).await
698 {
699 log::warn!("Failed to send pong frame: error={e}");
700 }
701 continue;
702 }
703
704 let event = match Self::parse_raw_message(msg) {
705 Some(event) => event,
706 None => continue,
707 };
708
709 if self.signal.load(Ordering::Relaxed) {
710 log::debug!("Stop signal received");
711 return None;
712 }
713
714 let ts_init = clock.get_time_ns();
715 let instruments = self.instruments_cache.clone();
716 let funding_cache = Arc::clone(&self.funding_cache);
717 let nautilus_messages = self.parse_to_nautilus_messages(
718 event,
719 &instruments,
720 self.account_id,
721 self.product_type,
722 &funding_cache,
723 ts_init,
724 )
725 .await;
726
727 self.message_queue.extend(nautilus_messages);
729 }
730 }
731 }
732 }
733
734 fn parse_raw_message(msg: Message) -> Option<BybitWsMessage> {
735 use serde_json::Value;
736
737 match msg {
738 Message::Text(text) => {
739 if text == nautilus_network::RECONNECTED {
740 log::info!("Received WebSocket reconnected signal");
741 return Some(BybitWsMessage::Reconnected);
742 }
743
744 if text.trim().eq_ignore_ascii_case("pong") {
745 return None;
746 }
747
748 log::trace!("Raw websocket message: {text}");
749
750 let value: Value = match serde_json::from_str(&text) {
751 Ok(v) => v,
752 Err(e) => {
753 log::error!("Failed to parse WebSocket message: {e}: {text}");
754 return None;
755 }
756 };
757
758 Some(classify_bybit_message(value))
759 }
760 Message::Binary(msg) => {
761 log::debug!("Raw binary: {msg:?}");
762 None
763 }
764 Message::Close(_) => {
765 log::debug!("Received close message, waiting for reconnection");
766 None
767 }
768 _ => None,
769 }
770 }
771
772 #[allow(clippy::too_many_arguments)]
773 async fn parse_to_nautilus_messages(
774 &mut self,
775 msg: BybitWsMessage,
776 instruments: &AHashMap<Ustr, InstrumentAny>,
777 account_id: Option<AccountId>,
778 product_type: Option<BybitProductType>,
779 funding_cache: &FundingCache,
780 ts_init: UnixNanos,
781 ) -> Vec<NautilusWsMessage> {
782 let mut result = Vec::new();
783
784 match msg {
785 BybitWsMessage::Orderbook(msg) => {
786 let raw_symbol = msg.data.s;
787 let symbol =
788 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
789
790 if let Some(instrument) = instruments.get(&symbol) {
791 match parse_orderbook_deltas(&msg, instrument, ts_init) {
792 Ok(deltas) => result.push(NautilusWsMessage::Deltas(deltas)),
793 Err(e) => log::error!("Error parsing orderbook deltas: {e}"),
794 }
795
796 if let Some(depth_str) = msg.topic.as_str().split('.').nth(1)
798 && depth_str == "1"
799 {
800 let instrument_id = instrument.id();
801 let last_quote = self.quote_cache.get(&instrument_id);
802
803 match parse_orderbook_quote(&msg, instrument, last_quote, ts_init) {
804 Ok(quote) => {
805 self.quote_cache.insert(instrument_id, quote);
806 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
807 }
808 Err(e) => log::debug!("Skipping orderbook quote: {e}"),
809 }
810 }
811 } else {
812 log::debug!(
813 "No instrument found for symbol in Orderbook message: raw_symbol={raw_symbol}, full_symbol={symbol}"
814 );
815 }
816 }
817 BybitWsMessage::Trade(msg) => {
818 let mut data_vec = Vec::new();
819 for trade in &msg.data {
820 let raw_symbol = trade.s;
821 let symbol =
822 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
823
824 if let Some(instrument) = instruments.get(&symbol) {
825 match parse_ws_trade_tick(trade, instrument, ts_init) {
826 Ok(tick) => data_vec.push(Data::Trade(tick)),
827 Err(e) => log::error!("Error parsing trade tick: {e}"),
828 }
829 } else {
830 log::debug!(
831 "No instrument found for symbol in Trade message: raw_symbol={raw_symbol}, full_symbol={symbol}"
832 );
833 }
834 }
835
836 if !data_vec.is_empty() {
837 result.push(NautilusWsMessage::Data(data_vec));
838 }
839 }
840 BybitWsMessage::Kline(msg) => {
841 let bar_type = match self.bar_types_cache.get(msg.topic.as_str()) {
842 Some(bt) => *bt,
843 None => {
844 log::debug!("No bar type subscription found for topic: {}", msg.topic);
845 return result;
846 }
847 };
848
849 let symbol = Ustr::from(bar_type.instrument_id().symbol.as_str());
850 let instrument = match instruments.get(&symbol) {
851 Some(inst) => inst,
852 None => {
853 log::debug!(
854 "No instrument found for bar type: {}",
855 bar_type.instrument_id()
856 );
857 return result;
858 }
859 };
860
861 let mut data_vec = Vec::new();
862 for kline in &msg.data {
863 if !kline.confirm {
865 continue;
866 }
867 match parse_ws_kline_bar(
868 kline,
869 instrument,
870 bar_type,
871 self.bars_timestamp_on_close,
872 ts_init,
873 ) {
874 Ok(bar) => data_vec.push(Data::Bar(bar)),
875 Err(e) => log::error!("Error parsing kline to bar: {e}"),
876 }
877 }
878 if !data_vec.is_empty() {
879 result.push(NautilusWsMessage::Data(data_vec));
880 }
881 }
882 BybitWsMessage::TickerLinear(msg) => {
883 let raw_symbol = msg.data.symbol;
884 let symbol =
885 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
886
887 if let Some(instrument) = instruments.get(&symbol) {
888 let instrument_id = instrument.id();
889 let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
890 let price_precision = instrument.price_precision();
891 let size_precision = instrument.size_precision();
892
893 let bid_price = msg
895 .data
896 .bid1_price
897 .as_deref()
898 .map(|s| parse_price_with_precision(s, price_precision, "bid1Price"))
899 .transpose();
900 let ask_price = msg
901 .data
902 .ask1_price
903 .as_deref()
904 .map(|s| parse_price_with_precision(s, price_precision, "ask1Price"))
905 .transpose();
906 let bid_size = msg
907 .data
908 .bid1_size
909 .as_deref()
910 .map(|s| parse_quantity_with_precision(s, size_precision, "bid1Size"))
911 .transpose();
912 let ask_size = msg
913 .data
914 .ask1_size
915 .as_deref()
916 .map(|s| parse_quantity_with_precision(s, size_precision, "ask1Size"))
917 .transpose();
918
919 match (bid_price, ask_price, bid_size, ask_size) {
920 (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
921 match self.quote_cache.process(
922 instrument_id,
923 bp,
924 ap,
925 bs,
926 as_,
927 ts_event,
928 ts_init,
929 ) {
930 Ok(quote) => {
931 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
932 }
933 Err(e) => {
934 let raw_data = serde_json::to_string(&msg.data)
935 .unwrap_or_else(|_| "<failed to serialize>".to_string());
936 log::debug!(
937 "Skipping partial ticker update: {e}, raw_data: {raw_data}"
938 );
939 }
940 }
941 }
942 _ => {
943 let raw_data = serde_json::to_string(&msg.data)
944 .unwrap_or_else(|_| "<failed to serialize>".to_string());
945 log::warn!(
946 "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
947 );
948 }
949 }
950
951 if msg.data.funding_rate.is_some() && msg.data.next_funding_time.is_some() {
952 let should_publish = {
953 let cache = funding_cache.read().await;
954 match cache.get(&symbol) {
955 Some((cached_rate, cached_time)) => {
956 cached_rate.as_ref() != msg.data.funding_rate.as_ref()
957 || cached_time.as_ref()
958 != msg.data.next_funding_time.as_ref()
959 }
960 None => true,
961 }
962 };
963
964 if should_publish {
965 match parse_ticker_linear_funding(
966 &msg.data,
967 instrument_id,
968 ts_event,
969 ts_init,
970 ) {
971 Ok(funding) => {
972 let cache_key = (
973 msg.data.funding_rate.clone(),
974 msg.data.next_funding_time.clone(),
975 );
976 funding_cache.write().await.insert(symbol, cache_key);
977 result.push(NautilusWsMessage::FundingRates(vec![funding]));
978 }
979 Err(e) => {
980 log::debug!("Skipping funding rate update: {e}");
981 }
982 }
983 }
984 }
985
986 if msg.data.mark_price.is_some() {
987 match parse_ticker_linear_mark_price(
988 &msg.data, instrument, ts_event, ts_init,
989 ) {
990 Ok(mark_price) => {
991 result.push(NautilusWsMessage::MarkPrices(vec![mark_price]));
992 }
993 Err(e) => {
994 log::debug!("Skipping mark price update: {e}");
995 }
996 }
997 }
998
999 if msg.data.index_price.is_some() {
1000 match parse_ticker_linear_index_price(
1001 &msg.data, instrument, ts_event, ts_init,
1002 ) {
1003 Ok(index_price) => {
1004 result.push(NautilusWsMessage::IndexPrices(vec![index_price]));
1005 }
1006 Err(e) => {
1007 log::debug!("Skipping index price update: {e}");
1008 }
1009 }
1010 }
1011 } else {
1012 log::debug!(
1013 "No instrument found for symbol in TickerLinear message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1014 );
1015 }
1016 }
1017 BybitWsMessage::TickerOption(msg) => {
1018 let raw_symbol = &msg.data.symbol;
1019 let symbol = product_type.map_or_else(
1020 || raw_symbol.as_str().into(),
1021 |pt| make_bybit_symbol(raw_symbol, pt),
1022 );
1023
1024 if let Some(instrument) = instruments.get(&symbol) {
1025 let instrument_id = instrument.id();
1026 let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
1027 let price_precision = instrument.price_precision();
1028 let size_precision = instrument.size_precision();
1029
1030 let bid_price = parse_price_with_precision(
1032 &msg.data.bid_price,
1033 price_precision,
1034 "bidPrice",
1035 );
1036 let ask_price = parse_price_with_precision(
1037 &msg.data.ask_price,
1038 price_precision,
1039 "askPrice",
1040 );
1041 let bid_size = parse_quantity_with_precision(
1042 &msg.data.bid_size,
1043 size_precision,
1044 "bidSize",
1045 );
1046 let ask_size = parse_quantity_with_precision(
1047 &msg.data.ask_size,
1048 size_precision,
1049 "askSize",
1050 );
1051
1052 match (bid_price, ask_price, bid_size, ask_size) {
1053 (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1054 match self.quote_cache.process(
1055 instrument_id,
1056 Some(bp),
1057 Some(ap),
1058 Some(bs),
1059 Some(as_),
1060 ts_event,
1061 ts_init,
1062 ) {
1063 Ok(quote) => {
1064 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1065 }
1066 Err(e) => {
1067 let raw_data = serde_json::to_string(&msg.data)
1068 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1069 log::debug!(
1070 "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1071 );
1072 }
1073 }
1074 }
1075 _ => {
1076 let raw_data = serde_json::to_string(&msg.data)
1077 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1078 log::warn!(
1079 "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1080 );
1081 }
1082 }
1083
1084 match parse_ticker_option_mark_price(&msg, instrument, ts_init) {
1085 Ok(mark_price) => {
1086 result.push(NautilusWsMessage::MarkPrices(vec![mark_price]));
1087 }
1088 Err(e) => {
1089 log::debug!("Skipping option mark price update: {e}");
1090 }
1091 }
1092
1093 match parse_ticker_option_index_price(&msg, instrument, ts_init) {
1094 Ok(index_price) => {
1095 result.push(NautilusWsMessage::IndexPrices(vec![index_price]));
1096 }
1097 Err(e) => {
1098 log::debug!("Skipping option index price update: {e}");
1099 }
1100 }
1101 } else {
1102 log::debug!(
1103 "No instrument found for symbol in TickerOption message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1104 );
1105 }
1106 }
1107 BybitWsMessage::AccountOrder(msg) => {
1108 if let Some(account_id) = account_id {
1109 let mut reports = Vec::new();
1110 for order in &msg.data {
1111 let raw_symbol = order.symbol;
1112 let symbol = make_bybit_symbol(raw_symbol, order.category);
1113
1114 if let Some(instrument) = instruments.get(&symbol) {
1115 match parse_ws_order_status_report(
1116 order, instrument, account_id, ts_init,
1117 ) {
1118 Ok(report) => reports.push(report),
1119 Err(e) => log::error!("Error parsing order status report: {e}"),
1120 }
1121 } else {
1122 log::debug!(
1123 "No instrument found for symbol in AccountOrder message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1124 );
1125 }
1126 }
1127 if !reports.is_empty() {
1128 result.push(NautilusWsMessage::OrderStatusReports(reports));
1129 }
1130 }
1131 }
1132 BybitWsMessage::AccountExecution(msg) => {
1133 if let Some(account_id) = account_id {
1134 let mut reports = Vec::new();
1135 for execution in &msg.data {
1136 let raw_symbol = execution.symbol;
1137 let symbol = make_bybit_symbol(raw_symbol, execution.category);
1138
1139 if let Some(instrument) = instruments.get(&symbol) {
1140 match parse_ws_fill_report(execution, account_id, instrument, ts_init) {
1141 Ok(report) => reports.push(report),
1142 Err(e) => log::error!("Error parsing fill report: {e}"),
1143 }
1144 } else {
1145 log::debug!(
1146 "No instrument found for symbol in AccountExecution message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1147 );
1148 }
1149 }
1150 if !reports.is_empty() {
1151 result.push(NautilusWsMessage::FillReports(reports));
1152 }
1153 }
1154 }
1155 BybitWsMessage::AccountPosition(msg) => {
1156 if let Some(account_id) = account_id {
1157 for position in &msg.data {
1158 let raw_symbol = position.symbol;
1159 let symbol = make_bybit_symbol(raw_symbol, position.category);
1160
1161 if let Some(instrument) = instruments.get(&symbol) {
1162 match parse_ws_position_status_report(
1163 position, account_id, instrument, ts_init,
1164 ) {
1165 Ok(report) => {
1166 result.push(NautilusWsMessage::PositionStatusReport(report));
1167 }
1168 Err(e) => {
1169 log::error!("Error parsing position status report: {e}");
1170 }
1171 }
1172 } else {
1173 log::debug!(
1174 "No instrument found for symbol in AccountPosition message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1175 );
1176 }
1177 }
1178 }
1179 }
1180 BybitWsMessage::AccountWallet(msg) => {
1181 if let Some(account_id) = account_id {
1182 for wallet in &msg.data {
1183 let ts_event = UnixNanos::from(msg.creation_time as u64 * 1_000_000);
1184
1185 match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1186 Ok(state) => result.push(NautilusWsMessage::AccountState(state)),
1187 Err(e) => log::error!("Error parsing account state: {e}"),
1188 }
1189 }
1190 }
1191 }
1192 BybitWsMessage::OrderResponse(resp) => {
1193 if resp.ret_code == 0 {
1194 log::debug!(
1195 "Order operation successful: op={}, ret_msg={}",
1196 resp.op,
1197 resp.ret_msg
1198 );
1199
1200 if resp.op.contains("batch") {
1201 self.handle_batch_response(&resp, &mut result);
1202 } else if let Some(req_id) = &resp.req_id {
1203 if resp.op.contains("create") {
1204 self.pending_place_requests.remove(req_id);
1205 } else if resp.op.contains("cancel") {
1206 self.pending_cancel_requests.remove(req_id);
1207 } else if resp.op.contains("amend") {
1208 self.pending_amend_requests.remove(req_id);
1209 }
1210 } else if let Some(order_link_id) =
1211 resp.data.get("orderLinkId").and_then(|v| v.as_str())
1212 {
1213 let client_order_id = ClientOrderId::from(order_link_id);
1215 if resp.op.contains("create") {
1216 self.find_and_remove_place_request_by_client_order_id(&client_order_id);
1217 } else if resp.op.contains("cancel") {
1218 self.find_and_remove_cancel_request_by_client_order_id(
1219 &client_order_id,
1220 );
1221 } else if resp.op.contains("amend") {
1222 self.find_and_remove_amend_request_by_client_order_id(&client_order_id);
1223 }
1224 }
1225 } else if let Some(req_id) = &resp.req_id {
1226 let clock = get_atomic_clock_realtime();
1227 let ts_init = clock.get_time_ns();
1228
1229 if resp.op.contains("batch") {
1230 self.handle_batch_failure(
1231 req_id,
1232 &resp.ret_msg,
1233 &resp.op,
1234 ts_init,
1235 &mut result,
1236 );
1237 } else if resp.op.contains("create")
1238 && let Some((_, (client_order_id, trader_id, strategy_id, instrument_id))) =
1239 self.pending_place_requests.remove(req_id)
1240 {
1241 let Some(account_id) = self.account_id else {
1242 log::error!(
1243 "Cannot create OrderRejected event: account_id is None: request_id={req_id}, reason={}",
1244 resp.ret_msg
1245 );
1246 return result;
1247 };
1248
1249 let rejected = OrderRejected::new(
1250 trader_id,
1251 strategy_id,
1252 instrument_id,
1253 client_order_id,
1254 account_id,
1255 Ustr::from(&resp.ret_msg),
1256 UUID4::new(),
1257 ts_init,
1258 ts_init,
1259 false,
1260 false,
1261 );
1262 result.push(NautilusWsMessage::OrderRejected(rejected));
1263 } else if resp.op.contains("cancel")
1264 && let Some((
1265 _,
1266 (
1267 client_order_id,
1268 trader_id,
1269 strategy_id,
1270 instrument_id,
1271 venue_order_id,
1272 ),
1273 )) = self.pending_cancel_requests.remove(req_id)
1274 {
1275 let rejected = OrderCancelRejected::new(
1276 trader_id,
1277 strategy_id,
1278 instrument_id,
1279 client_order_id,
1280 Ustr::from(&resp.ret_msg),
1281 UUID4::new(),
1282 ts_init,
1283 ts_init,
1284 false,
1285 venue_order_id,
1286 self.account_id,
1287 );
1288 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1289 } else if resp.op.contains("amend")
1290 && let Some((
1291 _,
1292 (
1293 client_order_id,
1294 trader_id,
1295 strategy_id,
1296 instrument_id,
1297 venue_order_id,
1298 ),
1299 )) = self.pending_amend_requests.remove(req_id)
1300 {
1301 let rejected = OrderModifyRejected::new(
1302 trader_id,
1303 strategy_id,
1304 instrument_id,
1305 client_order_id,
1306 Ustr::from(&resp.ret_msg),
1307 UUID4::new(),
1308 ts_init,
1309 ts_init,
1310 false,
1311 venue_order_id,
1312 self.account_id,
1313 );
1314 result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1315 }
1316 } else if let Some(order_link_id) =
1317 resp.data.get("orderLinkId").and_then(|v| v.as_str())
1318 {
1319 let clock = get_atomic_clock_realtime();
1321 let ts_init = clock.get_time_ns();
1322 let client_order_id = ClientOrderId::from(order_link_id);
1323
1324 if resp.op.contains("create") {
1325 if let Some((_, (_, trader_id, strategy_id, instrument_id))) =
1326 self.find_and_remove_place_request_by_client_order_id(&client_order_id)
1327 {
1328 let Some(account_id) = self.account_id else {
1329 log::error!(
1330 "Cannot create OrderRejected event: account_id is None: client_order_id={client_order_id}, reason={}",
1331 resp.ret_msg
1332 );
1333 return result;
1334 };
1335
1336 let rejected = OrderRejected::new(
1337 trader_id,
1338 strategy_id,
1339 instrument_id,
1340 client_order_id,
1341 account_id,
1342 Ustr::from(&resp.ret_msg),
1343 UUID4::new(),
1344 ts_init,
1345 ts_init,
1346 false,
1347 false,
1348 );
1349 result.push(NautilusWsMessage::OrderRejected(rejected));
1350 }
1351 } else if resp.op.contains("cancel") {
1352 if let Some((
1353 _,
1354 (_, trader_id, strategy_id, instrument_id, venue_order_id),
1355 )) =
1356 self.find_and_remove_cancel_request_by_client_order_id(&client_order_id)
1357 {
1358 let rejected = OrderCancelRejected::new(
1359 trader_id,
1360 strategy_id,
1361 instrument_id,
1362 client_order_id,
1363 Ustr::from(&resp.ret_msg),
1364 UUID4::new(),
1365 ts_init,
1366 ts_init,
1367 false,
1368 venue_order_id,
1369 self.account_id,
1370 );
1371 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1372 }
1373 } else if resp.op.contains("amend")
1374 && let Some((_, (_, trader_id, strategy_id, instrument_id, venue_order_id))) =
1375 self.find_and_remove_amend_request_by_client_order_id(&client_order_id)
1376 {
1377 let rejected = OrderModifyRejected::new(
1378 trader_id,
1379 strategy_id,
1380 instrument_id,
1381 client_order_id,
1382 Ustr::from(&resp.ret_msg),
1383 UUID4::new(),
1384 ts_init,
1385 ts_init,
1386 false,
1387 venue_order_id,
1388 self.account_id,
1389 );
1390 result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1391 }
1392 } else {
1393 log::warn!(
1394 "Order operation failed but request_id could not be extracted from response: op={}, ret_code={}, ret_msg={}",
1395 resp.op,
1396 resp.ret_code,
1397 resp.ret_msg
1398 );
1399 }
1400 }
1401 BybitWsMessage::Auth(auth_response) => {
1402 let is_success =
1403 auth_response.success.unwrap_or(false) || (auth_response.ret_code == Some(0));
1404
1405 if is_success {
1406 self.auth_tracker.succeed();
1407 log::info!("WebSocket authenticated");
1408 result.push(NautilusWsMessage::Authenticated);
1409 } else {
1410 let error_msg = auth_response
1411 .ret_msg
1412 .as_deref()
1413 .unwrap_or("Authentication rejected");
1414 self.auth_tracker.fail(error_msg);
1415 log::error!("WebSocket authentication failed: error={error_msg}");
1416 result.push(NautilusWsMessage::Error(BybitWebSocketError::from_message(
1417 error_msg.to_string(),
1418 )));
1419 }
1420 }
1421 BybitWsMessage::Error(err) => {
1422 result.push(NautilusWsMessage::Error(err));
1423 }
1424 BybitWsMessage::Reconnected => {
1425 self.quote_cache.clear();
1426 result.push(NautilusWsMessage::Reconnected);
1427 }
1428 BybitWsMessage::Subscription(sub_msg) => {
1429 let pending_topics = self.subscriptions.pending_subscribe_topics();
1430 match sub_msg.op {
1431 BybitWsOperation::Subscribe => {
1432 if sub_msg.success {
1433 for topic in pending_topics {
1434 self.subscriptions.confirm_subscribe(&topic);
1435 log::debug!("Subscription confirmed: topic={topic}");
1436 }
1437 } else {
1438 for topic in pending_topics {
1439 self.subscriptions.mark_failure(&topic);
1440 log::warn!(
1441 "Subscription failed, will retry on reconnect: topic={topic}, error={:?}",
1442 sub_msg.ret_msg
1443 );
1444 }
1445 }
1446 }
1447 BybitWsOperation::Unsubscribe => {
1448 let pending_unsub = self.subscriptions.pending_unsubscribe_topics();
1449 if sub_msg.success {
1450 for topic in pending_unsub {
1451 self.subscriptions.confirm_unsubscribe(&topic);
1452 log::debug!("Unsubscription confirmed: topic={topic}");
1453 }
1454 } else {
1455 for topic in pending_unsub {
1456 log::warn!(
1457 "Unsubscription failed: topic={topic}, error={:?}",
1458 sub_msg.ret_msg
1459 );
1460 }
1461 }
1462 }
1463 _ => {}
1464 }
1465 }
1466 _ => {}
1467 }
1468
1469 result
1470 }
1471}
1472
1473pub fn classify_bybit_message(value: serde_json::Value) -> BybitWsMessage {
1477 if let Some(op_val) = value.get("op") {
1478 if let Ok(op) = serde_json::from_value::<BybitWsOperation>(op_val.clone())
1479 && op == BybitWsOperation::Auth
1480 && let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
1481 {
1482 let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1483 if is_success {
1484 return BybitWsMessage::Auth(auth);
1485 }
1486 let resp = BybitWsResponse {
1487 op: Some(auth.op.clone()),
1488 topic: None,
1489 success: auth.success,
1490 conn_id: auth.conn_id.clone(),
1491 req_id: None,
1492 ret_code: auth.ret_code,
1493 ret_msg: auth.ret_msg,
1494 };
1495 let error = BybitWebSocketError::from_response(&resp);
1496 return BybitWsMessage::Error(error);
1497 }
1498
1499 if let Some(op_str) = op_val.as_str()
1500 && op_str.starts_with("order.")
1501 {
1502 return serde_json::from_value::<BybitWsOrderResponse>(value.clone()).map_or_else(
1503 |_| BybitWsMessage::Raw(value),
1504 BybitWsMessage::OrderResponse,
1505 );
1506 }
1507 }
1508
1509 if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
1510 if success {
1511 return serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone())
1512 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Subscription);
1513 }
1514 return serde_json::from_value::<BybitWsResponse>(value.clone()).map_or_else(
1515 |_| BybitWsMessage::Raw(value),
1516 |resp| {
1517 let error = BybitWebSocketError::from_response(&resp);
1518 BybitWsMessage::Error(error)
1519 },
1520 );
1521 }
1522
1523 if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
1525 if topic.starts_with(BYBIT_TOPIC_ORDERBOOK) {
1526 return serde_json::from_value(value.clone())
1527 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Orderbook);
1528 }
1529
1530 if topic.contains(BYBIT_TOPIC_PUBLIC_TRADE) || topic.starts_with(BYBIT_TOPIC_TRADE) {
1531 return serde_json::from_value(value.clone())
1532 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Trade);
1533 }
1534
1535 if topic.starts_with(BYBIT_TOPIC_KLINE) {
1536 return serde_json::from_value(value.clone())
1537 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Kline);
1538 }
1539
1540 if topic.starts_with(BYBIT_TOPIC_TICKERS) {
1541 let is_option = value
1543 .get("data")
1544 .and_then(|d| d.get("symbol"))
1545 .and_then(|s| s.as_str())
1546 .is_some_and(|symbol| symbol.contains('-') && symbol.matches('-').count() >= 3);
1547
1548 if is_option {
1549 return serde_json::from_value(value.clone())
1550 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerOption);
1551 }
1552 return serde_json::from_value(value.clone())
1553 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerLinear);
1554 }
1555
1556 if topic.starts_with(BYBIT_TOPIC_ORDER) {
1557 return serde_json::from_value(value.clone())
1558 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::AccountOrder);
1559 }
1560
1561 if topic.starts_with(BYBIT_TOPIC_EXECUTION) {
1562 return serde_json::from_value(value.clone()).map_or_else(
1563 |_| BybitWsMessage::Raw(value),
1564 BybitWsMessage::AccountExecution,
1565 );
1566 }
1567
1568 if topic.starts_with(BYBIT_TOPIC_WALLET) {
1569 return serde_json::from_value(value.clone()).map_or_else(
1570 |_| BybitWsMessage::Raw(value),
1571 BybitWsMessage::AccountWallet,
1572 );
1573 }
1574
1575 if topic.starts_with(BYBIT_TOPIC_POSITION) {
1576 return serde_json::from_value(value.clone()).map_or_else(
1577 |_| BybitWsMessage::Raw(value),
1578 BybitWsMessage::AccountPosition,
1579 );
1580 }
1581 }
1582
1583 BybitWsMessage::Raw(value)
1584}
1585
1586#[cfg(test)]
1587mod tests {
1588 use rstest::rstest;
1589
1590 use super::*;
1591 use crate::common::consts::BYBIT_WS_TOPIC_DELIMITER;
1592
1593 fn create_test_handler() -> FeedHandler {
1594 let signal = Arc::new(AtomicBool::new(false));
1595 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
1596 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
1597 let (out_tx, _out_rx) = tokio::sync::mpsc::unbounded_channel();
1598 let auth_tracker = AuthTracker::new();
1599 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
1600 let funding_cache = Arc::new(tokio::sync::RwLock::new(AHashMap::new()));
1601 let bar_types_cache = Arc::new(DashMap::new());
1602
1603 FeedHandler::new(
1604 signal,
1605 cmd_rx,
1606 raw_rx,
1607 out_tx,
1608 None,
1609 None,
1610 true,
1611 Arc::new(AtomicU8::new(0)),
1612 auth_tracker,
1613 subscriptions,
1614 funding_cache,
1615 bar_types_cache,
1616 )
1617 }
1618
1619 #[rstest]
1620 fn test_generate_unique_request_id_returns_different_ids() {
1621 let handler = create_test_handler();
1622
1623 let id1 = handler.generate_unique_request_id();
1624 let id2 = handler.generate_unique_request_id();
1625 let id3 = handler.generate_unique_request_id();
1626
1627 assert_ne!(id1, id2);
1628 assert_ne!(id2, id3);
1629 assert_ne!(id1, id3);
1630 }
1631
1632 #[rstest]
1633 fn test_generate_unique_request_id_produces_valid_uuids() {
1634 let handler = create_test_handler();
1635
1636 let id1 = handler.generate_unique_request_id();
1637 let id2 = handler.generate_unique_request_id();
1638
1639 assert!(UUID4::from(id1.as_str()).to_string() == id1);
1640 assert!(UUID4::from(id2.as_str()).to_string() == id2);
1641 }
1642
1643 #[rstest]
1644 fn test_multiple_place_orders_use_different_request_ids() {
1645 let handler = create_test_handler();
1646
1647 let req_id_1 = handler.generate_unique_request_id();
1648 let req_id_2 = handler.generate_unique_request_id();
1649 let req_id_3 = handler.generate_unique_request_id();
1650
1651 assert_ne!(req_id_1, req_id_2);
1652 assert_ne!(req_id_2, req_id_3);
1653 assert_ne!(req_id_1, req_id_3);
1654 }
1655
1656 #[rstest]
1657 fn test_multiple_amends_use_different_request_ids() {
1658 let handler = create_test_handler();
1659
1660 let req_id_1 = handler.generate_unique_request_id();
1662 let req_id_2 = handler.generate_unique_request_id();
1663 let req_id_3 = handler.generate_unique_request_id();
1664
1665 assert_ne!(
1666 req_id_1, req_id_2,
1667 "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
1668 );
1669 assert_ne!(
1670 req_id_2, req_id_3,
1671 "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
1672 );
1673 }
1674
1675 #[rstest]
1676 fn test_multiple_cancels_use_different_request_ids() {
1677 let handler = create_test_handler();
1678
1679 let req_id_1 = handler.generate_unique_request_id();
1680 let req_id_2 = handler.generate_unique_request_id();
1681
1682 assert_ne!(
1683 req_id_1, req_id_2,
1684 "Multiple cancels should generate different request IDs"
1685 );
1686 }
1687
1688 #[rstest]
1689 fn test_concurrent_request_id_generation() {
1690 let handler = create_test_handler();
1691
1692 let mut ids = std::collections::HashSet::new();
1693 for _ in 0..100 {
1694 let id = handler.generate_unique_request_id();
1695 assert!(
1696 ids.insert(id.clone()),
1697 "Generated duplicate request ID: {id}"
1698 );
1699 }
1700 assert_eq!(ids.len(), 100);
1701 }
1702}