1use std::{
19 collections::VecDeque,
20 num::NonZero,
21 sync::{
22 Arc,
23 atomic::{AtomicBool, AtomicU8, Ordering},
24 },
25};
26
27use ahash::AHashMap;
28use dashmap::DashMap;
29use nautilus_common::cache::quote::QuoteCache;
30use nautilus_core::{UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
31use nautilus_model::{
32 data::{BarSpecification, BarType, Data},
33 enums::{AggregationSource, BarAggregation, PriceType},
34 events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
35 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
36 instruments::{Instrument, InstrumentAny},
37};
38use nautilus_network::{
39 retry::{RetryManager, create_websocket_retry_manager},
40 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
41};
42use tokio_tungstenite::tungstenite::Message;
43use ustr::Ustr;
44
45use super::{
46 enums::BybitWsOperation,
47 error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
48 messages::{
49 BybitWebSocketError, BybitWsAuthResponse, BybitWsHeader, BybitWsMessage, BybitWsRequest,
50 BybitWsResponse, BybitWsSubscriptionMsg, NautilusWsMessage,
51 },
52 parse::{
53 parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
54 parse_ticker_linear_funding, parse_ws_account_state, parse_ws_fill_report,
55 parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
56 parse_ws_trade_tick,
57 },
58};
59use crate::{
60 common::{
61 consts::{
62 BYBIT_NAUTILUS_BROKER_ID, BYBIT_TOPIC_EXECUTION, BYBIT_TOPIC_KLINE, BYBIT_TOPIC_ORDER,
63 BYBIT_TOPIC_ORDERBOOK, BYBIT_TOPIC_POSITION, BYBIT_TOPIC_PUBLIC_TRADE,
64 BYBIT_TOPIC_TICKERS, BYBIT_TOPIC_TRADE, BYBIT_TOPIC_WALLET,
65 },
66 enums::{BybitProductType, BybitTimeInForce, BybitWsOrderRequestOp},
67 parse::{make_bybit_symbol, parse_price_with_precision, parse_quantity_with_precision},
68 },
69 websocket::messages::{
70 BybitBatchOrderError, BybitWsAmendOrderParams, BybitWsCancelOrderParams,
71 BybitWsOrderResponse, BybitWsPlaceOrderParams,
72 },
73};
74
/// Commands accepted by the [`FeedHandler`] on its command channel.
///
/// Each command is received and acted on inside `FeedHandler::next`; the
/// per-variant notes below describe what the handler does on receipt.
#[derive(Debug)]
#[allow(
    clippy::large_enum_variant,
    reason = "Commands are ephemeral and immediately consumed"
)]
pub enum HandlerCommand {
    /// Hands the handler the WebSocket client it uses for all outbound sends.
    SetClient(WebSocketClient),
    /// Takes the stored client out of the handler (if any) and disconnects it.
    Disconnect,
    /// Sends `payload` as a text frame, with retry.
    Authenticate {
        payload: String,
    },
    /// Sends every entry of `topics` as its own text frame, with retry.
    ///
    /// NOTE(review): entries are sent verbatim, so they are presumably
    /// fully-formed request payloads rather than bare topic names — confirm
    /// against the caller.
    Subscribe {
        topics: Vec<String>,
    },
    /// Sends every entry of `topics` as its own text frame, with retry
    /// (handled the same way as `Subscribe`, with different log messages).
    Unsubscribe {
        topics: Vec<String>,
    },
    /// Sends an arbitrary text payload, with retry.
    SendText {
        payload: String,
    },
    /// Places a single order: the handler generates a request ID, records the
    /// identifiers as a pending place request, and sends a `Create` request.
    PlaceOrder {
        params: BybitWsPlaceOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
    },
    /// Amends a single order: recorded as a pending amend request, then sent
    /// as an `Amend` request.
    AmendOrder {
        params: BybitWsAmendOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
        venue_order_id: Option<VenueOrderId>,
    },
    /// Cancels a single order: recorded as a pending cancel request, then sent
    /// as a `Cancel` request.
    CancelOrder {
        params: BybitWsCancelOrderParams,
        client_order_id: ClientOrderId,
        trader_id: TraderId,
        strategy_id: StrategyId,
        instrument_id: InstrumentId,
        venue_order_id: Option<VenueOrderId>,
    },
    /// Registers the orders of a batch place request under `req_id` so the
    /// batch response can be correlated later. Nothing is sent on receipt.
    RegisterBatchPlace {
        req_id: String,
        orders: Vec<BatchOrderData>,
    },
    /// Registers the cancels of a batch cancel request under `req_id` so the
    /// batch response can be correlated later. Nothing is sent on receipt.
    RegisterBatchCancel {
        req_id: String,
        cancels: Vec<BatchCancelData>,
    },
    /// Inserts every instrument into the handler's cache, keyed by symbol.
    InitializeInstruments(Vec<InstrumentAny>),
    /// Inserts (or replaces) a single instrument in the handler's cache.
    UpdateInstrument(InstrumentAny),
}
130
/// Shared cache used to de-duplicate funding rate updates.
///
/// Keyed by symbol; the value is the `(funding_rate, next_funding_time)` pair
/// of raw strings from the last ticker that was published as a funding update.
type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;

/// Identifiers retained for an in-flight place request, used to build an
/// `OrderRejected` event if the venue rejects the request.
type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);

/// Identifiers retained for an in-flight cancel request; the trailing element
/// is the venue order ID when known.
type CancelRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);

/// Identifiers retained for an in-flight amend request; same shape as
/// [`CancelRequestData`].
type AmendRequestData = (
    ClientOrderId,
    TraderId,
    StrategyId,
    InstrumentId,
    Option<VenueOrderId>,
);

/// One entry of a registered batch place request: the client order ID paired
/// with the request data (which itself also carries a client order ID).
type BatchOrderData = (ClientOrderId, PlaceRequestData);

/// One entry of a registered batch cancel request: the client order ID paired
/// with the request data (which itself also carries a client order ID).
type BatchCancelData = (ClientOrderId, CancelRequestData);
160
/// Message-processing core of the Bybit WebSocket client.
///
/// Receives commands and raw WebSocket frames on two channels, sends outbound
/// requests through the stored client, and turns venue messages into
/// [`NautilusWsMessage`] values that are handed out one at a time by `next`.
pub(super) struct FeedHandler {
    /// Stop flag; `next` returns `None` once it observes this set.
    signal: Arc<AtomicBool>,
    /// Client used for outbound sends; `None` until a `SetClient` command
    /// arrives and again after `Disconnect`.
    client: Option<WebSocketClient>,
    /// Incoming [`HandlerCommand`]s.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket frames.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outbound channel used by `send`.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
    // NOTE(review): `auth_tracker` and `subscriptions` are not referenced in
    // the portion of this file reviewed here — presumably used further down.
    auth_tracker: AuthTracker,
    subscriptions: SubscriptionState,
    /// Instruments keyed by symbol (`inst.symbol().inner()`).
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Account used for account-stream reports and rejection events.
    account_id: Option<AccountId>,
    /// Shared level flag read by `include_referer_header`; a non-zero value
    /// suppresses the referer header on post-only orders.
    mm_level: Arc<AtomicU8>,
    /// When set, used to turn raw venue symbols from public market-data
    /// messages into full symbols.
    product_type: Option<BybitProductType>,
    /// Last quote per instrument, used to build quotes from partial updates.
    quote_cache: QuoteCache,
    /// See [`FundingCache`].
    funding_cache: FundingCache,
    /// Retry policy applied to outbound sends.
    retry_manager: RetryManager<BybitWsError>,
    /// In-flight single place requests keyed by request ID.
    pending_place_requests: DashMap<String, PlaceRequestData>,
    /// In-flight single cancel requests keyed by request ID.
    pending_cancel_requests: DashMap<String, CancelRequestData>,
    /// In-flight single amend requests keyed by request ID.
    pending_amend_requests: DashMap<String, AmendRequestData>,
    /// Registered batch place requests keyed by request ID.
    pending_batch_place_requests: DashMap<String, Vec<BatchOrderData>>,
    /// Registered batch cancel requests keyed by request ID.
    pending_batch_cancel_requests: DashMap<String, Vec<BatchCancelData>>,
    /// Parsed messages waiting to be returned by `next`, oldest first.
    message_queue: VecDeque<NautilusWsMessage>,
}
183
184impl FeedHandler {
185 #[allow(clippy::too_many_arguments)]
187 pub(super) fn new(
188 signal: Arc<AtomicBool>,
189 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
190 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
191 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
192 account_id: Option<AccountId>,
193 product_type: Option<BybitProductType>,
194 mm_level: Arc<AtomicU8>,
195 auth_tracker: AuthTracker,
196 subscriptions: SubscriptionState,
197 funding_cache: FundingCache,
198 ) -> Self {
199 Self {
200 signal,
201 client: None,
202 cmd_rx,
203 raw_rx,
204 out_tx,
205 auth_tracker,
206 subscriptions,
207 instruments_cache: AHashMap::new(),
208 account_id,
209 mm_level,
210 product_type,
211 quote_cache: QuoteCache::new(),
212 funding_cache,
213 retry_manager: create_websocket_retry_manager(),
214 pending_place_requests: DashMap::new(),
215 pending_cancel_requests: DashMap::new(),
216 pending_amend_requests: DashMap::new(),
217 pending_batch_place_requests: DashMap::new(),
218 pending_batch_cancel_requests: DashMap::new(),
219 message_queue: VecDeque::new(),
220 }
221 }
222
223 pub(super) fn is_stopped(&self) -> bool {
224 self.signal.load(Ordering::Relaxed)
225 }
226
227 #[allow(dead_code)]
228 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
229 self.out_tx.send(msg).map_err(|_| ())
230 }
231
232 fn generate_unique_request_id(&self) -> String {
233 UUID4::new().to_string()
234 }
235
236 fn find_and_remove_place_request_by_client_order_id(
237 &self,
238 client_order_id: &ClientOrderId,
239 ) -> Option<(String, PlaceRequestData)> {
240 self.pending_place_requests
241 .iter()
242 .find(|entry| entry.value().0 == *client_order_id)
243 .and_then(|entry| {
244 let key = entry.key().clone();
245 drop(entry);
246 self.pending_place_requests.remove(&key)
247 })
248 }
249
250 fn find_and_remove_cancel_request_by_client_order_id(
251 &self,
252 client_order_id: &ClientOrderId,
253 ) -> Option<(String, CancelRequestData)> {
254 self.pending_cancel_requests
255 .iter()
256 .find(|entry| entry.value().0 == *client_order_id)
257 .and_then(|entry| {
258 let key = entry.key().clone();
259 drop(entry);
260 self.pending_cancel_requests.remove(&key)
261 })
262 }
263
264 fn find_and_remove_amend_request_by_client_order_id(
265 &self,
266 client_order_id: &ClientOrderId,
267 ) -> Option<(String, AmendRequestData)> {
268 self.pending_amend_requests
269 .iter()
270 .find(|entry| entry.value().0 == *client_order_id)
271 .and_then(|entry| {
272 let key = entry.key().clone();
273 drop(entry);
274 self.pending_amend_requests.remove(&key)
275 })
276 }
277
278 fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
279 let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
280 let mm_level = self.mm_level.load(Ordering::Relaxed);
281 !(is_post_only && mm_level > 0)
282 }
283
284 async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
286 if let Some(client) = &self.client {
287 self.retry_manager
288 .execute_with_retry(
289 "websocket_send",
290 || {
291 let payload = payload.clone();
292 async move {
293 client
294 .send_text(payload, None)
295 .await
296 .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
297 }
298 },
299 should_retry_bybit_error,
300 create_bybit_timeout_error,
301 )
302 .await
303 } else {
304 Err(BybitWsError::ClientError(
305 "No active WebSocket client".to_string(),
306 ))
307 }
308 }
309
310 fn handle_batch_failure(
315 &self,
316 req_id: &str,
317 ret_msg: &str,
318 op: &str,
319 ts_init: UnixNanos,
320 result: &mut Vec<NautilusWsMessage>,
321 ) {
322 if op.contains("create") {
323 if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
324 tracing::warn!(
325 req_id = %req_id,
326 ret_msg = %ret_msg,
327 num_orders = batch_data.len(),
328 "Batch place request failed"
329 );
330
331 let Some(account_id) = self.account_id else {
332 tracing::error!("Cannot create OrderRejected events: account_id is None");
333 return;
334 };
335
336 let reason = Ustr::from(ret_msg);
337 for (client_order_id, (_, trader_id, strategy_id, instrument_id)) in batch_data {
338 let rejected = OrderRejected::new(
339 trader_id,
340 strategy_id,
341 instrument_id,
342 client_order_id,
343 account_id,
344 reason,
345 UUID4::new(),
346 ts_init,
347 ts_init,
348 false,
349 false,
350 );
351 result.push(NautilusWsMessage::OrderRejected(rejected));
352 }
353 }
354 } else if op.contains("cancel")
355 && let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id)
356 {
357 tracing::warn!(
358 req_id = %req_id,
359 ret_msg = %ret_msg,
360 num_cancels = batch_data.len(),
361 "Batch cancel request failed"
362 );
363
364 let reason = Ustr::from(ret_msg);
365 for (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id)) in
366 batch_data
367 {
368 let rejected = OrderCancelRejected::new(
369 trader_id,
370 strategy_id,
371 instrument_id,
372 client_order_id,
373 reason,
374 UUID4::new(),
375 ts_init,
376 ts_init,
377 false,
378 venue_order_id,
379 self.account_id,
380 );
381 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
382 }
383 }
384 }
385
386 fn handle_batch_response(
388 &self,
389 resp: &BybitWsOrderResponse,
390 result: &mut Vec<NautilusWsMessage>,
391 ) {
392 let Some(req_id) = &resp.req_id else {
393 tracing::warn!(
394 op = %resp.op,
395 "Batch response missing req_id - cannot correlate with pending requests"
396 );
397 return;
398 };
399
400 let batch_errors = resp.extract_batch_errors();
401
402 if resp.op.contains("create") {
403 if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
404 self.process_batch_place_errors(batch_data, batch_errors, result);
405 } else {
406 tracing::debug!(
407 req_id = %req_id,
408 "Batch place response received but no pending request found"
409 );
410 }
411 } else if resp.op.contains("cancel") {
412 if let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id) {
413 self.process_batch_cancel_errors(batch_data, batch_errors, result);
414 } else {
415 tracing::debug!(
416 req_id = %req_id,
417 "Batch cancel response received but no pending request found"
418 );
419 }
420 }
421 }
422
423 fn process_batch_place_errors(
425 &self,
426 batch_data: Vec<BatchOrderData>,
427 errors: Vec<BybitBatchOrderError>,
428 result: &mut Vec<NautilusWsMessage>,
429 ) {
430 let Some(account_id) = self.account_id else {
431 tracing::error!("Cannot create OrderRejected events: account_id is None");
432 return;
433 };
434
435 let clock = get_atomic_clock_realtime();
436 let ts_init = clock.get_time_ns();
437
438 for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id))) in
439 batch_data.into_iter().enumerate()
440 {
441 if let Some(error) = errors.get(idx)
442 && error.code != 0
443 {
444 tracing::warn!(
445 client_order_id = %client_order_id,
446 error_code = error.code,
447 error_msg = %error.msg,
448 "Batch order rejected"
449 );
450
451 let rejected = OrderRejected::new(
452 trader_id,
453 strategy_id,
454 instrument_id,
455 client_order_id,
456 account_id,
457 Ustr::from(&error.msg),
458 UUID4::new(),
459 ts_init,
460 ts_init,
461 false,
462 false,
463 );
464 result.push(NautilusWsMessage::OrderRejected(rejected));
465 }
466 }
467 }
468
469 fn process_batch_cancel_errors(
471 &self,
472 batch_data: Vec<BatchCancelData>,
473 errors: Vec<BybitBatchOrderError>,
474 result: &mut Vec<NautilusWsMessage>,
475 ) {
476 let clock = get_atomic_clock_realtime();
477 let ts_init = clock.get_time_ns();
478
479 for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id))) in
480 batch_data.into_iter().enumerate()
481 {
482 if let Some(error) = errors.get(idx)
483 && error.code != 0
484 {
485 tracing::warn!(
486 client_order_id = %client_order_id,
487 error_code = error.code,
488 error_msg = %error.msg,
489 "Batch cancel rejected"
490 );
491
492 let rejected = OrderCancelRejected::new(
493 trader_id,
494 strategy_id,
495 instrument_id,
496 client_order_id,
497 Ustr::from(&error.msg),
498 UUID4::new(),
499 ts_init,
500 ts_init,
501 false,
502 venue_order_id,
503 self.account_id,
504 );
505 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
506 }
507 }
508 }
509
510 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
511 let clock = get_atomic_clock_realtime();
512
513 loop {
514 if let Some(msg) = self.message_queue.pop_front() {
515 return Some(msg);
516 }
517
518 tokio::select! {
519 Some(cmd) = self.cmd_rx.recv() => {
520 match cmd {
521 HandlerCommand::SetClient(client) => {
522 tracing::debug!("WebSocketClient received by handler");
523 self.client = Some(client);
524 }
525 HandlerCommand::Disconnect => {
526 tracing::debug!("Disconnect command received");
527
528 if let Some(client) = self.client.take() {
529 client.disconnect().await;
530 }
531 }
532 HandlerCommand::Authenticate { payload } => {
533 tracing::debug!("Authenticate command received");
534 if let Err(e) = self.send_with_retry(payload).await {
535 tracing::error!("Failed to send authentication after retries: {e}");
536 }
537 }
538 HandlerCommand::Subscribe { topics } => {
539 for topic in topics {
540 tracing::debug!(topic = %topic, "Subscribing to topic");
541 if let Err(e) = self.send_with_retry(topic.clone()).await {
542 tracing::error!(topic = %topic, error = %e, "Failed to send subscription after retries");
543 }
544 }
545 }
546 HandlerCommand::Unsubscribe { topics } => {
547 for topic in topics {
548 tracing::debug!(topic = %topic, "Unsubscribing from topic");
549 if let Err(e) = self.send_with_retry(topic.clone()).await {
550 tracing::error!(topic = %topic, error = %e, "Failed to send unsubscription after retries");
551 }
552 }
553 }
554 HandlerCommand::SendText { payload } => {
555 if let Err(e) = self.send_with_retry(payload).await {
556 tracing::error!("Error sending text with retry: {e}");
557 }
558 }
559 HandlerCommand::InitializeInstruments(instruments) => {
560 for inst in instruments {
561 self.instruments_cache.insert(inst.symbol().inner(), inst);
562 }
563 }
564 HandlerCommand::UpdateInstrument(inst) => {
565 self.instruments_cache.insert(inst.symbol().inner(), inst);
566 }
567 HandlerCommand::RegisterBatchPlace { req_id, orders } => {
568 tracing::debug!(
569 req_id = %req_id,
570 num_orders = orders.len(),
571 "Registering batch place request"
572 );
573 self.pending_batch_place_requests.insert(req_id, orders);
574 }
575 HandlerCommand::RegisterBatchCancel { req_id, cancels } => {
576 tracing::debug!(
577 req_id = %req_id,
578 num_cancels = cancels.len(),
579 "Registering batch cancel request"
580 );
581 self.pending_batch_cancel_requests.insert(req_id, cancels);
582 }
583 HandlerCommand::PlaceOrder {
584 params,
585 client_order_id,
586 trader_id,
587 strategy_id,
588 instrument_id,
589 } => {
590 let request_id = self.generate_unique_request_id();
591
592 self.pending_place_requests.insert(
593 request_id.clone(),
594 (client_order_id, trader_id, strategy_id, instrument_id),
595 );
596
597 let referer = if self.include_referer_header(params.time_in_force) {
598 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
599 } else {
600 None
601 };
602
603 let request = BybitWsRequest {
604 req_id: Some(request_id.clone()),
605 op: BybitWsOrderRequestOp::Create,
606 header: BybitWsHeader::with_referer(referer),
607 args: vec![params],
608 };
609
610 if let Ok(payload) = serde_json::to_string(&request)
611 && let Err(e) = self.send_with_retry(payload).await
612 {
613 tracing::error!("Failed to send place order after retries: {e}");
614 self.pending_place_requests.remove(&request_id);
615 }
616 }
617 HandlerCommand::AmendOrder {
618 params,
619 client_order_id,
620 trader_id,
621 strategy_id,
622 instrument_id,
623 venue_order_id,
624 } => {
625 let request_id = self.generate_unique_request_id();
626
627 self.pending_amend_requests.insert(
628 request_id.clone(),
629 (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
630 );
631
632 let request = BybitWsRequest {
633 req_id: Some(request_id.clone()),
634 op: BybitWsOrderRequestOp::Amend,
635 header: BybitWsHeader::now(),
636 args: vec![params],
637 };
638
639 if let Ok(payload) = serde_json::to_string(&request)
640 && let Err(e) = self.send_with_retry(payload).await
641 {
642 tracing::error!("Failed to send amend order after retries: {e}");
643 self.pending_amend_requests.remove(&request_id);
644 }
645 }
646 HandlerCommand::CancelOrder {
647 params,
648 client_order_id,
649 trader_id,
650 strategy_id,
651 instrument_id,
652 venue_order_id,
653 } => {
654 let request_id = self.generate_unique_request_id();
655
656 self.pending_cancel_requests.insert(
657 request_id.clone(),
658 (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
659 );
660
661 let request = BybitWsRequest {
662 req_id: Some(request_id.clone()),
663 op: BybitWsOrderRequestOp::Cancel,
664 header: BybitWsHeader::now(),
665 args: vec![params],
666 };
667
668 if let Ok(payload) = serde_json::to_string(&request)
669 && let Err(e) = self.send_with_retry(payload).await
670 {
671 tracing::error!("Failed to send cancel order after retries: {e}");
672 self.pending_cancel_requests.remove(&request_id);
673 }
674 }
675 }
676
677 continue;
678 }
679
680 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
681 if self.signal.load(Ordering::Relaxed) {
682 tracing::debug!("Stop signal received during idle period");
683 return None;
684 }
685 continue;
686 }
687
688 msg = self.raw_rx.recv() => {
689 let msg = match msg {
690 Some(msg) => msg,
691 None => {
692 tracing::debug!("WebSocket stream closed");
693 return None;
694 }
695 };
696
697 if let Message::Ping(data) = &msg {
698 tracing::trace!("Received ping frame with {} bytes", data.len());
699
700 if let Some(client) = &self.client
701 && let Err(e) = client.send_pong(data.to_vec()).await
702 {
703 tracing::warn!(error = %e, "Failed to send pong frame");
704 }
705 continue;
706 }
707
708 let event = match Self::parse_raw_message(msg) {
709 Some(event) => event,
710 None => continue,
711 };
712
713 if self.signal.load(Ordering::Relaxed) {
714 tracing::debug!("Stop signal received");
715 return None;
716 }
717
718 let ts_init = clock.get_time_ns();
719 let instruments = self.instruments_cache.clone();
720 let funding_cache = Arc::clone(&self.funding_cache);
721 let nautilus_messages = self.parse_to_nautilus_messages(
722 event,
723 &instruments,
724 self.account_id,
725 self.product_type,
726 &funding_cache,
727 ts_init,
728 )
729 .await;
730
731 self.message_queue.extend(nautilus_messages);
733 }
734 }
735 }
736 }
737
738 fn parse_raw_message(msg: Message) -> Option<BybitWsMessage> {
739 use serde_json::Value;
740
741 match msg {
742 Message::Text(text) => {
743 if text == nautilus_network::RECONNECTED {
744 tracing::info!("Received WebSocket reconnected signal");
745 return Some(BybitWsMessage::Reconnected);
746 }
747
748 if text.trim().eq_ignore_ascii_case("pong") {
749 return None;
750 }
751
752 tracing::trace!("Raw websocket message: {text}");
753
754 let value: Value = match serde_json::from_str(&text) {
755 Ok(v) => v,
756 Err(e) => {
757 tracing::error!("Failed to parse WebSocket message: {e}: {text}");
758 return None;
759 }
760 };
761
762 Some(classify_bybit_message(value))
763 }
764 Message::Binary(msg) => {
765 tracing::debug!("Raw binary: {msg:?}");
766 None
767 }
768 Message::Close(_) => {
769 tracing::debug!("Received close message, waiting for reconnection");
770 None
771 }
772 _ => None,
773 }
774 }
775
776 #[allow(clippy::too_many_arguments)]
777 async fn parse_to_nautilus_messages(
778 &mut self,
779 msg: BybitWsMessage,
780 instruments: &AHashMap<Ustr, InstrumentAny>,
781 account_id: Option<AccountId>,
782 product_type: Option<BybitProductType>,
783 funding_cache: &FundingCache,
784 ts_init: UnixNanos,
785 ) -> Vec<NautilusWsMessage> {
786 let mut result = Vec::new();
787
788 match msg {
789 BybitWsMessage::Orderbook(msg) => {
790 let raw_symbol = msg.data.s;
791 let symbol =
792 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
793
794 if let Some(instrument) = instruments.get(&symbol) {
795 match parse_orderbook_deltas(&msg, instrument, ts_init) {
796 Ok(deltas) => result.push(NautilusWsMessage::Deltas(deltas)),
797 Err(e) => tracing::error!("Error parsing orderbook deltas: {e}"),
798 }
799
800 if let Some(depth_str) = msg.topic.as_str().split('.').nth(1)
802 && depth_str == "1"
803 {
804 let instrument_id = instrument.id();
805 let last_quote = self.quote_cache.get(&instrument_id);
806
807 match parse_orderbook_quote(&msg, instrument, last_quote, ts_init) {
808 Ok(quote) => {
809 self.quote_cache.insert(instrument_id, quote);
810 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
811 }
812 Err(e) => tracing::debug!("Skipping orderbook quote: {e}"),
813 }
814 }
815 } else {
816 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Orderbook message");
817 }
818 }
819 BybitWsMessage::Trade(msg) => {
820 let mut data_vec = Vec::new();
821 for trade in &msg.data {
822 let raw_symbol = trade.s;
823 let symbol =
824 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
825
826 if let Some(instrument) = instruments.get(&symbol) {
827 match parse_ws_trade_tick(trade, instrument, ts_init) {
828 Ok(tick) => data_vec.push(Data::Trade(tick)),
829 Err(e) => tracing::error!("Error parsing trade tick: {e}"),
830 }
831 } else {
832 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Trade message");
833 }
834 }
835
836 if !data_vec.is_empty() {
837 result.push(NautilusWsMessage::Data(data_vec));
838 }
839 }
840 BybitWsMessage::Kline(msg) => {
841 let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
842 Ok(parts) => parts,
843 Err(e) => {
844 tracing::warn!("Failed to parse kline topic: {e}");
845 return result;
846 }
847 };
848
849 let symbol = product_type
850 .map_or_else(|| raw_symbol.into(), |pt| make_bybit_symbol(raw_symbol, pt));
851
852 if let Some(instrument) = instruments.get(&symbol) {
853 let (step, aggregation) = match interval_str.parse::<usize>() {
854 Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
855 _ => {
856 tracing::warn!("Unsupported kline interval: {}", interval_str);
857 return result;
858 }
859 };
860
861 if let Some(non_zero_step) = NonZero::new(step) {
862 let bar_spec = BarSpecification {
863 step: non_zero_step,
864 aggregation,
865 price_type: PriceType::Last,
866 };
867 let bar_type =
868 BarType::new(instrument.id(), bar_spec, AggregationSource::External);
869
870 let mut data_vec = Vec::new();
871 for kline in &msg.data {
872 if !kline.confirm {
874 continue;
875 }
876 match parse_ws_kline_bar(kline, instrument, bar_type, false, ts_init) {
877 Ok(bar) => data_vec.push(Data::Bar(bar)),
878 Err(e) => tracing::error!("Error parsing kline to bar: {e}"),
879 }
880 }
881 if !data_vec.is_empty() {
882 result.push(NautilusWsMessage::Data(data_vec));
883 }
884 } else {
885 tracing::error!("Invalid step value: {}", step);
886 }
887 } else {
888 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in Kline message");
889 }
890 }
891 BybitWsMessage::TickerLinear(msg) => {
892 let raw_symbol = msg.data.symbol;
893 let symbol =
894 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
895
896 if let Some(instrument) = instruments.get(&symbol) {
897 let instrument_id = instrument.id();
898 let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
899 let price_precision = instrument.price_precision();
900 let size_precision = instrument.size_precision();
901
902 let bid_price = msg
904 .data
905 .bid1_price
906 .as_deref()
907 .map(|s| parse_price_with_precision(s, price_precision, "bid1Price"))
908 .transpose();
909 let ask_price = msg
910 .data
911 .ask1_price
912 .as_deref()
913 .map(|s| parse_price_with_precision(s, price_precision, "ask1Price"))
914 .transpose();
915 let bid_size = msg
916 .data
917 .bid1_size
918 .as_deref()
919 .map(|s| parse_quantity_with_precision(s, size_precision, "bid1Size"))
920 .transpose();
921 let ask_size = msg
922 .data
923 .ask1_size
924 .as_deref()
925 .map(|s| parse_quantity_with_precision(s, size_precision, "ask1Size"))
926 .transpose();
927
928 match (bid_price, ask_price, bid_size, ask_size) {
929 (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
930 match self.quote_cache.process(
931 instrument_id,
932 bp,
933 ap,
934 bs,
935 as_,
936 ts_event,
937 ts_init,
938 ) {
939 Ok(quote) => {
940 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
941 }
942 Err(e) => {
943 let raw_data = serde_json::to_string(&msg.data)
944 .unwrap_or_else(|_| "<failed to serialize>".to_string());
945 tracing::debug!(
946 "Skipping partial ticker update: {e}, raw_data: {raw_data}"
947 );
948 }
949 }
950 }
951 _ => {
952 let raw_data = serde_json::to_string(&msg.data)
953 .unwrap_or_else(|_| "<failed to serialize>".to_string());
954 tracing::warn!(
955 "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
956 );
957 }
958 }
959
960 if msg.data.funding_rate.is_some() && msg.data.next_funding_time.is_some() {
962 let should_publish = {
963 let cache = funding_cache.read().await;
964 match cache.get(&symbol) {
965 Some((cached_rate, cached_time)) => {
966 cached_rate.as_ref() != msg.data.funding_rate.as_ref()
967 || cached_time.as_ref()
968 != msg.data.next_funding_time.as_ref()
969 }
970 None => true,
971 }
972 };
973
974 if should_publish {
975 match parse_ticker_linear_funding(
976 &msg.data,
977 instrument_id,
978 ts_event,
979 ts_init,
980 ) {
981 Ok(funding) => {
982 let cache_key = (
983 msg.data.funding_rate.clone(),
984 msg.data.next_funding_time.clone(),
985 );
986 funding_cache.write().await.insert(symbol, cache_key);
987 result.push(NautilusWsMessage::FundingRates(vec![funding]));
988 }
989 Err(e) => {
990 tracing::debug!("Skipping funding rate update: {e}");
991 }
992 }
993 }
994 }
995 } else {
996 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerLinear message");
997 }
998 }
999 BybitWsMessage::TickerOption(msg) => {
1000 let raw_symbol = &msg.data.symbol;
1001 let symbol = product_type.map_or_else(
1002 || raw_symbol.as_str().into(),
1003 |pt| make_bybit_symbol(raw_symbol, pt),
1004 );
1005
1006 if let Some(instrument) = instruments.get(&symbol) {
1007 let instrument_id = instrument.id();
1008 let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
1009 let price_precision = instrument.price_precision();
1010 let size_precision = instrument.size_precision();
1011
1012 let bid_price = parse_price_with_precision(
1014 &msg.data.bid_price,
1015 price_precision,
1016 "bidPrice",
1017 );
1018 let ask_price = parse_price_with_precision(
1019 &msg.data.ask_price,
1020 price_precision,
1021 "askPrice",
1022 );
1023 let bid_size = parse_quantity_with_precision(
1024 &msg.data.bid_size,
1025 size_precision,
1026 "bidSize",
1027 );
1028 let ask_size = parse_quantity_with_precision(
1029 &msg.data.ask_size,
1030 size_precision,
1031 "askSize",
1032 );
1033
1034 match (bid_price, ask_price, bid_size, ask_size) {
1035 (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1036 match self.quote_cache.process(
1037 instrument_id,
1038 Some(bp),
1039 Some(ap),
1040 Some(bs),
1041 Some(as_),
1042 ts_event,
1043 ts_init,
1044 ) {
1045 Ok(quote) => {
1046 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1047 }
1048 Err(e) => {
1049 let raw_data = serde_json::to_string(&msg.data)
1050 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1051 tracing::debug!(
1052 "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1053 );
1054 }
1055 }
1056 }
1057 _ => {
1058 let raw_data = serde_json::to_string(&msg.data)
1059 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1060 tracing::warn!(
1061 "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1062 );
1063 }
1064 }
1065 } else {
1066 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in TickerOption message");
1067 }
1068 }
1069 BybitWsMessage::AccountOrder(msg) => {
1070 if let Some(account_id) = account_id {
1071 let mut reports = Vec::new();
1072 for order in &msg.data {
1073 let raw_symbol = order.symbol;
1074 let symbol = make_bybit_symbol(raw_symbol, order.category);
1075
1076 if let Some(instrument) = instruments.get(&symbol) {
1077 match parse_ws_order_status_report(
1078 order, instrument, account_id, ts_init,
1079 ) {
1080 Ok(report) => reports.push(report),
1081 Err(e) => tracing::error!("Error parsing order status report: {e}"),
1082 }
1083 } else {
1084 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountOrder message");
1085 }
1086 }
1087 if !reports.is_empty() {
1088 result.push(NautilusWsMessage::OrderStatusReports(reports));
1089 }
1090 }
1091 }
1092 BybitWsMessage::AccountExecution(msg) => {
1093 if let Some(account_id) = account_id {
1094 let mut reports = Vec::new();
1095 for execution in &msg.data {
1096 let raw_symbol = execution.symbol;
1097 let symbol = make_bybit_symbol(raw_symbol, execution.category);
1098
1099 if let Some(instrument) = instruments.get(&symbol) {
1100 match parse_ws_fill_report(execution, account_id, instrument, ts_init) {
1101 Ok(report) => reports.push(report),
1102 Err(e) => tracing::error!("Error parsing fill report: {e}"),
1103 }
1104 } else {
1105 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountExecution message");
1106 }
1107 }
1108 if !reports.is_empty() {
1109 result.push(NautilusWsMessage::FillReports(reports));
1110 }
1111 }
1112 }
1113 BybitWsMessage::AccountPosition(msg) => {
1114 if let Some(account_id) = account_id {
1115 for position in &msg.data {
1116 let raw_symbol = position.symbol;
1117 let symbol = make_bybit_symbol(raw_symbol, position.category);
1118
1119 if let Some(instrument) = instruments.get(&symbol) {
1120 match parse_ws_position_status_report(
1121 position, account_id, instrument, ts_init,
1122 ) {
1123 Ok(report) => {
1124 result.push(NautilusWsMessage::PositionStatusReport(report));
1125 }
1126 Err(e) => {
1127 tracing::error!("Error parsing position status report: {e}");
1128 }
1129 }
1130 } else {
1131 tracing::debug!(raw_symbol = %raw_symbol, full_symbol = %symbol, "No instrument found for symbol in AccountPosition message");
1132 }
1133 }
1134 }
1135 }
1136 BybitWsMessage::AccountWallet(msg) => {
1137 if let Some(account_id) = account_id {
1138 for wallet in &msg.data {
1139 let ts_event = UnixNanos::from(msg.creation_time as u64 * 1_000_000);
1140
1141 match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1142 Ok(state) => result.push(NautilusWsMessage::AccountState(state)),
1143 Err(e) => tracing::error!("Error parsing account state: {e}"),
1144 }
1145 }
1146 }
1147 }
1148 BybitWsMessage::OrderResponse(resp) => {
1149 if resp.ret_code == 0 {
1150 tracing::debug!(op = %resp.op, ret_msg = %resp.ret_msg, "Order operation successful");
1151
1152 if resp.op.contains("batch") {
1153 self.handle_batch_response(&resp, &mut result);
1154 } else if let Some(req_id) = &resp.req_id {
1155 if resp.op.contains("create") {
1156 self.pending_place_requests.remove(req_id);
1157 } else if resp.op.contains("cancel") {
1158 self.pending_cancel_requests.remove(req_id);
1159 } else if resp.op.contains("amend") {
1160 self.pending_amend_requests.remove(req_id);
1161 }
1162 } else if let Some(order_link_id) =
1163 resp.data.get("orderLinkId").and_then(|v| v.as_str())
1164 {
1165 let client_order_id = ClientOrderId::from(order_link_id);
1167 if resp.op.contains("create") {
1168 self.find_and_remove_place_request_by_client_order_id(&client_order_id);
1169 } else if resp.op.contains("cancel") {
1170 self.find_and_remove_cancel_request_by_client_order_id(
1171 &client_order_id,
1172 );
1173 } else if resp.op.contains("amend") {
1174 self.find_and_remove_amend_request_by_client_order_id(&client_order_id);
1175 }
1176 }
1177 } else if let Some(req_id) = &resp.req_id {
1178 let clock = get_atomic_clock_realtime();
1179 let ts_init = clock.get_time_ns();
1180
1181 if resp.op.contains("batch") {
1182 self.handle_batch_failure(
1183 req_id,
1184 &resp.ret_msg,
1185 &resp.op,
1186 ts_init,
1187 &mut result,
1188 );
1189 } else if resp.op.contains("create")
1190 && let Some((_, (client_order_id, trader_id, strategy_id, instrument_id))) =
1191 self.pending_place_requests.remove(req_id)
1192 {
1193 let Some(account_id) = self.account_id else {
1194 tracing::error!(
1195 request_id = %req_id,
1196 reason = %resp.ret_msg,
1197 "Cannot create OrderRejected event: account_id is None"
1198 );
1199 return result;
1200 };
1201
1202 let rejected = OrderRejected::new(
1203 trader_id,
1204 strategy_id,
1205 instrument_id,
1206 client_order_id,
1207 account_id,
1208 Ustr::from(&resp.ret_msg),
1209 UUID4::new(),
1210 ts_init,
1211 ts_init,
1212 false,
1213 false,
1214 );
1215 result.push(NautilusWsMessage::OrderRejected(rejected));
1216 } else if resp.op.contains("cancel")
1217 && let Some((
1218 _,
1219 (
1220 client_order_id,
1221 trader_id,
1222 strategy_id,
1223 instrument_id,
1224 venue_order_id,
1225 ),
1226 )) = self.pending_cancel_requests.remove(req_id)
1227 {
1228 let rejected = OrderCancelRejected::new(
1229 trader_id,
1230 strategy_id,
1231 instrument_id,
1232 client_order_id,
1233 Ustr::from(&resp.ret_msg),
1234 UUID4::new(),
1235 ts_init,
1236 ts_init,
1237 false,
1238 venue_order_id,
1239 self.account_id,
1240 );
1241 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1242 } else if resp.op.contains("amend")
1243 && let Some((
1244 _,
1245 (
1246 client_order_id,
1247 trader_id,
1248 strategy_id,
1249 instrument_id,
1250 venue_order_id,
1251 ),
1252 )) = self.pending_amend_requests.remove(req_id)
1253 {
1254 let rejected = OrderModifyRejected::new(
1255 trader_id,
1256 strategy_id,
1257 instrument_id,
1258 client_order_id,
1259 Ustr::from(&resp.ret_msg),
1260 UUID4::new(),
1261 ts_init,
1262 ts_init,
1263 false,
1264 venue_order_id,
1265 self.account_id,
1266 );
1267 result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1268 }
1269 } else if let Some(order_link_id) =
1270 resp.data.get("orderLinkId").and_then(|v| v.as_str())
1271 {
1272 let clock = get_atomic_clock_realtime();
1274 let ts_init = clock.get_time_ns();
1275 let client_order_id = ClientOrderId::from(order_link_id);
1276
1277 if resp.op.contains("create") {
1278 if let Some((_, (_, trader_id, strategy_id, instrument_id))) =
1279 self.find_and_remove_place_request_by_client_order_id(&client_order_id)
1280 {
1281 let Some(account_id) = self.account_id else {
1282 tracing::error!(
1283 client_order_id = %client_order_id,
1284 reason = %resp.ret_msg,
1285 "Cannot create OrderRejected event: account_id is None"
1286 );
1287 return result;
1288 };
1289
1290 let rejected = OrderRejected::new(
1291 trader_id,
1292 strategy_id,
1293 instrument_id,
1294 client_order_id,
1295 account_id,
1296 Ustr::from(&resp.ret_msg),
1297 UUID4::new(),
1298 ts_init,
1299 ts_init,
1300 false,
1301 false,
1302 );
1303 result.push(NautilusWsMessage::OrderRejected(rejected));
1304 }
1305 } else if resp.op.contains("cancel") {
1306 if let Some((
1307 _,
1308 (_, trader_id, strategy_id, instrument_id, venue_order_id),
1309 )) =
1310 self.find_and_remove_cancel_request_by_client_order_id(&client_order_id)
1311 {
1312 let rejected = OrderCancelRejected::new(
1313 trader_id,
1314 strategy_id,
1315 instrument_id,
1316 client_order_id,
1317 Ustr::from(&resp.ret_msg),
1318 UUID4::new(),
1319 ts_init,
1320 ts_init,
1321 false,
1322 venue_order_id,
1323 self.account_id,
1324 );
1325 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1326 }
1327 } else if resp.op.contains("amend")
1328 && let Some((_, (_, trader_id, strategy_id, instrument_id, venue_order_id))) =
1329 self.find_and_remove_amend_request_by_client_order_id(&client_order_id)
1330 {
1331 let rejected = OrderModifyRejected::new(
1332 trader_id,
1333 strategy_id,
1334 instrument_id,
1335 client_order_id,
1336 Ustr::from(&resp.ret_msg),
1337 UUID4::new(),
1338 ts_init,
1339 ts_init,
1340 false,
1341 venue_order_id,
1342 self.account_id,
1343 );
1344 result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1345 }
1346 } else {
1347 tracing::warn!(
1348 op = %resp.op,
1349 ret_code = resp.ret_code,
1350 ret_msg = %resp.ret_msg,
1351 "Order operation failed but request_id could not be extracted from response"
1352 );
1353 }
1354 }
1355 BybitWsMessage::Auth(auth_response) => {
1356 let is_success =
1357 auth_response.success.unwrap_or(false) || (auth_response.ret_code == Some(0));
1358
1359 if is_success {
1360 self.auth_tracker.succeed();
1361 tracing::info!("WebSocket authenticated");
1362 result.push(NautilusWsMessage::Authenticated);
1363 } else {
1364 let error_msg = auth_response
1365 .ret_msg
1366 .as_deref()
1367 .unwrap_or("Authentication rejected");
1368 self.auth_tracker.fail(error_msg);
1369 tracing::error!(error = error_msg, "WebSocket authentication failed");
1370 result.push(NautilusWsMessage::Error(BybitWebSocketError::from_message(
1371 error_msg.to_string(),
1372 )));
1373 }
1374 }
1375 BybitWsMessage::Error(err) => {
1376 result.push(NautilusWsMessage::Error(err));
1377 }
1378 BybitWsMessage::Reconnected => {
1379 self.quote_cache.clear();
1380 result.push(NautilusWsMessage::Reconnected);
1381 }
1382 BybitWsMessage::Subscription(sub_msg) => {
1383 let pending_topics = self.subscriptions.pending_subscribe_topics();
1384 match sub_msg.op {
1385 BybitWsOperation::Subscribe => {
1386 if sub_msg.success {
1387 for topic in pending_topics {
1388 self.subscriptions.confirm_subscribe(&topic);
1389 tracing::debug!(topic = topic, "Subscription confirmed");
1390 }
1391 } else {
1392 for topic in pending_topics {
1393 self.subscriptions.mark_failure(&topic);
1394 tracing::warn!(
1395 topic = topic,
1396 error = ?sub_msg.ret_msg,
1397 "Subscription failed, will retry on reconnect"
1398 );
1399 }
1400 }
1401 }
1402 BybitWsOperation::Unsubscribe => {
1403 let pending_unsub = self.subscriptions.pending_unsubscribe_topics();
1404 if sub_msg.success {
1405 for topic in pending_unsub {
1406 self.subscriptions.confirm_unsubscribe(&topic);
1407 tracing::debug!(topic = topic, "Unsubscription confirmed");
1408 }
1409 } else {
1410 for topic in pending_unsub {
1411 tracing::warn!(
1412 topic = topic,
1413 error = ?sub_msg.ret_msg,
1414 "Unsubscription failed"
1415 );
1416 }
1417 }
1418 }
1419 _ => {}
1420 }
1421 }
1422 _ => {}
1423 }
1424
1425 result
1426 }
1427}
1428
1429pub fn classify_bybit_message(value: serde_json::Value) -> BybitWsMessage {
1433 if let Some(op_val) = value.get("op") {
1434 if let Ok(op) = serde_json::from_value::<BybitWsOperation>(op_val.clone())
1435 && op == BybitWsOperation::Auth
1436 && let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
1437 {
1438 let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1439 if is_success {
1440 return BybitWsMessage::Auth(auth);
1441 }
1442 let resp = BybitWsResponse {
1443 op: Some(auth.op.clone()),
1444 topic: None,
1445 success: auth.success,
1446 conn_id: auth.conn_id.clone(),
1447 req_id: None,
1448 ret_code: auth.ret_code,
1449 ret_msg: auth.ret_msg,
1450 };
1451 let error = BybitWebSocketError::from_response(&resp);
1452 return BybitWsMessage::Error(error);
1453 }
1454
1455 if let Some(op_str) = op_val.as_str()
1456 && op_str.starts_with("order.")
1457 {
1458 return serde_json::from_value::<BybitWsOrderResponse>(value.clone()).map_or_else(
1459 |_| BybitWsMessage::Raw(value),
1460 BybitWsMessage::OrderResponse,
1461 );
1462 }
1463 }
1464
1465 if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
1466 if success {
1467 return serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone())
1468 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Subscription);
1469 }
1470 return serde_json::from_value::<BybitWsResponse>(value.clone()).map_or_else(
1471 |_| BybitWsMessage::Raw(value),
1472 |resp| {
1473 let error = BybitWebSocketError::from_response(&resp);
1474 BybitWsMessage::Error(error)
1475 },
1476 );
1477 }
1478
1479 if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
1481 if topic.starts_with(BYBIT_TOPIC_ORDERBOOK) {
1482 return serde_json::from_value(value.clone())
1483 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Orderbook);
1484 }
1485
1486 if topic.contains(BYBIT_TOPIC_PUBLIC_TRADE) || topic.starts_with(BYBIT_TOPIC_TRADE) {
1487 return serde_json::from_value(value.clone())
1488 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Trade);
1489 }
1490
1491 if topic.starts_with(BYBIT_TOPIC_KLINE) {
1492 return serde_json::from_value(value.clone())
1493 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Kline);
1494 }
1495
1496 if topic.starts_with(BYBIT_TOPIC_TICKERS) {
1497 let is_option = value
1499 .get("data")
1500 .and_then(|d| d.get("symbol"))
1501 .and_then(|s| s.as_str())
1502 .is_some_and(|symbol| symbol.contains('-') && symbol.matches('-').count() >= 3);
1503
1504 if is_option {
1505 return serde_json::from_value(value.clone())
1506 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerOption);
1507 }
1508 return serde_json::from_value(value.clone())
1509 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerLinear);
1510 }
1511
1512 if topic.starts_with(BYBIT_TOPIC_ORDER) {
1513 return serde_json::from_value(value.clone())
1514 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::AccountOrder);
1515 }
1516
1517 if topic.starts_with(BYBIT_TOPIC_EXECUTION) {
1518 return serde_json::from_value(value.clone()).map_or_else(
1519 |_| BybitWsMessage::Raw(value),
1520 BybitWsMessage::AccountExecution,
1521 );
1522 }
1523
1524 if topic.starts_with(BYBIT_TOPIC_WALLET) {
1525 return serde_json::from_value(value.clone()).map_or_else(
1526 |_| BybitWsMessage::Raw(value),
1527 BybitWsMessage::AccountWallet,
1528 );
1529 }
1530
1531 if topic.starts_with(BYBIT_TOPIC_POSITION) {
1532 return serde_json::from_value(value.clone()).map_or_else(
1533 |_| BybitWsMessage::Raw(value),
1534 BybitWsMessage::AccountPosition,
1535 );
1536 }
1537 }
1538
1539 BybitWsMessage::Raw(value)
1540}
1541
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::consts::BYBIT_WS_TOPIC_DELIMITER;

    /// Builds a `FeedHandler` for unit tests.
    ///
    /// The channel halves that are not passed to the handler are dropped when this function
    /// returns, so the handler is only suitable for exercising methods that do not rely on
    /// the channels.
    fn create_test_handler() -> FeedHandler {
        let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
        let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
        let (out_tx, _out_rx) = tokio::sync::mpsc::unbounded_channel();

        FeedHandler::new(
            Arc::new(AtomicBool::new(false)),
            cmd_rx,
            raw_rx,
            out_tx,
            None,
            None,
            Arc::new(AtomicU8::new(0)),
            AuthTracker::new(),
            SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER),
            Arc::new(tokio::sync::RwLock::new(AHashMap::new())),
        )
    }

    /// Three consecutive request IDs must be pairwise distinct.
    #[rstest]
    fn test_generate_unique_request_id_returns_different_ids() {
        let handler = create_test_handler();

        let ids: Vec<_> = (0..3)
            .map(|_| handler.generate_unique_request_id())
            .collect();

        assert_ne!(ids[0], ids[1]);
        assert_ne!(ids[1], ids[2]);
        assert_ne!(ids[0], ids[2]);
    }

    /// A generated request ID must survive a round trip through `UUID4` unchanged.
    #[rstest]
    fn test_generate_unique_request_id_produces_valid_uuids() {
        let handler = create_test_handler();

        for _ in 0..2 {
            let id = handler.generate_unique_request_id();
            assert_eq!(UUID4::from(id.as_str()).to_string(), id);
        }
    }

    /// Request IDs generated for successive order placements must be pairwise distinct.
    #[rstest]
    fn test_multiple_place_orders_use_different_request_ids() {
        let handler = create_test_handler();

        let req_ids: Vec<_> = (0..3)
            .map(|_| handler.generate_unique_request_id())
            .collect();

        assert_ne!(req_ids[0], req_ids[1]);
        assert_ne!(req_ids[1], req_ids[2]);
        assert_ne!(req_ids[0], req_ids[2]);
    }

    /// Request IDs generated for successive amends must differ from their predecessor.
    #[rstest]
    fn test_multiple_amends_use_different_request_ids() {
        let handler = create_test_handler();

        let req_ids: Vec<_> = (0..3)
            .map(|_| handler.generate_unique_request_id())
            .collect();

        for pair in req_ids.windows(2) {
            assert_ne!(
                pair[0], pair[1],
                "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
            );
        }
    }

    /// Request IDs generated for successive cancels must differ.
    #[rstest]
    fn test_multiple_cancels_use_different_request_ids() {
        let handler = create_test_handler();

        let first = handler.generate_unique_request_id();
        let second = handler.generate_unique_request_id();

        assert_ne!(
            first, second,
            "Multiple cancels should generate different request IDs"
        );
    }

    /// One hundred request IDs generated back to back must all be unique.
    #[rstest]
    fn test_concurrent_request_id_generation() {
        let handler = create_test_handler();
        let mut seen = std::collections::HashSet::new();

        for _ in 0..100 {
            let id = handler.generate_unique_request_id();
            let newly_inserted = seen.insert(id.clone());
            assert!(newly_inserted, "Generated duplicate request ID: {id}");
        }

        assert_eq!(seen.len(), 100);
    }
}