1use std::{
19 collections::VecDeque,
20 num::NonZero,
21 sync::{
22 Arc,
23 atomic::{AtomicBool, AtomicU8, Ordering},
24 },
25};
26
27use ahash::AHashMap;
28use dashmap::DashMap;
29use nautilus_common::cache::quote::QuoteCache;
30use nautilus_core::{UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
31use nautilus_model::{
32 data::{BarSpecification, BarType, Data},
33 enums::{AggregationSource, BarAggregation, PriceType},
34 events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
35 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
36 instruments::{Instrument, InstrumentAny},
37};
38use nautilus_network::{
39 retry::{RetryManager, create_websocket_retry_manager},
40 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
41};
42use tokio_tungstenite::tungstenite::Message;
43use ustr::Ustr;
44
45use super::{
46 enums::BybitWsOperation,
47 error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
48 messages::{
49 BybitWebSocketError, BybitWsAuthResponse, BybitWsHeader, BybitWsMessage, BybitWsRequest,
50 BybitWsResponse, BybitWsSubscriptionMsg, NautilusWsMessage,
51 },
52 parse::{
53 parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
54 parse_ticker_linear_funding, parse_ws_account_state, parse_ws_fill_report,
55 parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
56 parse_ws_trade_tick,
57 },
58};
59use crate::{
60 common::{
61 consts::{
62 BYBIT_NAUTILUS_BROKER_ID, BYBIT_TOPIC_EXECUTION, BYBIT_TOPIC_KLINE, BYBIT_TOPIC_ORDER,
63 BYBIT_TOPIC_ORDERBOOK, BYBIT_TOPIC_POSITION, BYBIT_TOPIC_PUBLIC_TRADE,
64 BYBIT_TOPIC_TICKERS, BYBIT_TOPIC_TRADE, BYBIT_TOPIC_WALLET,
65 },
66 enums::{BybitProductType, BybitTimeInForce, BybitWsOrderRequestOp},
67 parse::{make_bybit_symbol, parse_price_with_precision, parse_quantity_with_precision},
68 },
69 websocket::messages::{
70 BybitBatchOrderError, BybitWsAmendOrderParams, BybitWsCancelOrderParams,
71 BybitWsOrderResponse, BybitWsPlaceOrderParams,
72 },
73};
74
75#[derive(Debug)]
77#[allow(
78 clippy::large_enum_variant,
79 reason = "Commands are ephemeral and immediately consumed"
80)]
81pub enum HandlerCommand {
82 SetClient(WebSocketClient),
83 Disconnect,
84 Authenticate {
85 payload: String,
86 },
87 Subscribe {
88 topics: Vec<String>,
89 },
90 Unsubscribe {
91 topics: Vec<String>,
92 },
93 SendText {
94 payload: String,
95 },
96 PlaceOrder {
97 params: BybitWsPlaceOrderParams,
98 client_order_id: ClientOrderId,
99 trader_id: TraderId,
100 strategy_id: StrategyId,
101 instrument_id: InstrumentId,
102 },
103 AmendOrder {
104 params: BybitWsAmendOrderParams,
105 client_order_id: ClientOrderId,
106 trader_id: TraderId,
107 strategy_id: StrategyId,
108 instrument_id: InstrumentId,
109 venue_order_id: Option<VenueOrderId>,
110 },
111 CancelOrder {
112 params: BybitWsCancelOrderParams,
113 client_order_id: ClientOrderId,
114 trader_id: TraderId,
115 strategy_id: StrategyId,
116 instrument_id: InstrumentId,
117 venue_order_id: Option<VenueOrderId>,
118 },
119 RegisterBatchPlace {
120 req_id: String,
121 orders: Vec<BatchOrderData>,
122 },
123 RegisterBatchCancel {
124 req_id: String,
125 cancels: Vec<BatchCancelData>,
126 },
127 InitializeInstruments(Vec<InstrumentAny>),
128 UpdateInstrument(InstrumentAny),
129}
130
131type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
133
134type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);
136
137type CancelRequestData = (
139 ClientOrderId,
140 TraderId,
141 StrategyId,
142 InstrumentId,
143 Option<VenueOrderId>,
144);
145
146type AmendRequestData = (
148 ClientOrderId,
149 TraderId,
150 StrategyId,
151 InstrumentId,
152 Option<VenueOrderId>,
153);
154
155type BatchOrderData = (ClientOrderId, PlaceRequestData);
157
158type BatchCancelData = (ClientOrderId, CancelRequestData);
160
161pub(super) struct FeedHandler {
162 signal: Arc<AtomicBool>,
163 client: Option<WebSocketClient>,
164 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
165 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
166 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
167 auth_tracker: AuthTracker,
168 subscriptions: SubscriptionState,
169 instruments_cache: AHashMap<Ustr, InstrumentAny>,
170 account_id: Option<AccountId>,
171 mm_level: Arc<AtomicU8>,
172 product_type: Option<BybitProductType>,
173 bars_timestamp_on_close: bool,
174 quote_cache: QuoteCache,
175 funding_cache: FundingCache,
176 retry_manager: RetryManager<BybitWsError>,
177 pending_place_requests: DashMap<String, PlaceRequestData>,
178 pending_cancel_requests: DashMap<String, CancelRequestData>,
179 pending_amend_requests: DashMap<String, AmendRequestData>,
180 pending_batch_place_requests: DashMap<String, Vec<BatchOrderData>>,
181 pending_batch_cancel_requests: DashMap<String, Vec<BatchCancelData>>,
182 message_queue: VecDeque<NautilusWsMessage>,
183}
184
185impl FeedHandler {
186 #[allow(clippy::too_many_arguments)]
188 pub(super) fn new(
189 signal: Arc<AtomicBool>,
190 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
191 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
192 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
193 account_id: Option<AccountId>,
194 product_type: Option<BybitProductType>,
195 bars_timestamp_on_close: bool,
196 mm_level: Arc<AtomicU8>,
197 auth_tracker: AuthTracker,
198 subscriptions: SubscriptionState,
199 funding_cache: FundingCache,
200 ) -> Self {
201 Self {
202 signal,
203 client: None,
204 cmd_rx,
205 raw_rx,
206 out_tx,
207 auth_tracker,
208 subscriptions,
209 instruments_cache: AHashMap::new(),
210 account_id,
211 mm_level,
212 product_type,
213 bars_timestamp_on_close,
214 quote_cache: QuoteCache::new(),
215 funding_cache,
216 retry_manager: create_websocket_retry_manager(),
217 pending_place_requests: DashMap::new(),
218 pending_cancel_requests: DashMap::new(),
219 pending_amend_requests: DashMap::new(),
220 pending_batch_place_requests: DashMap::new(),
221 pending_batch_cancel_requests: DashMap::new(),
222 message_queue: VecDeque::new(),
223 }
224 }
225
226 pub(super) fn is_stopped(&self) -> bool {
227 self.signal.load(Ordering::Relaxed)
228 }
229
230 #[allow(dead_code)]
231 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
232 self.out_tx.send(msg).map_err(|_| ())
233 }
234
235 fn generate_unique_request_id(&self) -> String {
236 UUID4::new().to_string()
237 }
238
239 fn find_and_remove_place_request_by_client_order_id(
240 &self,
241 client_order_id: &ClientOrderId,
242 ) -> Option<(String, PlaceRequestData)> {
243 self.pending_place_requests
244 .iter()
245 .find(|entry| entry.value().0 == *client_order_id)
246 .and_then(|entry| {
247 let key = entry.key().clone();
248 drop(entry);
249 self.pending_place_requests.remove(&key)
250 })
251 }
252
253 fn find_and_remove_cancel_request_by_client_order_id(
254 &self,
255 client_order_id: &ClientOrderId,
256 ) -> Option<(String, CancelRequestData)> {
257 self.pending_cancel_requests
258 .iter()
259 .find(|entry| entry.value().0 == *client_order_id)
260 .and_then(|entry| {
261 let key = entry.key().clone();
262 drop(entry);
263 self.pending_cancel_requests.remove(&key)
264 })
265 }
266
267 fn find_and_remove_amend_request_by_client_order_id(
268 &self,
269 client_order_id: &ClientOrderId,
270 ) -> Option<(String, AmendRequestData)> {
271 self.pending_amend_requests
272 .iter()
273 .find(|entry| entry.value().0 == *client_order_id)
274 .and_then(|entry| {
275 let key = entry.key().clone();
276 drop(entry);
277 self.pending_amend_requests.remove(&key)
278 })
279 }
280
281 fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
282 let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
283 let mm_level = self.mm_level.load(Ordering::Relaxed);
284 !(is_post_only && mm_level > 0)
285 }
286
287 async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
289 if let Some(client) = &self.client {
290 self.retry_manager
291 .execute_with_retry(
292 "websocket_send",
293 || {
294 let payload = payload.clone();
295 async move {
296 client
297 .send_text(payload, None)
298 .await
299 .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
300 }
301 },
302 should_retry_bybit_error,
303 create_bybit_timeout_error,
304 )
305 .await
306 } else {
307 Err(BybitWsError::ClientError(
308 "No active WebSocket client".to_string(),
309 ))
310 }
311 }
312
313 fn handle_batch_failure(
318 &self,
319 req_id: &str,
320 ret_msg: &str,
321 op: &str,
322 ts_init: UnixNanos,
323 result: &mut Vec<NautilusWsMessage>,
324 ) {
325 if op.contains("create") {
326 if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
327 tracing::warn!(
328 req_id = %req_id,
329 ret_msg = %ret_msg,
330 num_orders = batch_data.len(),
331 "Batch place request failed"
332 );
333
334 let Some(account_id) = self.account_id else {
335 tracing::error!("Cannot create OrderRejected events: account_id is None");
336 return;
337 };
338
339 let reason = Ustr::from(ret_msg);
340 for (client_order_id, (_, trader_id, strategy_id, instrument_id)) in batch_data {
341 let rejected = OrderRejected::new(
342 trader_id,
343 strategy_id,
344 instrument_id,
345 client_order_id,
346 account_id,
347 reason,
348 UUID4::new(),
349 ts_init,
350 ts_init,
351 false,
352 false,
353 );
354 result.push(NautilusWsMessage::OrderRejected(rejected));
355 }
356 }
357 } else if op.contains("cancel")
358 && let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id)
359 {
360 tracing::warn!(
361 req_id = %req_id,
362 ret_msg = %ret_msg,
363 num_cancels = batch_data.len(),
364 "Batch cancel request failed"
365 );
366
367 let reason = Ustr::from(ret_msg);
368 for (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id)) in
369 batch_data
370 {
371 let rejected = OrderCancelRejected::new(
372 trader_id,
373 strategy_id,
374 instrument_id,
375 client_order_id,
376 reason,
377 UUID4::new(),
378 ts_init,
379 ts_init,
380 false,
381 venue_order_id,
382 self.account_id,
383 );
384 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
385 }
386 }
387 }
388
389 fn handle_batch_response(
391 &self,
392 resp: &BybitWsOrderResponse,
393 result: &mut Vec<NautilusWsMessage>,
394 ) {
395 let Some(req_id) = &resp.req_id else {
396 tracing::warn!(
397 op = %resp.op,
398 "Batch response missing req_id - cannot correlate with pending requests"
399 );
400 return;
401 };
402
403 let batch_errors = resp.extract_batch_errors();
404
405 if resp.op.contains("create") {
406 if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
407 self.process_batch_place_errors(batch_data, batch_errors, result);
408 } else {
409 tracing::debug!(
410 req_id = %req_id,
411 "Batch place response received but no pending request found"
412 );
413 }
414 } else if resp.op.contains("cancel") {
415 if let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id) {
416 self.process_batch_cancel_errors(batch_data, batch_errors, result);
417 } else {
418 tracing::debug!(
419 req_id = %req_id,
420 "Batch cancel response received but no pending request found"
421 );
422 }
423 }
424 }
425
426 fn process_batch_place_errors(
428 &self,
429 batch_data: Vec<BatchOrderData>,
430 errors: Vec<BybitBatchOrderError>,
431 result: &mut Vec<NautilusWsMessage>,
432 ) {
433 let Some(account_id) = self.account_id else {
434 tracing::error!("Cannot create OrderRejected events: account_id is None");
435 return;
436 };
437
438 let clock = get_atomic_clock_realtime();
439 let ts_init = clock.get_time_ns();
440
441 for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id))) in
442 batch_data.into_iter().enumerate()
443 {
444 if let Some(error) = errors.get(idx)
445 && error.code != 0
446 {
447 tracing::warn!(
448 client_order_id = %client_order_id,
449 error_code = error.code,
450 error_msg = %error.msg,
451 "Batch order rejected"
452 );
453
454 let rejected = OrderRejected::new(
455 trader_id,
456 strategy_id,
457 instrument_id,
458 client_order_id,
459 account_id,
460 Ustr::from(&error.msg),
461 UUID4::new(),
462 ts_init,
463 ts_init,
464 false,
465 false,
466 );
467 result.push(NautilusWsMessage::OrderRejected(rejected));
468 }
469 }
470 }
471
472 fn process_batch_cancel_errors(
474 &self,
475 batch_data: Vec<BatchCancelData>,
476 errors: Vec<BybitBatchOrderError>,
477 result: &mut Vec<NautilusWsMessage>,
478 ) {
479 let clock = get_atomic_clock_realtime();
480 let ts_init = clock.get_time_ns();
481
482 for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id))) in
483 batch_data.into_iter().enumerate()
484 {
485 if let Some(error) = errors.get(idx)
486 && error.code != 0
487 {
488 tracing::warn!(
489 client_order_id = %client_order_id,
490 error_code = error.code,
491 error_msg = %error.msg,
492 "Batch cancel rejected"
493 );
494
495 let rejected = OrderCancelRejected::new(
496 trader_id,
497 strategy_id,
498 instrument_id,
499 client_order_id,
500 Ustr::from(&error.msg),
501 UUID4::new(),
502 ts_init,
503 ts_init,
504 false,
505 venue_order_id,
506 self.account_id,
507 );
508 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
509 }
510 }
511 }
512
513 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
514 let clock = get_atomic_clock_realtime();
515
516 loop {
517 if let Some(msg) = self.message_queue.pop_front() {
518 return Some(msg);
519 }
520
521 tokio::select! {
522 Some(cmd) = self.cmd_rx.recv() => {
523 match cmd {
524 HandlerCommand::SetClient(client) => {
525 tracing::debug!("WebSocketClient received by handler");
526 self.client = Some(client);
527 }
528 HandlerCommand::Disconnect => {
529 tracing::debug!("Disconnect command received");
530
531 if let Some(client) = self.client.take() {
532 client.disconnect().await;
533 }
534 }
535 HandlerCommand::Authenticate { payload } => {
536 tracing::debug!("Authenticate command received");
537 if let Err(e) = self.send_with_retry(payload).await {
538 tracing::error!("Failed to send authentication after retries: {e}");
539 }
540 }
541 HandlerCommand::Subscribe { topics } => {
542 for topic in topics {
543 tracing::debug!(topic = %topic, "Subscribing to topic");
544 if let Err(e) = self.send_with_retry(topic.clone()).await {
545 tracing::error!(topic = %topic, error = %e, "Failed to send subscription after retries");
546 }
547 }
548 }
549 HandlerCommand::Unsubscribe { topics } => {
550 for topic in topics {
551 tracing::debug!(topic = %topic, "Unsubscribing from topic");
552 if let Err(e) = self.send_with_retry(topic.clone()).await {
553 tracing::error!(topic = %topic, error = %e, "Failed to send unsubscription after retries");
554 }
555 }
556 }
557 HandlerCommand::SendText { payload } => {
558 if let Err(e) = self.send_with_retry(payload).await {
559 tracing::error!("Error sending text with retry: {e}");
560 }
561 }
562 HandlerCommand::InitializeInstruments(instruments) => {
563 for inst in instruments {
564 self.instruments_cache.insert(inst.symbol().inner(), inst);
565 }
566 }
567 HandlerCommand::UpdateInstrument(inst) => {
568 self.instruments_cache.insert(inst.symbol().inner(), inst);
569 }
570 HandlerCommand::RegisterBatchPlace { req_id, orders } => {
571 tracing::debug!(
572 req_id = %req_id,
573 num_orders = orders.len(),
574 "Registering batch place request"
575 );
576 self.pending_batch_place_requests.insert(req_id, orders);
577 }
578 HandlerCommand::RegisterBatchCancel { req_id, cancels } => {
579 tracing::debug!(
580 req_id = %req_id,
581 num_cancels = cancels.len(),
582 "Registering batch cancel request"
583 );
584 self.pending_batch_cancel_requests.insert(req_id, cancels);
585 }
586 HandlerCommand::PlaceOrder {
587 params,
588 client_order_id,
589 trader_id,
590 strategy_id,
591 instrument_id,
592 } => {
593 let request_id = self.generate_unique_request_id();
594
595 self.pending_place_requests.insert(
596 request_id.clone(),
597 (client_order_id, trader_id, strategy_id, instrument_id),
598 );
599
600 let referer = if self.include_referer_header(params.time_in_force) {
601 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
602 } else {
603 None
604 };
605
606 let request = BybitWsRequest {
607 req_id: Some(request_id.clone()),
608 op: BybitWsOrderRequestOp::Create,
609 header: BybitWsHeader::with_referer(referer),
610 args: vec![params],
611 };
612
613 if let Ok(payload) = serde_json::to_string(&request)
614 && let Err(e) = self.send_with_retry(payload).await
615 {
616 tracing::error!("Failed to send place order after retries: {e}");
617 self.pending_place_requests.remove(&request_id);
618 }
619 }
620 HandlerCommand::AmendOrder {
621 params,
622 client_order_id,
623 trader_id,
624 strategy_id,
625 instrument_id,
626 venue_order_id,
627 } => {
628 let request_id = self.generate_unique_request_id();
629
630 self.pending_amend_requests.insert(
631 request_id.clone(),
632 (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
633 );
634
635 let request = BybitWsRequest {
636 req_id: Some(request_id.clone()),
637 op: BybitWsOrderRequestOp::Amend,
638 header: BybitWsHeader::now(),
639 args: vec![params],
640 };
641
642 if let Ok(payload) = serde_json::to_string(&request)
643 && let Err(e) = self.send_with_retry(payload).await
644 {
645 tracing::error!("Failed to send amend order after retries: {e}");
646 self.pending_amend_requests.remove(&request_id);
647 }
648 }
649 HandlerCommand::CancelOrder {
650 params,
651 client_order_id,
652 trader_id,
653 strategy_id,
654 instrument_id,
655 venue_order_id,
656 } => {
657 let request_id = self.generate_unique_request_id();
658
659 self.pending_cancel_requests.insert(
660 request_id.clone(),
661 (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
662 );
663
664 let request = BybitWsRequest {
665 req_id: Some(request_id.clone()),
666 op: BybitWsOrderRequestOp::Cancel,
667 header: BybitWsHeader::now(),
668 args: vec![params],
669 };
670
671 if let Ok(payload) = serde_json::to_string(&request)
672 && let Err(e) = self.send_with_retry(payload).await
673 {
674 tracing::error!("Failed to send cancel order after retries: {e}");
675 self.pending_cancel_requests.remove(&request_id);
676 }
677 }
678 }
679
680 continue;
681 }
682
683 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
684 if self.signal.load(Ordering::Relaxed) {
685 tracing::debug!("Stop signal received during idle period");
686 return None;
687 }
688 continue;
689 }
690
691 msg = self.raw_rx.recv() => {
692 let msg = match msg {
693 Some(msg) => msg,
694 None => {
695 tracing::debug!("WebSocket stream closed");
696 return None;
697 }
698 };
699
700 if let Message::Ping(data) = &msg {
701 tracing::trace!("Received ping frame with {} bytes", data.len());
702
703 if let Some(client) = &self.client
704 && let Err(e) = client.send_pong(data.to_vec()).await
705 {
706 tracing::warn!(error = %e, "Failed to send pong frame");
707 }
708 continue;
709 }
710
711 let event = match Self::parse_raw_message(msg) {
712 Some(event) => event,
713 None => continue,
714 };
715
716 if self.signal.load(Ordering::Relaxed) {
717 tracing::debug!("Stop signal received");
718 return None;
719 }
720
721 let ts_init = clock.get_time_ns();
722 let instruments = self.instruments_cache.clone();
723 let funding_cache = Arc::clone(&self.funding_cache);
724 let nautilus_messages = self.parse_to_nautilus_messages(
725 event,
726 &instruments,
727 self.account_id,
728 self.product_type,
729 &funding_cache,
730 ts_init,
731 )
732 .await;
733
734 self.message_queue.extend(nautilus_messages);
736 }
737 }
738 }
739 }
740
741 fn parse_raw_message(msg: Message) -> Option<BybitWsMessage> {
742 use serde_json::Value;
743
744 match msg {
745 Message::Text(text) => {
746 if text == nautilus_network::RECONNECTED {
747 tracing::info!("Received WebSocket reconnected signal");
748 return Some(BybitWsMessage::Reconnected);
749 }
750
751 if text.trim().eq_ignore_ascii_case("pong") {
752 return None;
753 }
754
755 tracing::trace!("Raw websocket message: {text}");
756
757 let value: Value = match serde_json::from_str(&text) {
758 Ok(v) => v,
759 Err(e) => {
760 tracing::error!("Failed to parse WebSocket message: {e}: {text}");
761 return None;
762 }
763 };
764
765 Some(classify_bybit_message(value))
766 }
767 Message::Binary(msg) => {
768 tracing::debug!("Raw binary: {msg:?}");
769 None
770 }
771 Message::Close(_) => {
772 tracing::debug!("Received close message, waiting for reconnection");
773 None
774 }
775 _ => None,
776 }
777 }
778
779 #[allow(clippy::too_many_arguments)]
780 async fn parse_to_nautilus_messages(
781 &mut self,
782 msg: BybitWsMessage,
783 instruments: &AHashMap<Ustr, InstrumentAny>,
784 account_id: Option<AccountId>,
785 product_type: Option<BybitProductType>,
786 funding_cache: &FundingCache,
787 ts_init: UnixNanos,
788 ) -> Vec<NautilusWsMessage> {
789 let mut result = Vec::new();
790
791 match msg {
792 BybitWsMessage::Orderbook(msg) => {
793 let raw_symbol = msg.data.s;
794 let symbol =
795 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
796
797 if let Some(instrument) = instruments.get(&symbol) {
798 match parse_orderbook_deltas(&msg, instrument, ts_init) {
799 Ok(deltas) => result.push(NautilusWsMessage::Deltas(deltas)),
800 Err(e) => tracing::error!("Error parsing orderbook deltas: {e}"),
801 }
802
803 if let Some(depth_str) = msg.topic.as_str().split('.').nth(1)
805 && depth_str == "1"
806 {
807 let instrument_id = instrument.id();
808 let last_quote = self.quote_cache.get(&instrument_id);
809
810 match parse_orderbook_quote(&msg, instrument, last_quote, ts_init) {
811 Ok(quote) => {
812 self.quote_cache.insert(instrument_id, quote);
813 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
814 }
815 Err(e) => tracing::debug!("Skipping orderbook quote: {e}"),
816 }
817 }
818 } else {
819 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Orderbook message");
820 }
821 }
822 BybitWsMessage::Trade(msg) => {
823 let mut data_vec = Vec::new();
824 for trade in &msg.data {
825 let raw_symbol = trade.s;
826 let symbol =
827 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
828
829 if let Some(instrument) = instruments.get(&symbol) {
830 match parse_ws_trade_tick(trade, instrument, ts_init) {
831 Ok(tick) => data_vec.push(Data::Trade(tick)),
832 Err(e) => tracing::error!("Error parsing trade tick: {e}"),
833 }
834 } else {
835 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Trade message");
836 }
837 }
838
839 if !data_vec.is_empty() {
840 result.push(NautilusWsMessage::Data(data_vec));
841 }
842 }
843 BybitWsMessage::Kline(msg) => {
844 let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
845 Ok(parts) => parts,
846 Err(e) => {
847 tracing::warn!("Failed to parse kline topic: {e}");
848 return result;
849 }
850 };
851
852 let symbol = product_type
853 .map_or_else(|| raw_symbol.into(), |pt| make_bybit_symbol(raw_symbol, pt));
854
855 if let Some(instrument) = instruments.get(&symbol) {
856 let (step, aggregation) = match interval_str.parse::<usize>() {
857 Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
858 _ => {
859 tracing::warn!("Unsupported kline interval: {}", interval_str);
860 return result;
861 }
862 };
863
864 if let Some(non_zero_step) = NonZero::new(step) {
865 let bar_spec = BarSpecification {
866 step: non_zero_step,
867 aggregation,
868 price_type: PriceType::Last,
869 };
870 let bar_type =
871 BarType::new(instrument.id(), bar_spec, AggregationSource::External);
872
873 let mut data_vec = Vec::new();
874 for kline in &msg.data {
875 if !kline.confirm {
877 continue;
878 }
879 match parse_ws_kline_bar(
880 kline,
881 instrument,
882 bar_type,
883 self.bars_timestamp_on_close,
884 ts_init,
885 ) {
886 Ok(bar) => data_vec.push(Data::Bar(bar)),
887 Err(e) => tracing::error!("Error parsing kline to bar: {e}"),
888 }
889 }
890 if !data_vec.is_empty() {
891 result.push(NautilusWsMessage::Data(data_vec));
892 }
893 } else {
894 tracing::error!("Invalid step value: {}", step);
895 }
896 } else {
897 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Kline message");
898 }
899 }
900 BybitWsMessage::TickerLinear(msg) => {
901 let raw_symbol = msg.data.symbol;
902 let symbol =
903 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
904
905 if let Some(instrument) = instruments.get(&symbol) {
906 let instrument_id = instrument.id();
907 let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
908 let price_precision = instrument.price_precision();
909 let size_precision = instrument.size_precision();
910
911 let bid_price = msg
913 .data
914 .bid1_price
915 .as_deref()
916 .map(|s| parse_price_with_precision(s, price_precision, "bid1Price"))
917 .transpose();
918 let ask_price = msg
919 .data
920 .ask1_price
921 .as_deref()
922 .map(|s| parse_price_with_precision(s, price_precision, "ask1Price"))
923 .transpose();
924 let bid_size = msg
925 .data
926 .bid1_size
927 .as_deref()
928 .map(|s| parse_quantity_with_precision(s, size_precision, "bid1Size"))
929 .transpose();
930 let ask_size = msg
931 .data
932 .ask1_size
933 .as_deref()
934 .map(|s| parse_quantity_with_precision(s, size_precision, "ask1Size"))
935 .transpose();
936
937 match (bid_price, ask_price, bid_size, ask_size) {
938 (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
939 match self.quote_cache.process(
940 instrument_id,
941 bp,
942 ap,
943 bs,
944 as_,
945 ts_event,
946 ts_init,
947 ) {
948 Ok(quote) => {
949 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
950 }
951 Err(e) => {
952 let raw_data = serde_json::to_string(&msg.data)
953 .unwrap_or_else(|_| "<failed to serialize>".to_string());
954 tracing::debug!(
955 "Skipping partial ticker update: {e}, raw_data: {raw_data}"
956 );
957 }
958 }
959 }
960 _ => {
961 let raw_data = serde_json::to_string(&msg.data)
962 .unwrap_or_else(|_| "<failed to serialize>".to_string());
963 tracing::warn!(
964 "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
965 );
966 }
967 }
968
969 if msg.data.funding_rate.is_some() && msg.data.next_funding_time.is_some() {
971 let should_publish = {
972 let cache = funding_cache.read().await;
973 match cache.get(&symbol) {
974 Some((cached_rate, cached_time)) => {
975 cached_rate.as_ref() != msg.data.funding_rate.as_ref()
976 || cached_time.as_ref()
977 != msg.data.next_funding_time.as_ref()
978 }
979 None => true,
980 }
981 };
982
983 if should_publish {
984 match parse_ticker_linear_funding(
985 &msg.data,
986 instrument_id,
987 ts_event,
988 ts_init,
989 ) {
990 Ok(funding) => {
991 let cache_key = (
992 msg.data.funding_rate.clone(),
993 msg.data.next_funding_time.clone(),
994 );
995 funding_cache.write().await.insert(symbol, cache_key);
996 result.push(NautilusWsMessage::FundingRates(vec![funding]));
997 }
998 Err(e) => {
999 tracing::debug!("Skipping funding rate update: {e}");
1000 }
1001 }
1002 }
1003 }
1004 } else {
1005 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerLinear message");
1006 }
1007 }
1008 BybitWsMessage::TickerOption(msg) => {
1009 let raw_symbol = &msg.data.symbol;
1010 let symbol = product_type.map_or_else(
1011 || raw_symbol.as_str().into(),
1012 |pt| make_bybit_symbol(raw_symbol, pt),
1013 );
1014
1015 if let Some(instrument) = instruments.get(&symbol) {
1016 let instrument_id = instrument.id();
1017 let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
1018 let price_precision = instrument.price_precision();
1019 let size_precision = instrument.size_precision();
1020
1021 let bid_price = parse_price_with_precision(
1023 &msg.data.bid_price,
1024 price_precision,
1025 "bidPrice",
1026 );
1027 let ask_price = parse_price_with_precision(
1028 &msg.data.ask_price,
1029 price_precision,
1030 "askPrice",
1031 );
1032 let bid_size = parse_quantity_with_precision(
1033 &msg.data.bid_size,
1034 size_precision,
1035 "bidSize",
1036 );
1037 let ask_size = parse_quantity_with_precision(
1038 &msg.data.ask_size,
1039 size_precision,
1040 "askSize",
1041 );
1042
1043 match (bid_price, ask_price, bid_size, ask_size) {
1044 (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1045 match self.quote_cache.process(
1046 instrument_id,
1047 Some(bp),
1048 Some(ap),
1049 Some(bs),
1050 Some(as_),
1051 ts_event,
1052 ts_init,
1053 ) {
1054 Ok(quote) => {
1055 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1056 }
1057 Err(e) => {
1058 let raw_data = serde_json::to_string(&msg.data)
1059 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1060 tracing::debug!(
1061 "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1062 );
1063 }
1064 }
1065 }
1066 _ => {
1067 let raw_data = serde_json::to_string(&msg.data)
1068 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1069 tracing::warn!(
1070 "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1071 );
1072 }
1073 }
1074 } else {
1075 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerOption message");
1076 }
1077 }
1078 BybitWsMessage::AccountOrder(msg) => {
1079 if let Some(account_id) = account_id {
1080 let mut reports = Vec::new();
1081 for order in &msg.data {
1082 let raw_symbol = order.symbol;
1083 let symbol = make_bybit_symbol(raw_symbol, order.category);
1084
1085 if let Some(instrument) = instruments.get(&symbol) {
1086 match parse_ws_order_status_report(
1087 order, instrument, account_id, ts_init,
1088 ) {
1089 Ok(report) => reports.push(report),
1090 Err(e) => tracing::error!("Error parsing order status report: {e}"),
1091 }
1092 } else {
1093 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountOrder message");
1094 }
1095 }
1096 if !reports.is_empty() {
1097 result.push(NautilusWsMessage::OrderStatusReports(reports));
1098 }
1099 }
1100 }
1101 BybitWsMessage::AccountExecution(msg) => {
1102 if let Some(account_id) = account_id {
1103 let mut reports = Vec::new();
1104 for execution in &msg.data {
1105 let raw_symbol = execution.symbol;
1106 let symbol = make_bybit_symbol(raw_symbol, execution.category);
1107
1108 if let Some(instrument) = instruments.get(&symbol) {
1109 match parse_ws_fill_report(execution, account_id, instrument, ts_init) {
1110 Ok(report) => reports.push(report),
1111 Err(e) => tracing::error!("Error parsing fill report: {e}"),
1112 }
1113 } else {
1114 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountExecution message");
1115 }
1116 }
1117 if !reports.is_empty() {
1118 result.push(NautilusWsMessage::FillReports(reports));
1119 }
1120 }
1121 }
1122 BybitWsMessage::AccountPosition(msg) => {
1123 if let Some(account_id) = account_id {
1124 for position in &msg.data {
1125 let raw_symbol = position.symbol;
1126 let symbol = make_bybit_symbol(raw_symbol, position.category);
1127
1128 if let Some(instrument) = instruments.get(&symbol) {
1129 match parse_ws_position_status_report(
1130 position, account_id, instrument, ts_init,
1131 ) {
1132 Ok(report) => {
1133 result.push(NautilusWsMessage::PositionStatusReport(report));
1134 }
1135 Err(e) => {
1136 tracing::error!("Error parsing position status report: {e}");
1137 }
1138 }
1139 } else {
1140 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountPosition message");
1141 }
1142 }
1143 }
1144 }
1145 BybitWsMessage::AccountWallet(msg) => {
1146 if let Some(account_id) = account_id {
1147 for wallet in &msg.data {
1148 let ts_event = UnixNanos::from(msg.creation_time as u64 * 1_000_000);
1149
1150 match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1151 Ok(state) => result.push(NautilusWsMessage::AccountState(state)),
1152 Err(e) => tracing::error!("Error parsing account state: {e}"),
1153 }
1154 }
1155 }
1156 }
1157 BybitWsMessage::OrderResponse(resp) => {
1158 if resp.ret_code == 0 {
1159 tracing::debug!(op = %resp.op, ret_msg = %resp.ret_msg, "Order operation successful");
1160
1161 if resp.op.contains("batch") {
1162 self.handle_batch_response(&resp, &mut result);
1163 } else if let Some(req_id) = &resp.req_id {
1164 if resp.op.contains("create") {
1165 self.pending_place_requests.remove(req_id);
1166 } else if resp.op.contains("cancel") {
1167 self.pending_cancel_requests.remove(req_id);
1168 } else if resp.op.contains("amend") {
1169 self.pending_amend_requests.remove(req_id);
1170 }
1171 } else if let Some(order_link_id) =
1172 resp.data.get("orderLinkId").and_then(|v| v.as_str())
1173 {
1174 let client_order_id = ClientOrderId::from(order_link_id);
1176 if resp.op.contains("create") {
1177 self.find_and_remove_place_request_by_client_order_id(&client_order_id);
1178 } else if resp.op.contains("cancel") {
1179 self.find_and_remove_cancel_request_by_client_order_id(
1180 &client_order_id,
1181 );
1182 } else if resp.op.contains("amend") {
1183 self.find_and_remove_amend_request_by_client_order_id(&client_order_id);
1184 }
1185 }
1186 } else if let Some(req_id) = &resp.req_id {
1187 let clock = get_atomic_clock_realtime();
1188 let ts_init = clock.get_time_ns();
1189
1190 if resp.op.contains("batch") {
1191 self.handle_batch_failure(
1192 req_id,
1193 &resp.ret_msg,
1194 &resp.op,
1195 ts_init,
1196 &mut result,
1197 );
1198 } else if resp.op.contains("create")
1199 && let Some((_, (client_order_id, trader_id, strategy_id, instrument_id))) =
1200 self.pending_place_requests.remove(req_id)
1201 {
1202 let Some(account_id) = self.account_id else {
1203 tracing::error!(
1204 request_id = %req_id,
1205 reason = %resp.ret_msg,
1206 "Cannot create OrderRejected event: account_id is None"
1207 );
1208 return result;
1209 };
1210
1211 let rejected = OrderRejected::new(
1212 trader_id,
1213 strategy_id,
1214 instrument_id,
1215 client_order_id,
1216 account_id,
1217 Ustr::from(&resp.ret_msg),
1218 UUID4::new(),
1219 ts_init,
1220 ts_init,
1221 false,
1222 false,
1223 );
1224 result.push(NautilusWsMessage::OrderRejected(rejected));
1225 } else if resp.op.contains("cancel")
1226 && let Some((
1227 _,
1228 (
1229 client_order_id,
1230 trader_id,
1231 strategy_id,
1232 instrument_id,
1233 venue_order_id,
1234 ),
1235 )) = self.pending_cancel_requests.remove(req_id)
1236 {
1237 let rejected = OrderCancelRejected::new(
1238 trader_id,
1239 strategy_id,
1240 instrument_id,
1241 client_order_id,
1242 Ustr::from(&resp.ret_msg),
1243 UUID4::new(),
1244 ts_init,
1245 ts_init,
1246 false,
1247 venue_order_id,
1248 self.account_id,
1249 );
1250 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1251 } else if resp.op.contains("amend")
1252 && let Some((
1253 _,
1254 (
1255 client_order_id,
1256 trader_id,
1257 strategy_id,
1258 instrument_id,
1259 venue_order_id,
1260 ),
1261 )) = self.pending_amend_requests.remove(req_id)
1262 {
1263 let rejected = OrderModifyRejected::new(
1264 trader_id,
1265 strategy_id,
1266 instrument_id,
1267 client_order_id,
1268 Ustr::from(&resp.ret_msg),
1269 UUID4::new(),
1270 ts_init,
1271 ts_init,
1272 false,
1273 venue_order_id,
1274 self.account_id,
1275 );
1276 result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1277 }
1278 } else if let Some(order_link_id) =
1279 resp.data.get("orderLinkId").and_then(|v| v.as_str())
1280 {
1281 let clock = get_atomic_clock_realtime();
1283 let ts_init = clock.get_time_ns();
1284 let client_order_id = ClientOrderId::from(order_link_id);
1285
1286 if resp.op.contains("create") {
1287 if let Some((_, (_, trader_id, strategy_id, instrument_id))) =
1288 self.find_and_remove_place_request_by_client_order_id(&client_order_id)
1289 {
1290 let Some(account_id) = self.account_id else {
1291 tracing::error!(
1292 client_order_id = %client_order_id,
1293 reason = %resp.ret_msg,
1294 "Cannot create OrderRejected event: account_id is None"
1295 );
1296 return result;
1297 };
1298
1299 let rejected = OrderRejected::new(
1300 trader_id,
1301 strategy_id,
1302 instrument_id,
1303 client_order_id,
1304 account_id,
1305 Ustr::from(&resp.ret_msg),
1306 UUID4::new(),
1307 ts_init,
1308 ts_init,
1309 false,
1310 false,
1311 );
1312 result.push(NautilusWsMessage::OrderRejected(rejected));
1313 }
1314 } else if resp.op.contains("cancel") {
1315 if let Some((
1316 _,
1317 (_, trader_id, strategy_id, instrument_id, venue_order_id),
1318 )) =
1319 self.find_and_remove_cancel_request_by_client_order_id(&client_order_id)
1320 {
1321 let rejected = OrderCancelRejected::new(
1322 trader_id,
1323 strategy_id,
1324 instrument_id,
1325 client_order_id,
1326 Ustr::from(&resp.ret_msg),
1327 UUID4::new(),
1328 ts_init,
1329 ts_init,
1330 false,
1331 venue_order_id,
1332 self.account_id,
1333 );
1334 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1335 }
1336 } else if resp.op.contains("amend")
1337 && let Some((_, (_, trader_id, strategy_id, instrument_id, venue_order_id))) =
1338 self.find_and_remove_amend_request_by_client_order_id(&client_order_id)
1339 {
1340 let rejected = OrderModifyRejected::new(
1341 trader_id,
1342 strategy_id,
1343 instrument_id,
1344 client_order_id,
1345 Ustr::from(&resp.ret_msg),
1346 UUID4::new(),
1347 ts_init,
1348 ts_init,
1349 false,
1350 venue_order_id,
1351 self.account_id,
1352 );
1353 result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1354 }
1355 } else {
1356 tracing::warn!(
1357 op = %resp.op,
1358 ret_code = resp.ret_code,
1359 ret_msg = %resp.ret_msg,
1360 "Order operation failed but request_id could not be extracted from response"
1361 );
1362 }
1363 }
1364 BybitWsMessage::Auth(auth_response) => {
1365 let is_success =
1366 auth_response.success.unwrap_or(false) || (auth_response.ret_code == Some(0));
1367
1368 if is_success {
1369 self.auth_tracker.succeed();
1370 tracing::info!("WebSocket authenticated");
1371 result.push(NautilusWsMessage::Authenticated);
1372 } else {
1373 let error_msg = auth_response
1374 .ret_msg
1375 .as_deref()
1376 .unwrap_or("Authentication rejected");
1377 self.auth_tracker.fail(error_msg);
1378 tracing::error!(error = error_msg, "WebSocket authentication failed");
1379 result.push(NautilusWsMessage::Error(BybitWebSocketError::from_message(
1380 error_msg.to_string(),
1381 )));
1382 }
1383 }
1384 BybitWsMessage::Error(err) => {
1385 result.push(NautilusWsMessage::Error(err));
1386 }
1387 BybitWsMessage::Reconnected => {
1388 self.quote_cache.clear();
1389 result.push(NautilusWsMessage::Reconnected);
1390 }
1391 BybitWsMessage::Subscription(sub_msg) => {
1392 let pending_topics = self.subscriptions.pending_subscribe_topics();
1393 match sub_msg.op {
1394 BybitWsOperation::Subscribe => {
1395 if sub_msg.success {
1396 for topic in pending_topics {
1397 self.subscriptions.confirm_subscribe(&topic);
1398 tracing::debug!(topic = topic, "Subscription confirmed");
1399 }
1400 } else {
1401 for topic in pending_topics {
1402 self.subscriptions.mark_failure(&topic);
1403 tracing::warn!(
1404 topic = topic,
1405 error = ?sub_msg.ret_msg,
1406 "Subscription failed, will retry on reconnect"
1407 );
1408 }
1409 }
1410 }
1411 BybitWsOperation::Unsubscribe => {
1412 let pending_unsub = self.subscriptions.pending_unsubscribe_topics();
1413 if sub_msg.success {
1414 for topic in pending_unsub {
1415 self.subscriptions.confirm_unsubscribe(&topic);
1416 tracing::debug!(topic = topic, "Unsubscription confirmed");
1417 }
1418 } else {
1419 for topic in pending_unsub {
1420 tracing::warn!(
1421 topic = topic,
1422 error = ?sub_msg.ret_msg,
1423 "Unsubscription failed"
1424 );
1425 }
1426 }
1427 }
1428 _ => {}
1429 }
1430 }
1431 _ => {}
1432 }
1433
1434 result
1435 }
1436}
1437
1438pub fn classify_bybit_message(value: serde_json::Value) -> BybitWsMessage {
1442 if let Some(op_val) = value.get("op") {
1443 if let Ok(op) = serde_json::from_value::<BybitWsOperation>(op_val.clone())
1444 && op == BybitWsOperation::Auth
1445 && let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
1446 {
1447 let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1448 if is_success {
1449 return BybitWsMessage::Auth(auth);
1450 }
1451 let resp = BybitWsResponse {
1452 op: Some(auth.op.clone()),
1453 topic: None,
1454 success: auth.success,
1455 conn_id: auth.conn_id.clone(),
1456 req_id: None,
1457 ret_code: auth.ret_code,
1458 ret_msg: auth.ret_msg,
1459 };
1460 let error = BybitWebSocketError::from_response(&resp);
1461 return BybitWsMessage::Error(error);
1462 }
1463
1464 if let Some(op_str) = op_val.as_str()
1465 && op_str.starts_with("order.")
1466 {
1467 return serde_json::from_value::<BybitWsOrderResponse>(value.clone()).map_or_else(
1468 |_| BybitWsMessage::Raw(value),
1469 BybitWsMessage::OrderResponse,
1470 );
1471 }
1472 }
1473
1474 if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
1475 if success {
1476 return serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone())
1477 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Subscription);
1478 }
1479 return serde_json::from_value::<BybitWsResponse>(value.clone()).map_or_else(
1480 |_| BybitWsMessage::Raw(value),
1481 |resp| {
1482 let error = BybitWebSocketError::from_response(&resp);
1483 BybitWsMessage::Error(error)
1484 },
1485 );
1486 }
1487
1488 if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
1490 if topic.starts_with(BYBIT_TOPIC_ORDERBOOK) {
1491 return serde_json::from_value(value.clone())
1492 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Orderbook);
1493 }
1494
1495 if topic.contains(BYBIT_TOPIC_PUBLIC_TRADE) || topic.starts_with(BYBIT_TOPIC_TRADE) {
1496 return serde_json::from_value(value.clone())
1497 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Trade);
1498 }
1499
1500 if topic.starts_with(BYBIT_TOPIC_KLINE) {
1501 return serde_json::from_value(value.clone())
1502 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Kline);
1503 }
1504
1505 if topic.starts_with(BYBIT_TOPIC_TICKERS) {
1506 let is_option = value
1508 .get("data")
1509 .and_then(|d| d.get("symbol"))
1510 .and_then(|s| s.as_str())
1511 .is_some_and(|symbol| symbol.contains('-') && symbol.matches('-').count() >= 3);
1512
1513 if is_option {
1514 return serde_json::from_value(value.clone())
1515 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerOption);
1516 }
1517 return serde_json::from_value(value.clone())
1518 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerLinear);
1519 }
1520
1521 if topic.starts_with(BYBIT_TOPIC_ORDER) {
1522 return serde_json::from_value(value.clone())
1523 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::AccountOrder);
1524 }
1525
1526 if topic.starts_with(BYBIT_TOPIC_EXECUTION) {
1527 return serde_json::from_value(value.clone()).map_or_else(
1528 |_| BybitWsMessage::Raw(value),
1529 BybitWsMessage::AccountExecution,
1530 );
1531 }
1532
1533 if topic.starts_with(BYBIT_TOPIC_WALLET) {
1534 return serde_json::from_value(value.clone()).map_or_else(
1535 |_| BybitWsMessage::Raw(value),
1536 BybitWsMessage::AccountWallet,
1537 );
1538 }
1539
1540 if topic.starts_with(BYBIT_TOPIC_POSITION) {
1541 return serde_json::from_value(value.clone()).map_or_else(
1542 |_| BybitWsMessage::Raw(value),
1543 BybitWsMessage::AccountPosition,
1544 );
1545 }
1546 }
1547
1548 BybitWsMessage::Raw(value)
1549}
1550
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::consts::BYBIT_WS_TOPIC_DELIMITER;

    /// Builds a [`FeedHandler`] for tests that only exercise request ID generation.
    ///
    /// The peer halves of the command, raw and output channels are dropped when
    /// this function returns, so the returned handler has no live channel peers.
    fn create_test_handler() -> FeedHandler {
        let signal = Arc::new(AtomicBool::new(false));
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
        let (out_tx, _out_rx) = tokio::sync::mpsc::unbounded_channel();
        let auth_tracker = AuthTracker::new();
        let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
        let funding_cache = Arc::new(tokio::sync::RwLock::new(AHashMap::new()));

        FeedHandler::new(
            signal,
            cmd_rx,
            raw_rx,
            out_tx,
            None,
            None,
            true,
            Arc::new(AtomicU8::new(0)),
            auth_tracker,
            subscriptions,
            funding_cache,
        )
    }

    /// Three consecutive request IDs must be pairwise distinct.
    #[rstest]
    fn test_generate_unique_request_id_returns_different_ids() {
        let handler = create_test_handler();

        let id1 = handler.generate_unique_request_id();
        let id2 = handler.generate_unique_request_id();
        let id3 = handler.generate_unique_request_id();

        assert_ne!(id1, id2);
        assert_ne!(id2, id3);
        assert_ne!(id1, id3);
    }

    /// A request ID must survive a round trip through [`UUID4`] unchanged.
    #[rstest]
    fn test_generate_unique_request_id_produces_valid_uuids() {
        let handler = create_test_handler();

        let id1 = handler.generate_unique_request_id();
        let id2 = handler.generate_unique_request_id();

        // `assert_eq!` (rather than `assert!(a == b)`) prints both values on failure.
        assert_eq!(UUID4::from(id1.as_str()).to_string(), id1);
        assert_eq!(UUID4::from(id2.as_str()).to_string(), id2);
    }

    /// Request IDs generated for successive place-order requests must not repeat.
    #[rstest]
    fn test_multiple_place_orders_use_different_request_ids() {
        let handler = create_test_handler();

        let req_id_1 = handler.generate_unique_request_id();
        let req_id_2 = handler.generate_unique_request_id();
        let req_id_3 = handler.generate_unique_request_id();

        assert_ne!(req_id_1, req_id_2);
        assert_ne!(req_id_2, req_id_3);
        assert_ne!(req_id_1, req_id_3);
    }

    /// Request IDs generated for successive amend requests must not repeat.
    #[rstest]
    fn test_multiple_amends_use_different_request_ids() {
        let handler = create_test_handler();

        let req_id_1 = handler.generate_unique_request_id();
        let req_id_2 = handler.generate_unique_request_id();
        let req_id_3 = handler.generate_unique_request_id();

        assert_ne!(
            req_id_1, req_id_2,
            "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
        );
        assert_ne!(
            req_id_2, req_id_3,
            "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
        );
        // Also compare the first and third IDs so all three are checked pairwise,
        // matching the place-order test.
        assert_ne!(
            req_id_1, req_id_3,
            "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
        );
    }

    /// Request IDs generated for successive cancel requests must not repeat.
    #[rstest]
    fn test_multiple_cancels_use_different_request_ids() {
        let handler = create_test_handler();

        let req_id_1 = handler.generate_unique_request_id();
        let req_id_2 = handler.generate_unique_request_id();

        assert_ne!(
            req_id_1, req_id_2,
            "Multiple cancels should generate different request IDs"
        );
    }

    /// Generates 100 request IDs back to back and checks that none repeats.
    ///
    /// Despite the name, generation here is sequential on a single thread.
    #[rstest]
    fn test_concurrent_request_id_generation() {
        let handler = create_test_handler();

        let mut ids = std::collections::HashSet::new();
        for _ in 0..100 {
            let id = handler.generate_unique_request_id();
            assert!(
                ids.insert(id.clone()),
                "Generated duplicate request ID: {id}"
            );
        }
        assert_eq!(ids.len(), 100);
    }
}