1use std::{
19 collections::VecDeque,
20 num::NonZero,
21 sync::{
22 Arc,
23 atomic::{AtomicBool, AtomicU8, Ordering},
24 },
25};
26
27use ahash::AHashMap;
28use dashmap::DashMap;
29use nautilus_common::cache::quote::QuoteCache;
30use nautilus_core::{UUID4, nanos::UnixNanos, time::get_atomic_clock_realtime};
31use nautilus_model::{
32 data::{BarSpecification, BarType, Data},
33 enums::{AggregationSource, BarAggregation, PriceType},
34 events::{OrderCancelRejected, OrderModifyRejected, OrderRejected},
35 identifiers::{AccountId, ClientOrderId, InstrumentId, StrategyId, TraderId, VenueOrderId},
36 instruments::{Instrument, InstrumentAny},
37};
38use nautilus_network::{
39 retry::{RetryManager, create_websocket_retry_manager},
40 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
41};
42use tokio_tungstenite::tungstenite::Message;
43use ustr::Ustr;
44
45use super::{
46 enums::BybitWsOperation,
47 error::{BybitWsError, create_bybit_timeout_error, should_retry_bybit_error},
48 messages::{
49 BybitWebSocketError, BybitWsAuthResponse, BybitWsHeader, BybitWsMessage, BybitWsRequest,
50 BybitWsResponse, BybitWsSubscriptionMsg, NautilusWsMessage,
51 },
52 parse::{
53 parse_kline_topic, parse_millis_i64, parse_orderbook_deltas, parse_orderbook_quote,
54 parse_ticker_linear_funding, parse_ws_account_state, parse_ws_fill_report,
55 parse_ws_kline_bar, parse_ws_order_status_report, parse_ws_position_status_report,
56 parse_ws_trade_tick,
57 },
58};
59use crate::{
60 common::{
61 consts::{
62 BYBIT_NAUTILUS_BROKER_ID, BYBIT_TOPIC_EXECUTION, BYBIT_TOPIC_KLINE, BYBIT_TOPIC_ORDER,
63 BYBIT_TOPIC_ORDERBOOK, BYBIT_TOPIC_POSITION, BYBIT_TOPIC_PUBLIC_TRADE,
64 BYBIT_TOPIC_TICKERS, BYBIT_TOPIC_TRADE, BYBIT_TOPIC_WALLET,
65 },
66 enums::{BybitProductType, BybitTimeInForce, BybitWsOrderRequestOp},
67 parse::{make_bybit_symbol, parse_price_with_precision, parse_quantity_with_precision},
68 },
69 websocket::messages::{
70 BybitBatchOrderError, BybitWsAmendOrderParams, BybitWsCancelOrderParams,
71 BybitWsOrderResponse, BybitWsPlaceOrderParams,
72 },
73};
74
75#[derive(Debug)]
77#[allow(
78 clippy::large_enum_variant,
79 reason = "Commands are ephemeral and immediately consumed"
80)]
81pub enum HandlerCommand {
82 SetClient(WebSocketClient),
83 Disconnect,
84 Authenticate {
85 payload: String,
86 },
87 Subscribe {
88 topics: Vec<String>,
89 },
90 Unsubscribe {
91 topics: Vec<String>,
92 },
93 SendText {
94 payload: String,
95 },
96 PlaceOrder {
97 params: BybitWsPlaceOrderParams,
98 client_order_id: ClientOrderId,
99 trader_id: TraderId,
100 strategy_id: StrategyId,
101 instrument_id: InstrumentId,
102 },
103 AmendOrder {
104 params: BybitWsAmendOrderParams,
105 client_order_id: ClientOrderId,
106 trader_id: TraderId,
107 strategy_id: StrategyId,
108 instrument_id: InstrumentId,
109 venue_order_id: Option<VenueOrderId>,
110 },
111 CancelOrder {
112 params: BybitWsCancelOrderParams,
113 client_order_id: ClientOrderId,
114 trader_id: TraderId,
115 strategy_id: StrategyId,
116 instrument_id: InstrumentId,
117 venue_order_id: Option<VenueOrderId>,
118 },
119 RegisterBatchPlace {
120 req_id: String,
121 orders: Vec<BatchOrderData>,
122 },
123 RegisterBatchCancel {
124 req_id: String,
125 cancels: Vec<BatchCancelData>,
126 },
127 InitializeInstruments(Vec<InstrumentAny>),
128 UpdateInstrument(InstrumentAny),
129}
130
131type FundingCache = Arc<tokio::sync::RwLock<AHashMap<Ustr, (Option<String>, Option<String>)>>>;
133
134type PlaceRequestData = (ClientOrderId, TraderId, StrategyId, InstrumentId);
136
137type CancelRequestData = (
139 ClientOrderId,
140 TraderId,
141 StrategyId,
142 InstrumentId,
143 Option<VenueOrderId>,
144);
145
146type AmendRequestData = (
148 ClientOrderId,
149 TraderId,
150 StrategyId,
151 InstrumentId,
152 Option<VenueOrderId>,
153);
154
155type BatchOrderData = (ClientOrderId, PlaceRequestData);
157
158type BatchCancelData = (ClientOrderId, CancelRequestData);
160
161pub(super) struct FeedHandler {
162 signal: Arc<AtomicBool>,
163 client: Option<WebSocketClient>,
164 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
165 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
166 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
167 auth_tracker: AuthTracker,
168 subscriptions: SubscriptionState,
169 instruments_cache: AHashMap<Ustr, InstrumentAny>,
170 account_id: Option<AccountId>,
171 mm_level: Arc<AtomicU8>,
172 product_type: Option<BybitProductType>,
173 bars_timestamp_on_close: bool,
174 quote_cache: QuoteCache,
175 funding_cache: FundingCache,
176 retry_manager: RetryManager<BybitWsError>,
177 pending_place_requests: DashMap<String, PlaceRequestData>,
178 pending_cancel_requests: DashMap<String, CancelRequestData>,
179 pending_amend_requests: DashMap<String, AmendRequestData>,
180 pending_batch_place_requests: DashMap<String, Vec<BatchOrderData>>,
181 pending_batch_cancel_requests: DashMap<String, Vec<BatchCancelData>>,
182 message_queue: VecDeque<NautilusWsMessage>,
183}
184
185impl FeedHandler {
186 #[allow(clippy::too_many_arguments)]
188 pub(super) fn new(
189 signal: Arc<AtomicBool>,
190 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
191 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
192 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
193 account_id: Option<AccountId>,
194 product_type: Option<BybitProductType>,
195 bars_timestamp_on_close: bool,
196 mm_level: Arc<AtomicU8>,
197 auth_tracker: AuthTracker,
198 subscriptions: SubscriptionState,
199 funding_cache: FundingCache,
200 ) -> Self {
201 Self {
202 signal,
203 client: None,
204 cmd_rx,
205 raw_rx,
206 out_tx,
207 auth_tracker,
208 subscriptions,
209 instruments_cache: AHashMap::new(),
210 account_id,
211 mm_level,
212 product_type,
213 bars_timestamp_on_close,
214 quote_cache: QuoteCache::new(),
215 funding_cache,
216 retry_manager: create_websocket_retry_manager(),
217 pending_place_requests: DashMap::new(),
218 pending_cancel_requests: DashMap::new(),
219 pending_amend_requests: DashMap::new(),
220 pending_batch_place_requests: DashMap::new(),
221 pending_batch_cancel_requests: DashMap::new(),
222 message_queue: VecDeque::new(),
223 }
224 }
225
226 pub(super) fn is_stopped(&self) -> bool {
227 self.signal.load(Ordering::Relaxed)
228 }
229
230 #[allow(dead_code)]
231 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
232 self.out_tx.send(msg).map_err(|_| ())
233 }
234
235 fn generate_unique_request_id(&self) -> String {
236 UUID4::new().to_string()
237 }
238
239 fn find_and_remove_place_request_by_client_order_id(
240 &self,
241 client_order_id: &ClientOrderId,
242 ) -> Option<(String, PlaceRequestData)> {
243 self.pending_place_requests
244 .iter()
245 .find(|entry| entry.value().0 == *client_order_id)
246 .and_then(|entry| {
247 let key = entry.key().clone();
248 drop(entry);
249 self.pending_place_requests.remove(&key)
250 })
251 }
252
253 fn find_and_remove_cancel_request_by_client_order_id(
254 &self,
255 client_order_id: &ClientOrderId,
256 ) -> Option<(String, CancelRequestData)> {
257 self.pending_cancel_requests
258 .iter()
259 .find(|entry| entry.value().0 == *client_order_id)
260 .and_then(|entry| {
261 let key = entry.key().clone();
262 drop(entry);
263 self.pending_cancel_requests.remove(&key)
264 })
265 }
266
267 fn find_and_remove_amend_request_by_client_order_id(
268 &self,
269 client_order_id: &ClientOrderId,
270 ) -> Option<(String, AmendRequestData)> {
271 self.pending_amend_requests
272 .iter()
273 .find(|entry| entry.value().0 == *client_order_id)
274 .and_then(|entry| {
275 let key = entry.key().clone();
276 drop(entry);
277 self.pending_amend_requests.remove(&key)
278 })
279 }
280
281 fn include_referer_header(&self, time_in_force: Option<BybitTimeInForce>) -> bool {
282 let is_post_only = matches!(time_in_force, Some(BybitTimeInForce::PostOnly));
283 let mm_level = self.mm_level.load(Ordering::Relaxed);
284 !(is_post_only && mm_level > 0)
285 }
286
287 async fn send_with_retry(&self, payload: String) -> Result<(), BybitWsError> {
289 if let Some(client) = &self.client {
290 self.retry_manager
291 .execute_with_retry(
292 "websocket_send",
293 || {
294 let payload = payload.clone();
295 async move {
296 client
297 .send_text(payload, None)
298 .await
299 .map_err(|e| BybitWsError::Transport(format!("Send failed: {e}")))
300 }
301 },
302 should_retry_bybit_error,
303 create_bybit_timeout_error,
304 )
305 .await
306 } else {
307 Err(BybitWsError::ClientError(
308 "No active WebSocket client".to_string(),
309 ))
310 }
311 }
312
313 fn handle_batch_failure(
318 &self,
319 req_id: &str,
320 ret_msg: &str,
321 op: &str,
322 ts_init: UnixNanos,
323 result: &mut Vec<NautilusWsMessage>,
324 ) {
325 if op.contains("create") {
326 if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
327 log::warn!(
328 "Batch place request failed: req_id={req_id}, ret_msg={ret_msg}, num_orders={}",
329 batch_data.len()
330 );
331
332 let Some(account_id) = self.account_id else {
333 log::error!("Cannot create OrderRejected events: account_id is None");
334 return;
335 };
336
337 let reason = Ustr::from(ret_msg);
338 for (client_order_id, (_, trader_id, strategy_id, instrument_id)) in batch_data {
339 let rejected = OrderRejected::new(
340 trader_id,
341 strategy_id,
342 instrument_id,
343 client_order_id,
344 account_id,
345 reason,
346 UUID4::new(),
347 ts_init,
348 ts_init,
349 false,
350 false,
351 );
352 result.push(NautilusWsMessage::OrderRejected(rejected));
353 }
354 }
355 } else if op.contains("cancel")
356 && let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id)
357 {
358 log::warn!(
359 "Batch cancel request failed: req_id={req_id}, ret_msg={ret_msg}, num_cancels={}",
360 batch_data.len()
361 );
362
363 let reason = Ustr::from(ret_msg);
364 for (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id)) in
365 batch_data
366 {
367 let rejected = OrderCancelRejected::new(
368 trader_id,
369 strategy_id,
370 instrument_id,
371 client_order_id,
372 reason,
373 UUID4::new(),
374 ts_init,
375 ts_init,
376 false,
377 venue_order_id,
378 self.account_id,
379 );
380 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
381 }
382 }
383 }
384
385 fn handle_batch_response(
387 &self,
388 resp: &BybitWsOrderResponse,
389 result: &mut Vec<NautilusWsMessage>,
390 ) {
391 let Some(req_id) = &resp.req_id else {
392 log::warn!(
393 "Batch response missing req_id - cannot correlate with pending requests: op={}",
394 resp.op
395 );
396 return;
397 };
398
399 let batch_errors = resp.extract_batch_errors();
400
401 if resp.op.contains("create") {
402 if let Some((_, batch_data)) = self.pending_batch_place_requests.remove(req_id) {
403 self.process_batch_place_errors(batch_data, batch_errors, result);
404 } else {
405 log::debug!(
406 "Batch place response received but no pending request found: req_id={req_id}"
407 );
408 }
409 } else if resp.op.contains("cancel") {
410 if let Some((_, batch_data)) = self.pending_batch_cancel_requests.remove(req_id) {
411 self.process_batch_cancel_errors(batch_data, batch_errors, result);
412 } else {
413 log::debug!(
414 "Batch cancel response received but no pending request found: req_id={req_id}"
415 );
416 }
417 }
418 }
419
420 fn process_batch_place_errors(
422 &self,
423 batch_data: Vec<BatchOrderData>,
424 errors: Vec<BybitBatchOrderError>,
425 result: &mut Vec<NautilusWsMessage>,
426 ) {
427 let Some(account_id) = self.account_id else {
428 log::error!("Cannot create OrderRejected events: account_id is None");
429 return;
430 };
431
432 let clock = get_atomic_clock_realtime();
433 let ts_init = clock.get_time_ns();
434
435 for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id))) in
436 batch_data.into_iter().enumerate()
437 {
438 if let Some(error) = errors.get(idx)
439 && error.code != 0
440 {
441 log::warn!(
442 "Batch order rejected: client_order_id={client_order_id}, error_code={}, error_msg={}",
443 error.code,
444 error.msg
445 );
446
447 let rejected = OrderRejected::new(
448 trader_id,
449 strategy_id,
450 instrument_id,
451 client_order_id,
452 account_id,
453 Ustr::from(&error.msg),
454 UUID4::new(),
455 ts_init,
456 ts_init,
457 false,
458 false,
459 );
460 result.push(NautilusWsMessage::OrderRejected(rejected));
461 }
462 }
463 }
464
465 fn process_batch_cancel_errors(
467 &self,
468 batch_data: Vec<BatchCancelData>,
469 errors: Vec<BybitBatchOrderError>,
470 result: &mut Vec<NautilusWsMessage>,
471 ) {
472 let clock = get_atomic_clock_realtime();
473 let ts_init = clock.get_time_ns();
474
475 for (idx, (client_order_id, (_, trader_id, strategy_id, instrument_id, venue_order_id))) in
476 batch_data.into_iter().enumerate()
477 {
478 if let Some(error) = errors.get(idx)
479 && error.code != 0
480 {
481 log::warn!(
482 "Batch cancel rejected: client_order_id={client_order_id}, error_code={}, error_msg={}",
483 error.code,
484 error.msg
485 );
486
487 let rejected = OrderCancelRejected::new(
488 trader_id,
489 strategy_id,
490 instrument_id,
491 client_order_id,
492 Ustr::from(&error.msg),
493 UUID4::new(),
494 ts_init,
495 ts_init,
496 false,
497 venue_order_id,
498 self.account_id,
499 );
500 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
501 }
502 }
503 }
504
505 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
506 let clock = get_atomic_clock_realtime();
507
508 loop {
509 if let Some(msg) = self.message_queue.pop_front() {
510 return Some(msg);
511 }
512
513 tokio::select! {
514 Some(cmd) = self.cmd_rx.recv() => {
515 match cmd {
516 HandlerCommand::SetClient(client) => {
517 log::debug!("WebSocketClient received by handler");
518 self.client = Some(client);
519 }
520 HandlerCommand::Disconnect => {
521 log::debug!("Disconnect command received");
522
523 if let Some(client) = self.client.take() {
524 client.disconnect().await;
525 }
526 }
527 HandlerCommand::Authenticate { payload } => {
528 log::debug!("Authenticate command received");
529 if let Err(e) = self.send_with_retry(payload).await {
530 log::error!("Failed to send authentication after retries: {e}");
531 }
532 }
533 HandlerCommand::Subscribe { topics } => {
534 for topic in topics {
535 log::debug!("Subscribing to topic: topic={topic}");
536 if let Err(e) = self.send_with_retry(topic.clone()).await {
537 log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
538 }
539 }
540 }
541 HandlerCommand::Unsubscribe { topics } => {
542 for topic in topics {
543 log::debug!("Unsubscribing from topic: topic={topic}");
544 if let Err(e) = self.send_with_retry(topic.clone()).await {
545 log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
546 }
547 }
548 }
549 HandlerCommand::SendText { payload } => {
550 if let Err(e) = self.send_with_retry(payload).await {
551 log::error!("Error sending text with retry: {e}");
552 }
553 }
554 HandlerCommand::InitializeInstruments(instruments) => {
555 for inst in instruments {
556 self.instruments_cache.insert(inst.symbol().inner(), inst);
557 }
558 }
559 HandlerCommand::UpdateInstrument(inst) => {
560 self.instruments_cache.insert(inst.symbol().inner(), inst);
561 }
562 HandlerCommand::RegisterBatchPlace { req_id, orders } => {
563 log::debug!(
564 "Registering batch place request: req_id={req_id}, num_orders={}",
565 orders.len()
566 );
567 self.pending_batch_place_requests.insert(req_id, orders);
568 }
569 HandlerCommand::RegisterBatchCancel { req_id, cancels } => {
570 log::debug!(
571 "Registering batch cancel request: req_id={req_id}, num_cancels={}",
572 cancels.len()
573 );
574 self.pending_batch_cancel_requests.insert(req_id, cancels);
575 }
576 HandlerCommand::PlaceOrder {
577 params,
578 client_order_id,
579 trader_id,
580 strategy_id,
581 instrument_id,
582 } => {
583 let request_id = self.generate_unique_request_id();
584
585 self.pending_place_requests.insert(
586 request_id.clone(),
587 (client_order_id, trader_id, strategy_id, instrument_id),
588 );
589
590 let referer = if self.include_referer_header(params.time_in_force) {
591 Some(BYBIT_NAUTILUS_BROKER_ID.to_string())
592 } else {
593 None
594 };
595
596 let request = BybitWsRequest {
597 req_id: Some(request_id.clone()),
598 op: BybitWsOrderRequestOp::Create,
599 header: BybitWsHeader::with_referer(referer),
600 args: vec![params],
601 };
602
603 if let Ok(payload) = serde_json::to_string(&request)
604 && let Err(e) = self.send_with_retry(payload).await
605 {
606 log::error!("Failed to send place order after retries: {e}");
607 self.pending_place_requests.remove(&request_id);
608 }
609 }
610 HandlerCommand::AmendOrder {
611 params,
612 client_order_id,
613 trader_id,
614 strategy_id,
615 instrument_id,
616 venue_order_id,
617 } => {
618 let request_id = self.generate_unique_request_id();
619
620 self.pending_amend_requests.insert(
621 request_id.clone(),
622 (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
623 );
624
625 let request = BybitWsRequest {
626 req_id: Some(request_id.clone()),
627 op: BybitWsOrderRequestOp::Amend,
628 header: BybitWsHeader::now(),
629 args: vec![params],
630 };
631
632 if let Ok(payload) = serde_json::to_string(&request)
633 && let Err(e) = self.send_with_retry(payload).await
634 {
635 log::error!("Failed to send amend order after retries: {e}");
636 self.pending_amend_requests.remove(&request_id);
637 }
638 }
639 HandlerCommand::CancelOrder {
640 params,
641 client_order_id,
642 trader_id,
643 strategy_id,
644 instrument_id,
645 venue_order_id,
646 } => {
647 let request_id = self.generate_unique_request_id();
648
649 self.pending_cancel_requests.insert(
650 request_id.clone(),
651 (client_order_id, trader_id, strategy_id, instrument_id, venue_order_id),
652 );
653
654 let request = BybitWsRequest {
655 req_id: Some(request_id.clone()),
656 op: BybitWsOrderRequestOp::Cancel,
657 header: BybitWsHeader::now(),
658 args: vec![params],
659 };
660
661 if let Ok(payload) = serde_json::to_string(&request)
662 && let Err(e) = self.send_with_retry(payload).await
663 {
664 log::error!("Failed to send cancel order after retries: {e}");
665 self.pending_cancel_requests.remove(&request_id);
666 }
667 }
668 }
669
670 continue;
671 }
672
673 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
674 if self.signal.load(Ordering::Relaxed) {
675 log::debug!("Stop signal received during idle period");
676 return None;
677 }
678 continue;
679 }
680
681 msg = self.raw_rx.recv() => {
682 let msg = match msg {
683 Some(msg) => msg,
684 None => {
685 log::debug!("WebSocket stream closed");
686 return None;
687 }
688 };
689
690 if let Message::Ping(data) = &msg {
691 log::trace!("Received ping frame with {} bytes", data.len());
692
693 if let Some(client) = &self.client
694 && let Err(e) = client.send_pong(data.to_vec()).await
695 {
696 log::warn!("Failed to send pong frame: error={e}");
697 }
698 continue;
699 }
700
701 let event = match Self::parse_raw_message(msg) {
702 Some(event) => event,
703 None => continue,
704 };
705
706 if self.signal.load(Ordering::Relaxed) {
707 log::debug!("Stop signal received");
708 return None;
709 }
710
711 let ts_init = clock.get_time_ns();
712 let instruments = self.instruments_cache.clone();
713 let funding_cache = Arc::clone(&self.funding_cache);
714 let nautilus_messages = self.parse_to_nautilus_messages(
715 event,
716 &instruments,
717 self.account_id,
718 self.product_type,
719 &funding_cache,
720 ts_init,
721 )
722 .await;
723
724 self.message_queue.extend(nautilus_messages);
726 }
727 }
728 }
729 }
730
731 fn parse_raw_message(msg: Message) -> Option<BybitWsMessage> {
732 use serde_json::Value;
733
734 match msg {
735 Message::Text(text) => {
736 if text == nautilus_network::RECONNECTED {
737 log::info!("Received WebSocket reconnected signal");
738 return Some(BybitWsMessage::Reconnected);
739 }
740
741 if text.trim().eq_ignore_ascii_case("pong") {
742 return None;
743 }
744
745 log::trace!("Raw websocket message: {text}");
746
747 let value: Value = match serde_json::from_str(&text) {
748 Ok(v) => v,
749 Err(e) => {
750 log::error!("Failed to parse WebSocket message: {e}: {text}");
751 return None;
752 }
753 };
754
755 Some(classify_bybit_message(value))
756 }
757 Message::Binary(msg) => {
758 log::debug!("Raw binary: {msg:?}");
759 None
760 }
761 Message::Close(_) => {
762 log::debug!("Received close message, waiting for reconnection");
763 None
764 }
765 _ => None,
766 }
767 }
768
769 #[allow(clippy::too_many_arguments)]
770 async fn parse_to_nautilus_messages(
771 &mut self,
772 msg: BybitWsMessage,
773 instruments: &AHashMap<Ustr, InstrumentAny>,
774 account_id: Option<AccountId>,
775 product_type: Option<BybitProductType>,
776 funding_cache: &FundingCache,
777 ts_init: UnixNanos,
778 ) -> Vec<NautilusWsMessage> {
779 let mut result = Vec::new();
780
781 match msg {
782 BybitWsMessage::Orderbook(msg) => {
783 let raw_symbol = msg.data.s;
784 let symbol =
785 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
786
787 if let Some(instrument) = instruments.get(&symbol) {
788 match parse_orderbook_deltas(&msg, instrument, ts_init) {
789 Ok(deltas) => result.push(NautilusWsMessage::Deltas(deltas)),
790 Err(e) => log::error!("Error parsing orderbook deltas: {e}"),
791 }
792
793 if let Some(depth_str) = msg.topic.as_str().split('.').nth(1)
795 && depth_str == "1"
796 {
797 let instrument_id = instrument.id();
798 let last_quote = self.quote_cache.get(&instrument_id);
799
800 match parse_orderbook_quote(&msg, instrument, last_quote, ts_init) {
801 Ok(quote) => {
802 self.quote_cache.insert(instrument_id, quote);
803 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
804 }
805 Err(e) => log::debug!("Skipping orderbook quote: {e}"),
806 }
807 }
808 } else {
809 log::debug!(
810 "No instrument found for symbol in Orderbook message: raw_symbol={raw_symbol}, full_symbol={symbol}"
811 );
812 }
813 }
814 BybitWsMessage::Trade(msg) => {
815 let mut data_vec = Vec::new();
816 for trade in &msg.data {
817 let raw_symbol = trade.s;
818 let symbol =
819 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
820
821 if let Some(instrument) = instruments.get(&symbol) {
822 match parse_ws_trade_tick(trade, instrument, ts_init) {
823 Ok(tick) => data_vec.push(Data::Trade(tick)),
824 Err(e) => log::error!("Error parsing trade tick: {e}"),
825 }
826 } else {
827 log::debug!(
828 "No instrument found for symbol in Trade message: raw_symbol={raw_symbol}, full_symbol={symbol}"
829 );
830 }
831 }
832
833 if !data_vec.is_empty() {
834 result.push(NautilusWsMessage::Data(data_vec));
835 }
836 }
837 BybitWsMessage::Kline(msg) => {
838 let (interval_str, raw_symbol) = match parse_kline_topic(&msg.topic) {
839 Ok(parts) => parts,
840 Err(e) => {
841 log::warn!("Failed to parse kline topic: {e}");
842 return result;
843 }
844 };
845
846 let symbol = product_type
847 .map_or_else(|| raw_symbol.into(), |pt| make_bybit_symbol(raw_symbol, pt));
848
849 if let Some(instrument) = instruments.get(&symbol) {
850 let (step, aggregation) = match interval_str.parse::<usize>() {
851 Ok(minutes) if minutes > 0 => (minutes, BarAggregation::Minute),
852 _ => {
853 log::warn!("Unsupported kline interval: {interval_str}");
854 return result;
855 }
856 };
857
858 if let Some(non_zero_step) = NonZero::new(step) {
859 let bar_spec = BarSpecification {
860 step: non_zero_step,
861 aggregation,
862 price_type: PriceType::Last,
863 };
864 let bar_type =
865 BarType::new(instrument.id(), bar_spec, AggregationSource::External);
866
867 let mut data_vec = Vec::new();
868 for kline in &msg.data {
869 if !kline.confirm {
871 continue;
872 }
873 match parse_ws_kline_bar(
874 kline,
875 instrument,
876 bar_type,
877 self.bars_timestamp_on_close,
878 ts_init,
879 ) {
880 Ok(bar) => data_vec.push(Data::Bar(bar)),
881 Err(e) => log::error!("Error parsing kline to bar: {e}"),
882 }
883 }
884 if !data_vec.is_empty() {
885 result.push(NautilusWsMessage::Data(data_vec));
886 }
887 } else {
888 log::error!("Invalid step value: {step}");
889 }
890 } else {
891 log::debug!(
892 "No instrument found for symbol in Kline message: raw_symbol={raw_symbol}, full_symbol={symbol}"
893 );
894 }
895 }
896 BybitWsMessage::TickerLinear(msg) => {
897 let raw_symbol = msg.data.symbol;
898 let symbol =
899 product_type.map_or(raw_symbol, |pt| make_bybit_symbol(raw_symbol, pt));
900
901 if let Some(instrument) = instruments.get(&symbol) {
902 let instrument_id = instrument.id();
903 let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
904 let price_precision = instrument.price_precision();
905 let size_precision = instrument.size_precision();
906
907 let bid_price = msg
909 .data
910 .bid1_price
911 .as_deref()
912 .map(|s| parse_price_with_precision(s, price_precision, "bid1Price"))
913 .transpose();
914 let ask_price = msg
915 .data
916 .ask1_price
917 .as_deref()
918 .map(|s| parse_price_with_precision(s, price_precision, "ask1Price"))
919 .transpose();
920 let bid_size = msg
921 .data
922 .bid1_size
923 .as_deref()
924 .map(|s| parse_quantity_with_precision(s, size_precision, "bid1Size"))
925 .transpose();
926 let ask_size = msg
927 .data
928 .ask1_size
929 .as_deref()
930 .map(|s| parse_quantity_with_precision(s, size_precision, "ask1Size"))
931 .transpose();
932
933 match (bid_price, ask_price, bid_size, ask_size) {
934 (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
935 match self.quote_cache.process(
936 instrument_id,
937 bp,
938 ap,
939 bs,
940 as_,
941 ts_event,
942 ts_init,
943 ) {
944 Ok(quote) => {
945 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
946 }
947 Err(e) => {
948 let raw_data = serde_json::to_string(&msg.data)
949 .unwrap_or_else(|_| "<failed to serialize>".to_string());
950 log::debug!(
951 "Skipping partial ticker update: {e}, raw_data: {raw_data}"
952 );
953 }
954 }
955 }
956 _ => {
957 let raw_data = serde_json::to_string(&msg.data)
958 .unwrap_or_else(|_| "<failed to serialize>".to_string());
959 log::warn!(
960 "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
961 );
962 }
963 }
964
965 if msg.data.funding_rate.is_some() && msg.data.next_funding_time.is_some() {
967 let should_publish = {
968 let cache = funding_cache.read().await;
969 match cache.get(&symbol) {
970 Some((cached_rate, cached_time)) => {
971 cached_rate.as_ref() != msg.data.funding_rate.as_ref()
972 || cached_time.as_ref()
973 != msg.data.next_funding_time.as_ref()
974 }
975 None => true,
976 }
977 };
978
979 if should_publish {
980 match parse_ticker_linear_funding(
981 &msg.data,
982 instrument_id,
983 ts_event,
984 ts_init,
985 ) {
986 Ok(funding) => {
987 let cache_key = (
988 msg.data.funding_rate.clone(),
989 msg.data.next_funding_time.clone(),
990 );
991 funding_cache.write().await.insert(symbol, cache_key);
992 result.push(NautilusWsMessage::FundingRates(vec![funding]));
993 }
994 Err(e) => {
995 log::debug!("Skipping funding rate update: {e}");
996 }
997 }
998 }
999 }
1000 } else {
1001 log::debug!(
1002 "No instrument found for symbol in TickerLinear message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1003 );
1004 }
1005 }
1006 BybitWsMessage::TickerOption(msg) => {
1007 let raw_symbol = &msg.data.symbol;
1008 let symbol = product_type.map_or_else(
1009 || raw_symbol.as_str().into(),
1010 |pt| make_bybit_symbol(raw_symbol, pt),
1011 );
1012
1013 if let Some(instrument) = instruments.get(&symbol) {
1014 let instrument_id = instrument.id();
1015 let ts_event = parse_millis_i64(msg.ts, "ticker.ts").unwrap_or(ts_init);
1016 let price_precision = instrument.price_precision();
1017 let size_precision = instrument.size_precision();
1018
1019 let bid_price = parse_price_with_precision(
1021 &msg.data.bid_price,
1022 price_precision,
1023 "bidPrice",
1024 );
1025 let ask_price = parse_price_with_precision(
1026 &msg.data.ask_price,
1027 price_precision,
1028 "askPrice",
1029 );
1030 let bid_size = parse_quantity_with_precision(
1031 &msg.data.bid_size,
1032 size_precision,
1033 "bidSize",
1034 );
1035 let ask_size = parse_quantity_with_precision(
1036 &msg.data.ask_size,
1037 size_precision,
1038 "askSize",
1039 );
1040
1041 match (bid_price, ask_price, bid_size, ask_size) {
1042 (Ok(bp), Ok(ap), Ok(bs), Ok(as_)) => {
1043 match self.quote_cache.process(
1044 instrument_id,
1045 Some(bp),
1046 Some(ap),
1047 Some(bs),
1048 Some(as_),
1049 ts_event,
1050 ts_init,
1051 ) {
1052 Ok(quote) => {
1053 result.push(NautilusWsMessage::Data(vec![Data::Quote(quote)]));
1054 }
1055 Err(e) => {
1056 let raw_data = serde_json::to_string(&msg.data)
1057 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1058 log::debug!(
1059 "Skipping partial ticker update: {e}, raw_data: {raw_data}"
1060 );
1061 }
1062 }
1063 }
1064 _ => {
1065 let raw_data = serde_json::to_string(&msg.data)
1066 .unwrap_or_else(|_| "<failed to serialize>".to_string());
1067 log::warn!(
1068 "Failed to parse ticker fields, skipping update, raw_data: {raw_data}"
1069 );
1070 }
1071 }
1072 } else {
1073 log::debug!(
1074 "No instrument found for symbol in TickerOption message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1075 );
1076 }
1077 }
1078 BybitWsMessage::AccountOrder(msg) => {
1079 if let Some(account_id) = account_id {
1080 let mut reports = Vec::new();
1081 for order in &msg.data {
1082 let raw_symbol = order.symbol;
1083 let symbol = make_bybit_symbol(raw_symbol, order.category);
1084
1085 if let Some(instrument) = instruments.get(&symbol) {
1086 match parse_ws_order_status_report(
1087 order, instrument, account_id, ts_init,
1088 ) {
1089 Ok(report) => reports.push(report),
1090 Err(e) => log::error!("Error parsing order status report: {e}"),
1091 }
1092 } else {
1093 log::debug!(
1094 "No instrument found for symbol in AccountOrder message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1095 );
1096 }
1097 }
1098 if !reports.is_empty() {
1099 result.push(NautilusWsMessage::OrderStatusReports(reports));
1100 }
1101 }
1102 }
1103 BybitWsMessage::AccountExecution(msg) => {
1104 if let Some(account_id) = account_id {
1105 let mut reports = Vec::new();
1106 for execution in &msg.data {
1107 let raw_symbol = execution.symbol;
1108 let symbol = make_bybit_symbol(raw_symbol, execution.category);
1109
1110 if let Some(instrument) = instruments.get(&symbol) {
1111 match parse_ws_fill_report(execution, account_id, instrument, ts_init) {
1112 Ok(report) => reports.push(report),
1113 Err(e) => log::error!("Error parsing fill report: {e}"),
1114 }
1115 } else {
1116 log::debug!(
1117 "No instrument found for symbol in AccountExecution message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1118 );
1119 }
1120 }
1121 if !reports.is_empty() {
1122 result.push(NautilusWsMessage::FillReports(reports));
1123 }
1124 }
1125 }
1126 BybitWsMessage::AccountPosition(msg) => {
1127 if let Some(account_id) = account_id {
1128 for position in &msg.data {
1129 let raw_symbol = position.symbol;
1130 let symbol = make_bybit_symbol(raw_symbol, position.category);
1131
1132 if let Some(instrument) = instruments.get(&symbol) {
1133 match parse_ws_position_status_report(
1134 position, account_id, instrument, ts_init,
1135 ) {
1136 Ok(report) => {
1137 result.push(NautilusWsMessage::PositionStatusReport(report));
1138 }
1139 Err(e) => {
1140 log::error!("Error parsing position status report: {e}");
1141 }
1142 }
1143 } else {
1144 log::debug!(
1145 "No instrument found for symbol in AccountPosition message: raw_symbol={raw_symbol}, full_symbol={symbol}"
1146 );
1147 }
1148 }
1149 }
1150 }
1151 BybitWsMessage::AccountWallet(msg) => {
1152 if let Some(account_id) = account_id {
1153 for wallet in &msg.data {
1154 let ts_event = UnixNanos::from(msg.creation_time as u64 * 1_000_000);
1155
1156 match parse_ws_account_state(wallet, account_id, ts_event, ts_init) {
1157 Ok(state) => result.push(NautilusWsMessage::AccountState(state)),
1158 Err(e) => log::error!("Error parsing account state: {e}"),
1159 }
1160 }
1161 }
1162 }
1163 BybitWsMessage::OrderResponse(resp) => {
1164 if resp.ret_code == 0 {
1165 log::debug!(
1166 "Order operation successful: op={}, ret_msg={}",
1167 resp.op,
1168 resp.ret_msg
1169 );
1170
1171 if resp.op.contains("batch") {
1172 self.handle_batch_response(&resp, &mut result);
1173 } else if let Some(req_id) = &resp.req_id {
1174 if resp.op.contains("create") {
1175 self.pending_place_requests.remove(req_id);
1176 } else if resp.op.contains("cancel") {
1177 self.pending_cancel_requests.remove(req_id);
1178 } else if resp.op.contains("amend") {
1179 self.pending_amend_requests.remove(req_id);
1180 }
1181 } else if let Some(order_link_id) =
1182 resp.data.get("orderLinkId").and_then(|v| v.as_str())
1183 {
1184 let client_order_id = ClientOrderId::from(order_link_id);
1186 if resp.op.contains("create") {
1187 self.find_and_remove_place_request_by_client_order_id(&client_order_id);
1188 } else if resp.op.contains("cancel") {
1189 self.find_and_remove_cancel_request_by_client_order_id(
1190 &client_order_id,
1191 );
1192 } else if resp.op.contains("amend") {
1193 self.find_and_remove_amend_request_by_client_order_id(&client_order_id);
1194 }
1195 }
1196 } else if let Some(req_id) = &resp.req_id {
1197 let clock = get_atomic_clock_realtime();
1198 let ts_init = clock.get_time_ns();
1199
1200 if resp.op.contains("batch") {
1201 self.handle_batch_failure(
1202 req_id,
1203 &resp.ret_msg,
1204 &resp.op,
1205 ts_init,
1206 &mut result,
1207 );
1208 } else if resp.op.contains("create")
1209 && let Some((_, (client_order_id, trader_id, strategy_id, instrument_id))) =
1210 self.pending_place_requests.remove(req_id)
1211 {
1212 let Some(account_id) = self.account_id else {
1213 log::error!(
1214 "Cannot create OrderRejected event: account_id is None: request_id={req_id}, reason={}",
1215 resp.ret_msg
1216 );
1217 return result;
1218 };
1219
1220 let rejected = OrderRejected::new(
1221 trader_id,
1222 strategy_id,
1223 instrument_id,
1224 client_order_id,
1225 account_id,
1226 Ustr::from(&resp.ret_msg),
1227 UUID4::new(),
1228 ts_init,
1229 ts_init,
1230 false,
1231 false,
1232 );
1233 result.push(NautilusWsMessage::OrderRejected(rejected));
1234 } else if resp.op.contains("cancel")
1235 && let Some((
1236 _,
1237 (
1238 client_order_id,
1239 trader_id,
1240 strategy_id,
1241 instrument_id,
1242 venue_order_id,
1243 ),
1244 )) = self.pending_cancel_requests.remove(req_id)
1245 {
1246 let rejected = OrderCancelRejected::new(
1247 trader_id,
1248 strategy_id,
1249 instrument_id,
1250 client_order_id,
1251 Ustr::from(&resp.ret_msg),
1252 UUID4::new(),
1253 ts_init,
1254 ts_init,
1255 false,
1256 venue_order_id,
1257 self.account_id,
1258 );
1259 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1260 } else if resp.op.contains("amend")
1261 && let Some((
1262 _,
1263 (
1264 client_order_id,
1265 trader_id,
1266 strategy_id,
1267 instrument_id,
1268 venue_order_id,
1269 ),
1270 )) = self.pending_amend_requests.remove(req_id)
1271 {
1272 let rejected = OrderModifyRejected::new(
1273 trader_id,
1274 strategy_id,
1275 instrument_id,
1276 client_order_id,
1277 Ustr::from(&resp.ret_msg),
1278 UUID4::new(),
1279 ts_init,
1280 ts_init,
1281 false,
1282 venue_order_id,
1283 self.account_id,
1284 );
1285 result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1286 }
1287 } else if let Some(order_link_id) =
1288 resp.data.get("orderLinkId").and_then(|v| v.as_str())
1289 {
1290 let clock = get_atomic_clock_realtime();
1292 let ts_init = clock.get_time_ns();
1293 let client_order_id = ClientOrderId::from(order_link_id);
1294
1295 if resp.op.contains("create") {
1296 if let Some((_, (_, trader_id, strategy_id, instrument_id))) =
1297 self.find_and_remove_place_request_by_client_order_id(&client_order_id)
1298 {
1299 let Some(account_id) = self.account_id else {
1300 log::error!(
1301 "Cannot create OrderRejected event: account_id is None: client_order_id={client_order_id}, reason={}",
1302 resp.ret_msg
1303 );
1304 return result;
1305 };
1306
1307 let rejected = OrderRejected::new(
1308 trader_id,
1309 strategy_id,
1310 instrument_id,
1311 client_order_id,
1312 account_id,
1313 Ustr::from(&resp.ret_msg),
1314 UUID4::new(),
1315 ts_init,
1316 ts_init,
1317 false,
1318 false,
1319 );
1320 result.push(NautilusWsMessage::OrderRejected(rejected));
1321 }
1322 } else if resp.op.contains("cancel") {
1323 if let Some((
1324 _,
1325 (_, trader_id, strategy_id, instrument_id, venue_order_id),
1326 )) =
1327 self.find_and_remove_cancel_request_by_client_order_id(&client_order_id)
1328 {
1329 let rejected = OrderCancelRejected::new(
1330 trader_id,
1331 strategy_id,
1332 instrument_id,
1333 client_order_id,
1334 Ustr::from(&resp.ret_msg),
1335 UUID4::new(),
1336 ts_init,
1337 ts_init,
1338 false,
1339 venue_order_id,
1340 self.account_id,
1341 );
1342 result.push(NautilusWsMessage::OrderCancelRejected(rejected));
1343 }
1344 } else if resp.op.contains("amend")
1345 && let Some((_, (_, trader_id, strategy_id, instrument_id, venue_order_id))) =
1346 self.find_and_remove_amend_request_by_client_order_id(&client_order_id)
1347 {
1348 let rejected = OrderModifyRejected::new(
1349 trader_id,
1350 strategy_id,
1351 instrument_id,
1352 client_order_id,
1353 Ustr::from(&resp.ret_msg),
1354 UUID4::new(),
1355 ts_init,
1356 ts_init,
1357 false,
1358 venue_order_id,
1359 self.account_id,
1360 );
1361 result.push(NautilusWsMessage::OrderModifyRejected(rejected));
1362 }
1363 } else {
1364 log::warn!(
1365 "Order operation failed but request_id could not be extracted from response: op={}, ret_code={}, ret_msg={}",
1366 resp.op,
1367 resp.ret_code,
1368 resp.ret_msg
1369 );
1370 }
1371 }
1372 BybitWsMessage::Auth(auth_response) => {
1373 let is_success =
1374 auth_response.success.unwrap_or(false) || (auth_response.ret_code == Some(0));
1375
1376 if is_success {
1377 self.auth_tracker.succeed();
1378 log::info!("WebSocket authenticated");
1379 result.push(NautilusWsMessage::Authenticated);
1380 } else {
1381 let error_msg = auth_response
1382 .ret_msg
1383 .as_deref()
1384 .unwrap_or("Authentication rejected");
1385 self.auth_tracker.fail(error_msg);
1386 log::error!("WebSocket authentication failed: error={error_msg}");
1387 result.push(NautilusWsMessage::Error(BybitWebSocketError::from_message(
1388 error_msg.to_string(),
1389 )));
1390 }
1391 }
1392 BybitWsMessage::Error(err) => {
1393 result.push(NautilusWsMessage::Error(err));
1394 }
1395 BybitWsMessage::Reconnected => {
1396 self.quote_cache.clear();
1397 result.push(NautilusWsMessage::Reconnected);
1398 }
1399 BybitWsMessage::Subscription(sub_msg) => {
1400 let pending_topics = self.subscriptions.pending_subscribe_topics();
1401 match sub_msg.op {
1402 BybitWsOperation::Subscribe => {
1403 if sub_msg.success {
1404 for topic in pending_topics {
1405 self.subscriptions.confirm_subscribe(&topic);
1406 log::debug!("Subscription confirmed: topic={topic}");
1407 }
1408 } else {
1409 for topic in pending_topics {
1410 self.subscriptions.mark_failure(&topic);
1411 log::warn!(
1412 "Subscription failed, will retry on reconnect: topic={topic}, error={:?}",
1413 sub_msg.ret_msg
1414 );
1415 }
1416 }
1417 }
1418 BybitWsOperation::Unsubscribe => {
1419 let pending_unsub = self.subscriptions.pending_unsubscribe_topics();
1420 if sub_msg.success {
1421 for topic in pending_unsub {
1422 self.subscriptions.confirm_unsubscribe(&topic);
1423 log::debug!("Unsubscription confirmed: topic={topic}");
1424 }
1425 } else {
1426 for topic in pending_unsub {
1427 log::warn!(
1428 "Unsubscription failed: topic={topic}, error={:?}",
1429 sub_msg.ret_msg
1430 );
1431 }
1432 }
1433 }
1434 _ => {}
1435 }
1436 }
1437 _ => {}
1438 }
1439
1440 result
1441 }
1442}
1443
1444pub fn classify_bybit_message(value: serde_json::Value) -> BybitWsMessage {
1448 if let Some(op_val) = value.get("op") {
1449 if let Ok(op) = serde_json::from_value::<BybitWsOperation>(op_val.clone())
1450 && op == BybitWsOperation::Auth
1451 && let Ok(auth) = serde_json::from_value::<BybitWsAuthResponse>(value.clone())
1452 {
1453 let is_success = auth.success.unwrap_or(false) || auth.ret_code.unwrap_or(-1) == 0;
1454 if is_success {
1455 return BybitWsMessage::Auth(auth);
1456 }
1457 let resp = BybitWsResponse {
1458 op: Some(auth.op.clone()),
1459 topic: None,
1460 success: auth.success,
1461 conn_id: auth.conn_id.clone(),
1462 req_id: None,
1463 ret_code: auth.ret_code,
1464 ret_msg: auth.ret_msg,
1465 };
1466 let error = BybitWebSocketError::from_response(&resp);
1467 return BybitWsMessage::Error(error);
1468 }
1469
1470 if let Some(op_str) = op_val.as_str()
1471 && op_str.starts_with("order.")
1472 {
1473 return serde_json::from_value::<BybitWsOrderResponse>(value.clone()).map_or_else(
1474 |_| BybitWsMessage::Raw(value),
1475 BybitWsMessage::OrderResponse,
1476 );
1477 }
1478 }
1479
1480 if let Some(success) = value.get("success").and_then(serde_json::Value::as_bool) {
1481 if success {
1482 return serde_json::from_value::<BybitWsSubscriptionMsg>(value.clone())
1483 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Subscription);
1484 }
1485 return serde_json::from_value::<BybitWsResponse>(value.clone()).map_or_else(
1486 |_| BybitWsMessage::Raw(value),
1487 |resp| {
1488 let error = BybitWebSocketError::from_response(&resp);
1489 BybitWsMessage::Error(error)
1490 },
1491 );
1492 }
1493
1494 if let Some(topic) = value.get("topic").and_then(serde_json::Value::as_str) {
1496 if topic.starts_with(BYBIT_TOPIC_ORDERBOOK) {
1497 return serde_json::from_value(value.clone())
1498 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Orderbook);
1499 }
1500
1501 if topic.contains(BYBIT_TOPIC_PUBLIC_TRADE) || topic.starts_with(BYBIT_TOPIC_TRADE) {
1502 return serde_json::from_value(value.clone())
1503 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Trade);
1504 }
1505
1506 if topic.starts_with(BYBIT_TOPIC_KLINE) {
1507 return serde_json::from_value(value.clone())
1508 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::Kline);
1509 }
1510
1511 if topic.starts_with(BYBIT_TOPIC_TICKERS) {
1512 let is_option = value
1514 .get("data")
1515 .and_then(|d| d.get("symbol"))
1516 .and_then(|s| s.as_str())
1517 .is_some_and(|symbol| symbol.contains('-') && symbol.matches('-').count() >= 3);
1518
1519 if is_option {
1520 return serde_json::from_value(value.clone())
1521 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerOption);
1522 }
1523 return serde_json::from_value(value.clone())
1524 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::TickerLinear);
1525 }
1526
1527 if topic.starts_with(BYBIT_TOPIC_ORDER) {
1528 return serde_json::from_value(value.clone())
1529 .map_or_else(|_| BybitWsMessage::Raw(value), BybitWsMessage::AccountOrder);
1530 }
1531
1532 if topic.starts_with(BYBIT_TOPIC_EXECUTION) {
1533 return serde_json::from_value(value.clone()).map_or_else(
1534 |_| BybitWsMessage::Raw(value),
1535 BybitWsMessage::AccountExecution,
1536 );
1537 }
1538
1539 if topic.starts_with(BYBIT_TOPIC_WALLET) {
1540 return serde_json::from_value(value.clone()).map_or_else(
1541 |_| BybitWsMessage::Raw(value),
1542 BybitWsMessage::AccountWallet,
1543 );
1544 }
1545
1546 if topic.starts_with(BYBIT_TOPIC_POSITION) {
1547 return serde_json::from_value(value.clone()).map_or_else(
1548 |_| BybitWsMessage::Raw(value),
1549 BybitWsMessage::AccountPosition,
1550 );
1551 }
1552 }
1553
1554 BybitWsMessage::Raw(value)
1555}
1556
1557#[cfg(test)]
1558mod tests {
1559 use rstest::rstest;
1560
1561 use super::*;
1562 use crate::common::consts::BYBIT_WS_TOPIC_DELIMITER;
1563
1564 fn create_test_handler() -> FeedHandler {
1565 let signal = Arc::new(AtomicBool::new(false));
1566 let (_cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
1567 let (_raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
1568 let (out_tx, _out_rx) = tokio::sync::mpsc::unbounded_channel();
1569 let auth_tracker = AuthTracker::new();
1570 let subscriptions = SubscriptionState::new(BYBIT_WS_TOPIC_DELIMITER);
1571 let funding_cache = Arc::new(tokio::sync::RwLock::new(AHashMap::new()));
1572
1573 FeedHandler::new(
1574 signal,
1575 cmd_rx,
1576 raw_rx,
1577 out_tx,
1578 None,
1579 None,
1580 true,
1581 Arc::new(AtomicU8::new(0)),
1582 auth_tracker,
1583 subscriptions,
1584 funding_cache,
1585 )
1586 }
1587
1588 #[rstest]
1589 fn test_generate_unique_request_id_returns_different_ids() {
1590 let handler = create_test_handler();
1591
1592 let id1 = handler.generate_unique_request_id();
1593 let id2 = handler.generate_unique_request_id();
1594 let id3 = handler.generate_unique_request_id();
1595
1596 assert_ne!(id1, id2);
1597 assert_ne!(id2, id3);
1598 assert_ne!(id1, id3);
1599 }
1600
1601 #[rstest]
1602 fn test_generate_unique_request_id_produces_valid_uuids() {
1603 let handler = create_test_handler();
1604
1605 let id1 = handler.generate_unique_request_id();
1606 let id2 = handler.generate_unique_request_id();
1607
1608 assert!(UUID4::from(id1.as_str()).to_string() == id1);
1609 assert!(UUID4::from(id2.as_str()).to_string() == id2);
1610 }
1611
1612 #[rstest]
1613 fn test_multiple_place_orders_use_different_request_ids() {
1614 let handler = create_test_handler();
1615
1616 let req_id_1 = handler.generate_unique_request_id();
1617 let req_id_2 = handler.generate_unique_request_id();
1618 let req_id_3 = handler.generate_unique_request_id();
1619
1620 assert_ne!(req_id_1, req_id_2);
1621 assert_ne!(req_id_2, req_id_3);
1622 assert_ne!(req_id_1, req_id_3);
1623 }
1624
1625 #[rstest]
1626 fn test_multiple_amends_use_different_request_ids() {
1627 let handler = create_test_handler();
1628
1629 let req_id_1 = handler.generate_unique_request_id();
1631 let req_id_2 = handler.generate_unique_request_id();
1632 let req_id_3 = handler.generate_unique_request_id();
1633
1634 assert_ne!(
1635 req_id_1, req_id_2,
1636 "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
1637 );
1638 assert_ne!(
1639 req_id_2, req_id_3,
1640 "Multiple amends should generate different request IDs to avoid 'Duplicate reqId' errors"
1641 );
1642 }
1643
1644 #[rstest]
1645 fn test_multiple_cancels_use_different_request_ids() {
1646 let handler = create_test_handler();
1647
1648 let req_id_1 = handler.generate_unique_request_id();
1649 let req_id_2 = handler.generate_unique_request_id();
1650
1651 assert_ne!(
1652 req_id_1, req_id_2,
1653 "Multiple cancels should generate different request IDs"
1654 );
1655 }
1656
1657 #[rstest]
1658 fn test_concurrent_request_id_generation() {
1659 let handler = create_test_handler();
1660
1661 let mut ids = std::collections::HashSet::new();
1662 for _ in 0..100 {
1663 let id = handler.generate_unique_request_id();
1664 assert!(
1665 ids.insert(id.clone()),
1666 "Generated duplicate request ID: {id}"
1667 );
1668 }
1669 assert_eq!(ids.len(), 100);
1670 }
1671}