1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use dashmap::DashMap;
25use nautilus_common::cache::quote::QuoteCache;
26use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28 data::Data,
29 enums::{OrderStatus, OrderType},
30 identifiers::{AccountId, ClientOrderId},
31 instruments::{Instrument, InstrumentAny},
32 types::Price,
33};
34use nautilus_network::{
35 RECONNECTED,
36 retry::{RetryManager, create_websocket_retry_manager},
37 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
38};
39use tokio_tungstenite::tungstenite::Message;
40use ustr::Ustr;
41
42use super::{
43 enums::{BitmexAction, BitmexWsAuthAction, BitmexWsOperation, BitmexWsTopic},
44 error::BitmexWsError,
45 messages::{
46 BitmexExecutionMsg, BitmexFundingMsg, BitmexHttpRequest, BitmexInstrumentMsg,
47 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexPositionMsg, BitmexQuoteMsg,
48 BitmexTableMessage, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg, BitmexWsMessage,
49 NautilusWsMessage, OrderData,
50 },
51 parse::{
52 parse_book_msg_vec, parse_book10_msg_vec, parse_execution_msg, parse_funding_msg,
53 parse_instrument_msg, parse_order_msg, parse_order_update_msg, parse_position_msg,
54 parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg,
55 },
56};
57use crate::common::{enums::BitmexExecType, parse::parse_contracts_quantity};
58
59#[derive(Debug)]
61#[allow(
62 clippy::large_enum_variant,
63 reason = "Commands are ephemeral and immediately consumed"
64)]
65pub enum HandlerCommand {
66 SetClient(WebSocketClient),
68 Disconnect,
70 Authenticate { payload: String },
72 Subscribe { topics: Vec<String> },
74 Unsubscribe { topics: Vec<String> },
76 InitializeInstruments(Vec<InstrumentAny>),
78 UpdateInstrument(InstrumentAny),
80}
81
82pub(super) struct FeedHandler {
83 account_id: AccountId,
84 signal: Arc<AtomicBool>,
85 client: Option<WebSocketClient>,
86 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
87 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
88 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
89 auth_tracker: AuthTracker,
90 subscriptions: SubscriptionState,
91 retry_manager: RetryManager<BitmexWsError>,
92 instruments_cache: AHashMap<Ustr, InstrumentAny>,
93 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
94 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
95 quote_cache: QuoteCache,
96}
97
98impl FeedHandler {
99 #[allow(clippy::too_many_arguments)]
101 pub(super) fn new(
102 signal: Arc<AtomicBool>,
103 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
104 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
105 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
106 account_id: AccountId,
107 auth_tracker: AuthTracker,
108 subscriptions: SubscriptionState,
109 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
110 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
111 ) -> Self {
112 Self {
113 account_id,
114 signal,
115 client: None,
116 cmd_rx,
117 raw_rx,
118 out_tx,
119 auth_tracker,
120 subscriptions,
121 retry_manager: create_websocket_retry_manager(),
122 instruments_cache: AHashMap::new(),
123 order_type_cache,
124 order_symbol_cache,
125 quote_cache: QuoteCache::new(),
126 }
127 }
128
129 pub(super) fn is_stopped(&self) -> bool {
130 self.signal.load(Ordering::Relaxed)
131 }
132
133 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
134 self.out_tx.send(msg).map_err(|_| ())
135 }
136
137 async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
139 if let Some(client) = &self.client {
140 self.retry_manager
141 .execute_with_retry(
142 "websocket_send",
143 || {
144 let payload = payload.clone();
145 async move {
146 client.send_text(payload, None).await.map_err(|e| {
147 BitmexWsError::ClientError(format!("Send failed: {e}"))
148 })
149 }
150 },
151 should_retry_bitmex_error,
152 create_bitmex_timeout_error,
153 )
154 .await
155 .map_err(|e| anyhow::anyhow!("{e}"))
156 } else {
157 Err(anyhow::anyhow!("No active WebSocket client"))
158 }
159 }
160
161 #[inline]
162 fn get_instrument(
163 cache: &AHashMap<Ustr, InstrumentAny>,
164 symbol: &Ustr,
165 ) -> Option<InstrumentAny> {
166 cache.get(symbol).cloned()
167 }
168
169 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
170 let clock = get_atomic_clock_realtime();
171
172 loop {
173 tokio::select! {
174 Some(cmd) = self.cmd_rx.recv() => {
175 match cmd {
176 HandlerCommand::SetClient(client) => {
177 tracing::debug!("WebSocketClient received by handler");
178 self.client = Some(client);
179 }
180 HandlerCommand::Disconnect => {
181 tracing::debug!("Disconnect command received");
182 if let Some(client) = self.client.take() {
183 client.disconnect().await;
184 }
185 }
186 HandlerCommand::Authenticate { payload } => {
187 tracing::debug!("Authenticate command received");
188 if let Err(e) = self.send_with_retry(payload).await {
189 tracing::error!(error = %e, "Failed to send authentication after retries");
190 }
191 }
192 HandlerCommand::Subscribe { topics } => {
193 for topic in topics {
194 tracing::debug!(topic = %topic, "Subscribing to topic");
195 if let Err(e) = self.send_with_retry(topic.clone()).await {
196 tracing::error!(topic = %topic, error = %e, "Failed to send subscription after retries");
197 }
198 }
199 }
200 HandlerCommand::Unsubscribe { topics } => {
201 for topic in topics {
202 tracing::debug!(topic = %topic, "Unsubscribing from topic");
203 if let Err(e) = self.send_with_retry(topic.clone()).await {
204 tracing::error!(topic = %topic, error = %e, "Failed to send unsubscription after retries");
205 }
206 }
207 }
208 HandlerCommand::InitializeInstruments(instruments) => {
209 for inst in instruments {
210 self.instruments_cache.insert(inst.symbol().inner(), inst);
211 }
212 }
213 HandlerCommand::UpdateInstrument(inst) => {
214 self.instruments_cache.insert(inst.symbol().inner(), inst);
215 }
216 }
217 continue;
219 }
220
221 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
222 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
223 tracing::debug!("Stop signal received during idle period");
224 return None;
225 }
226 continue;
227 }
228
229 msg = self.raw_rx.recv() => {
230 let msg = match msg {
231 Some(msg) => msg,
232 None => {
233 tracing::debug!("WebSocket stream closed");
234 return None;
235 }
236 };
237
238 if let Message::Ping(data) = &msg {
240 tracing::trace!("Received ping frame with {} bytes", data.len());
241 if let Some(client) = &self.client
242 && let Err(e) = client.send_pong(data.to_vec()).await
243 {
244 tracing::warn!(error = %e, "Failed to send pong frame");
245 }
246 continue;
247 }
248
249 let event = match Self::parse_raw_message(msg) {
250 Some(event) => event,
251 None => continue,
252 };
253
254 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
255 tracing::debug!("Stop signal received");
256 return None;
257 }
258
259 match event {
260 BitmexWsMessage::Reconnected => {
261 self.quote_cache.clear();
262 return Some(NautilusWsMessage::Reconnected);
263 }
264 BitmexWsMessage::Subscription {
265 success,
266 subscribe,
267 request,
268 error,
269 } => {
270 if let Some(msg) = self.handle_subscription_message(
271 success,
272 subscribe.as_ref(),
273 request.as_ref(),
274 error.as_deref(),
275 ) {
276 return Some(msg);
277 }
278 continue;
279 }
280 BitmexWsMessage::Table(table_msg) => {
281 let ts_init = clock.get_time_ns();
282
283 let msg = match table_msg {
284 BitmexTableMessage::OrderBookL2 { action, data } => {
285 self.handle_orderbook_l2(action, data, ts_init)
286 }
287 BitmexTableMessage::OrderBookL2_25 { action, data } => {
288 self.handle_orderbook_l2(action, data, ts_init)
289 }
290 BitmexTableMessage::OrderBook10 { data, .. } => {
291 self.handle_orderbook_10(data, ts_init)
292 }
293 BitmexTableMessage::Quote { data, .. } => {
294 self.handle_quote(data, ts_init)
295 }
296 BitmexTableMessage::Trade { data, .. } => {
297 self.handle_trade(data, ts_init)
298 }
299 BitmexTableMessage::TradeBin1m { action, data } => {
300 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1m, ts_init)
301 }
302 BitmexTableMessage::TradeBin5m { action, data } => {
303 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin5m, ts_init)
304 }
305 BitmexTableMessage::TradeBin1h { action, data } => {
306 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1h, ts_init)
307 }
308 BitmexTableMessage::TradeBin1d { action, data } => {
309 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1d, ts_init)
310 }
311 BitmexTableMessage::Order { data, .. } => {
315 self.handle_order(data)
316 }
317 BitmexTableMessage::Execution { data, .. } => {
318 self.handle_execution(data)
319 }
320 BitmexTableMessage::Position { data, .. } => {
321 self.handle_position(data)
322 }
323 BitmexTableMessage::Wallet { data, .. } => {
324 self.handle_wallet(data, ts_init)
325 }
326 BitmexTableMessage::Margin { .. } => {
327 None
330 }
331 BitmexTableMessage::Instrument { action, data } => {
332 self.handle_instrument(action, data, ts_init)
333 }
334 BitmexTableMessage::Funding { data, .. } => {
335 self.handle_funding(data, ts_init)
336 }
337 _ => {
338 tracing::warn!("Unhandled table message type: {table_msg:?}");
340 None
341 }
342 };
343
344 if let Some(msg) = msg {
345 return Some(msg);
346 }
347 continue;
348 }
349 BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
350 }
351 }
352
353 else => {
355 tracing::debug!("Handler shutting down: stream ended or command channel closed");
356 return None;
357 }
358 }
359 }
360 }
361
362 fn parse_raw_message(msg: Message) -> Option<BitmexWsMessage> {
363 match msg {
364 Message::Text(text) => {
365 if text == RECONNECTED {
366 tracing::info!("Received WebSocket reconnected signal");
367 return Some(BitmexWsMessage::Reconnected);
368 }
369
370 tracing::trace!("Raw websocket message: {text}");
371
372 if Self::is_heartbeat_message(&text) {
373 tracing::trace!("Ignoring heartbeat control message: {text}");
374 return None;
375 }
376
377 match serde_json::from_str(&text) {
378 Ok(msg) => match &msg {
379 BitmexWsMessage::Welcome {
380 version,
381 heartbeat_enabled,
382 limit,
383 ..
384 } => {
385 tracing::info!(
386 version = version,
387 heartbeat = heartbeat_enabled,
388 rate_limit = ?limit.remaining,
389 "Welcome to the BitMEX Realtime API:",
390 );
391 }
392 BitmexWsMessage::Subscription { .. } => return Some(msg),
393 BitmexWsMessage::Error { status, error, .. } => {
394 tracing::error!(
395 status = status,
396 error = error,
397 "Received error from BitMEX"
398 );
399 }
400 _ => return Some(msg),
401 },
402 Err(e) => {
403 tracing::error!("Failed to parse WebSocket message: {e}: {text}");
404 }
405 }
406 }
407 Message::Binary(msg) => {
408 tracing::debug!("Raw binary: {msg:?}");
409 }
410 Message::Close(_) => {
411 tracing::debug!("Received close message, waiting for reconnection");
412 }
413 Message::Ping(data) => {
414 tracing::trace!("Ping frame with {} bytes (already handled)", data.len());
416 }
417 Message::Pong(data) => {
418 tracing::trace!("Received pong frame with {} bytes", data.len());
419 }
420 Message::Frame(frame) => {
421 tracing::debug!("Received raw frame: {frame:?}");
422 }
423 }
424
425 None
426 }
427
428 fn is_heartbeat_message(text: &str) -> bool {
429 let trimmed = text.trim();
430
431 if !trimmed.starts_with('{') || trimmed.len() > 64 {
432 return false;
433 }
434
435 trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
436 }
437
438 fn handle_subscription_ack(
439 &self,
440 success: bool,
441 request: Option<&BitmexHttpRequest>,
442 subscribe: Option<&String>,
443 error: Option<&str>,
444 ) {
445 let topics = Self::topics_from_request(request, subscribe);
446
447 if topics.is_empty() {
448 tracing::debug!("Subscription acknowledgement without topics");
449 return;
450 }
451
452 for topic in topics {
453 if success {
454 self.subscriptions.confirm_subscribe(topic);
455 tracing::debug!(topic = topic, "Subscription confirmed");
456 } else {
457 self.subscriptions.mark_failure(topic);
458 let reason = error.unwrap_or("Subscription rejected");
459 tracing::error!(topic = topic, error = reason, "Subscription failed");
460 }
461 }
462 }
463
464 fn handle_unsubscribe_ack(
465 &self,
466 success: bool,
467 request: Option<&BitmexHttpRequest>,
468 subscribe: Option<&String>,
469 error: Option<&str>,
470 ) {
471 let topics = Self::topics_from_request(request, subscribe);
472
473 if topics.is_empty() {
474 tracing::debug!("Unsubscription acknowledgement without topics");
475 return;
476 }
477
478 for topic in topics {
479 if success {
480 tracing::debug!(topic = topic, "Unsubscription confirmed");
481 self.subscriptions.confirm_unsubscribe(topic);
482 } else {
483 let reason = error.unwrap_or("Unsubscription rejected");
484 tracing::error!(
485 topic = topic,
486 error = reason,
487 "Unsubscription failed - restoring subscription"
488 );
489 self.subscriptions.confirm_unsubscribe(topic); self.subscriptions.mark_subscribe(topic); self.subscriptions.confirm_subscribe(topic); }
494 }
495 }
496
497 fn topics_from_request<'a>(
498 request: Option<&'a BitmexHttpRequest>,
499 fallback: Option<&'a String>,
500 ) -> Vec<&'a str> {
501 if let Some(req) = request
502 && !req.args.is_empty()
503 {
504 return req.args.iter().filter_map(|arg| arg.as_str()).collect();
505 }
506
507 fallback.into_iter().map(|topic| topic.as_str()).collect()
508 }
509
510 fn handle_orderbook_l2(
511 &self,
512 action: BitmexAction,
513 data: Vec<BitmexOrderBookMsg>,
514 ts_init: UnixNanos,
515 ) -> Option<NautilusWsMessage> {
516 if data.is_empty() {
517 return None;
518 }
519 let data = parse_book_msg_vec(data, action, &self.instruments_cache, ts_init);
520 Some(NautilusWsMessage::Data(data))
521 }
522
523 fn handle_orderbook_10(
524 &self,
525 data: Vec<BitmexOrderBook10Msg>,
526 ts_init: UnixNanos,
527 ) -> Option<NautilusWsMessage> {
528 if data.is_empty() {
529 return None;
530 }
531 let data = parse_book10_msg_vec(data, &self.instruments_cache, ts_init);
532 Some(NautilusWsMessage::Data(data))
533 }
534
535 fn handle_quote(
536 &mut self,
537 mut data: Vec<BitmexQuoteMsg>,
538 ts_init: UnixNanos,
539 ) -> Option<NautilusWsMessage> {
540 if data.is_empty() {
542 return None;
543 }
544
545 let msg = data.remove(0);
546 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &msg.symbol) else {
547 tracing::error!(
548 "Instrument cache miss: quote message dropped for symbol={}",
549 msg.symbol
550 );
551 return None;
552 };
553
554 let instrument_id = instrument.id();
555 let price_precision = instrument.price_precision();
556
557 let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
558 let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
559 let bid_size = msg
560 .bid_size
561 .map(|s| parse_contracts_quantity(s, &instrument));
562 let ask_size = msg
563 .ask_size
564 .map(|s| parse_contracts_quantity(s, &instrument));
565 let ts_event = UnixNanos::from(msg.timestamp);
566
567 match self.quote_cache.process(
568 instrument_id,
569 bid_price,
570 ask_price,
571 bid_size,
572 ask_size,
573 ts_event,
574 ts_init,
575 ) {
576 Ok(quote) => Some(NautilusWsMessage::Data(vec![Data::Quote(quote)])),
577 Err(e) => {
578 tracing::warn!(error = %e, "Failed to process quote");
579 None
580 }
581 }
582 }
583
584 fn handle_trade(
585 &self,
586 data: Vec<BitmexTradeMsg>,
587 ts_init: UnixNanos,
588 ) -> Option<NautilusWsMessage> {
589 if data.is_empty() {
590 return None;
591 }
592 let data = parse_trade_msg_vec(data, &self.instruments_cache, ts_init);
593 Some(NautilusWsMessage::Data(data))
594 }
595
596 fn handle_trade_bin(
597 &self,
598 action: BitmexAction,
599 data: Vec<BitmexTradeBinMsg>,
600 topic: BitmexWsTopic,
601 ts_init: UnixNanos,
602 ) -> Option<NautilusWsMessage> {
603 if action == BitmexAction::Partial || data.is_empty() {
604 return None;
605 }
606 let data = parse_trade_bin_msg_vec(data, topic, &self.instruments_cache, ts_init);
607 Some(NautilusWsMessage::Data(data))
608 }
609
610 fn handle_order(&mut self, data: Vec<OrderData>) -> Option<NautilusWsMessage> {
611 let mut reports = Vec::with_capacity(data.len());
613
614 for order_data in data {
615 match order_data {
616 OrderData::Full(order_msg) => {
617 let Some(instrument) =
618 Self::get_instrument(&self.instruments_cache, &order_msg.symbol)
619 else {
620 tracing::error!(
621 "Instrument cache miss: order message dropped for symbol={}, order_id={}",
622 order_msg.symbol,
623 order_msg.order_id
624 );
625 continue;
626 };
627
628 match parse_order_msg(&order_msg, &instrument, &self.order_type_cache) {
629 Ok(report) => {
630 if let Some(client_order_id) = &order_msg.cl_ord_id {
632 let client_order_id = ClientOrderId::new(client_order_id);
633
634 if let Some(ord_type) = &order_msg.ord_type {
635 let order_type: OrderType = (*ord_type).into();
636 self.order_type_cache.insert(client_order_id, order_type);
637 }
638
639 self.order_symbol_cache
641 .insert(client_order_id, order_msg.symbol);
642 }
643
644 if is_terminal_order_status(report.order_status)
645 && let Some(client_id) = report.client_order_id
646 {
647 self.order_type_cache.remove(&client_id);
648 self.order_symbol_cache.remove(&client_id);
649 }
650
651 reports.push(report);
652 }
653 Err(e) => {
654 tracing::error!(
655 error = %e,
656 symbol = %order_msg.symbol,
657 order_id = %order_msg.order_id,
658 time_in_force = ?order_msg.time_in_force,
659 "Failed to parse full order message - potential data loss"
660 );
661 continue;
663 }
664 }
665 }
666 OrderData::Update(msg) => {
667 let Some(instrument) =
668 Self::get_instrument(&self.instruments_cache, &msg.symbol)
669 else {
670 tracing::error!(
671 "Instrument cache miss: order update dropped for symbol={}, order_id={}",
672 msg.symbol,
673 msg.order_id
674 );
675 continue;
676 };
677
678 if let Some(cl_ord_id) = &msg.cl_ord_id {
680 let client_order_id = ClientOrderId::new(cl_ord_id);
681 self.order_symbol_cache.insert(client_order_id, msg.symbol);
682 }
683
684 if let Some(event) = parse_order_update_msg(&msg, &instrument, self.account_id)
685 {
686 return Some(NautilusWsMessage::OrderUpdated(event));
687 } else {
688 tracing::warn!(
689 order_id = %msg.order_id,
690 price = ?msg.price,
691 "Skipped order update message (insufficient data)"
692 );
693 }
694 }
695 }
696 }
697
698 if reports.is_empty() {
699 return None;
700 }
701
702 Some(NautilusWsMessage::OrderStatusReports(reports))
703 }
704
705 fn handle_execution(&mut self, data: Vec<BitmexExecutionMsg>) -> Option<NautilusWsMessage> {
706 let mut fills = Vec::with_capacity(data.len());
707
708 for exec_msg in data {
709 let symbol_opt = if let Some(sym) = &exec_msg.symbol {
711 Some(*sym)
712 } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
713 let client_order_id = ClientOrderId::new(cl_ord_id);
715 self.order_symbol_cache
716 .get(&client_order_id)
717 .map(|r| *r.value())
718 } else {
719 None
720 };
721
722 let Some(symbol) = symbol_opt else {
723 if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
725 if exec_msg.exec_type == Some(BitmexExecType::Trade) {
726 tracing::warn!(
727 cl_ord_id = %cl_ord_id,
728 exec_id = ?exec_msg.exec_id,
729 ord_rej_reason = ?exec_msg.ord_rej_reason,
730 text = ?exec_msg.text,
731 "Execution message missing symbol and not found in cache"
732 );
733 } else {
734 tracing::debug!(
735 cl_ord_id = %cl_ord_id,
736 exec_id = ?exec_msg.exec_id,
737 exec_type = ?exec_msg.exec_type,
738 ord_rej_reason = ?exec_msg.ord_rej_reason,
739 text = ?exec_msg.text,
740 "Execution message missing symbol and not found in cache"
741 );
742 }
743 } else {
744 if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
748 tracing::debug!(
749 exec_id = ?exec_msg.exec_id,
750 order_id = ?exec_msg.order_id,
751 "CancelReject message missing symbol/clOrdID (expected with redundant cancels)"
752 );
753 } else {
754 tracing::warn!(
755 exec_id = ?exec_msg.exec_id,
756 order_id = ?exec_msg.order_id,
757 exec_type = ?exec_msg.exec_type,
758 ord_rej_reason = ?exec_msg.ord_rej_reason,
759 text = ?exec_msg.text,
760 "Execution message missing both symbol and clOrdID, cannot process"
761 );
762 }
763 }
764 continue;
765 };
766
767 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &symbol) else {
768 tracing::error!(
769 "Instrument cache miss: execution message dropped for symbol={}, exec_id={:?}, exec_type={:?}, Liquidation/ADL fills may be lost",
770 symbol,
771 exec_msg.exec_id,
772 exec_msg.exec_type
773 );
774 continue;
775 };
776
777 if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
778 fills.push(fill);
779 }
780 }
781
782 if fills.is_empty() {
783 return None;
784 }
785 Some(NautilusWsMessage::FillReports(fills))
786 }
787
788 fn handle_position(&self, data: Vec<BitmexPositionMsg>) -> Option<NautilusWsMessage> {
789 if let Some(pos_msg) = data.into_iter().next() {
790 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &pos_msg.symbol)
791 else {
792 tracing::error!(
793 "Instrument cache miss: position message dropped for symbol={}, account={}",
794 pos_msg.symbol,
795 pos_msg.account
796 );
797 return None;
798 };
799 let report = parse_position_msg(pos_msg, &instrument);
800 Some(NautilusWsMessage::PositionStatusReport(report))
801 } else {
802 None
803 }
804 }
805
806 fn handle_wallet(
807 &self,
808 data: Vec<BitmexWalletMsg>,
809 ts_init: UnixNanos,
810 ) -> Option<NautilusWsMessage> {
811 if let Some(wallet_msg) = data.into_iter().next() {
812 let account_state = parse_wallet_msg(wallet_msg, ts_init);
813 Some(NautilusWsMessage::AccountState(account_state))
814 } else {
815 None
816 }
817 }
818
819 fn handle_instrument(
820 &mut self,
821 action: BitmexAction,
822 data: Vec<BitmexInstrumentMsg>,
823 ts_init: UnixNanos,
824 ) -> Option<NautilusWsMessage> {
825 match action {
826 BitmexAction::Partial | BitmexAction::Insert => {
827 let mut instruments = Vec::with_capacity(data.len());
828 let mut temp_cache = AHashMap::new();
829
830 let data_for_prices = data.clone();
831
832 for msg in data {
833 match msg.try_into() {
834 Ok(http_inst) => {
835 match crate::http::parse::parse_instrument_any(&http_inst, ts_init) {
836 Some(instrument_any) => {
837 let symbol = instrument_any.symbol().inner();
838 temp_cache.insert(symbol, instrument_any.clone());
839 instruments.push(instrument_any);
840 }
841 None => {
842 log::warn!("Failed to parse instrument from WebSocket");
843 }
844 }
845 }
846 Err(e) => {
847 log::debug!("Skipping instrument (missing required fields): {e}");
848 }
849 }
850 }
851
852 for (symbol, instrument) in temp_cache.iter() {
854 self.instruments_cache.insert(*symbol, instrument.clone());
855 }
856
857 if !instruments.is_empty()
858 && let Err(e) = self
859 .out_tx
860 .send(NautilusWsMessage::Instruments(instruments))
861 {
862 tracing::error!("Error sending instruments: {e}");
863 }
864
865 let mut data_msgs = Vec::with_capacity(data_for_prices.len());
866
867 for msg in data_for_prices {
868 let parsed = parse_instrument_msg(msg, &temp_cache, ts_init);
869 data_msgs.extend(parsed);
870 }
871
872 if data_msgs.is_empty() {
873 return None;
874 }
875 Some(NautilusWsMessage::Data(data_msgs))
876 }
877 BitmexAction::Update => {
878 let mut data_msgs = Vec::with_capacity(data.len());
879
880 for msg in data {
881 let parsed = parse_instrument_msg(msg, &self.instruments_cache, ts_init);
882 data_msgs.extend(parsed);
883 }
884
885 if data_msgs.is_empty() {
886 return None;
887 }
888 Some(NautilusWsMessage::Data(data_msgs))
889 }
890 BitmexAction::Delete => {
891 log::info!(
892 "Received instrument delete action for {} instrument(s)",
893 data.len()
894 );
895 None
896 }
897 }
898 }
899
900 fn handle_funding(
901 &self,
902 data: Vec<BitmexFundingMsg>,
903 ts_init: UnixNanos,
904 ) -> Option<NautilusWsMessage> {
905 let mut funding_updates = Vec::with_capacity(data.len());
906
907 for msg in data {
908 if let Some(parsed) = parse_funding_msg(msg, ts_init) {
909 funding_updates.push(parsed);
910 }
911 }
912
913 if !funding_updates.is_empty() {
914 Some(NautilusWsMessage::FundingRateUpdates(funding_updates))
915 } else {
916 None
917 }
918 }
919
920 fn handle_subscription_message(
921 &self,
922 success: bool,
923 subscribe: Option<&String>,
924 request: Option<&BitmexHttpRequest>,
925 error: Option<&str>,
926 ) -> Option<NautilusWsMessage> {
927 if let Some(req) = request {
928 if req
929 .op
930 .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
931 {
932 if success {
933 tracing::info!("WebSocket authenticated");
934 self.auth_tracker.succeed();
935 return Some(NautilusWsMessage::Authenticated);
936 } else {
937 let reason = error.unwrap_or("Authentication rejected").to_string();
938 tracing::error!(error = %reason, "WebSocket authentication failed");
939 self.auth_tracker.fail(reason);
940 }
941 return None;
942 }
943
944 if req
945 .op
946 .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
947 {
948 self.handle_subscription_ack(success, request, subscribe, error);
949 return None;
950 }
951
952 if req
953 .op
954 .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
955 {
956 self.handle_unsubscribe_ack(success, request, subscribe, error);
957 return None;
958 }
959 }
960
961 if subscribe.is_some() {
962 self.handle_subscription_ack(success, request, subscribe, error);
963 return None;
964 }
965
966 if let Some(error) = error {
967 tracing::warn!(
968 success = success,
969 error = error,
970 "Unhandled subscription control message"
971 );
972 }
973
974 None
975 }
976}
977
978fn is_terminal_order_status(status: OrderStatus) -> bool {
979 matches!(
980 status,
981 OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
982 )
983}
984
985pub(crate) fn should_retry_bitmex_error(error: &BitmexWsError) -> bool {
987 match error {
988 BitmexWsError::TungsteniteError(_) => true, BitmexWsError::ClientError(msg) => {
990 let msg_lower = msg.to_lowercase();
992 msg_lower.contains("timeout")
993 || msg_lower.contains("timed out")
994 || msg_lower.contains("connection")
995 || msg_lower.contains("network")
996 }
997 _ => false,
998 }
999}
1000
1001pub(crate) fn create_bitmex_timeout_error(msg: String) -> BitmexWsError {
1003 BitmexWsError::ClientError(msg)
1004}
1005
1006#[cfg(test)]
1011mod tests {
1012 use rstest::rstest;
1013
1014 use super::*;
1015
1016 #[rstest]
1017 fn test_is_heartbeat_message_detection() {
1018 assert!(FeedHandler::is_heartbeat_message("{\"op\":\"ping\"}"));
1019 assert!(FeedHandler::is_heartbeat_message("{\"op\":\"pong\"}"));
1020 assert!(!FeedHandler::is_heartbeat_message(
1021 "{\"op\":\"subscribe\",\"args\":[\"trade:XBTUSD\"]}"
1022 ));
1023 }
1024}