1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use dashmap::DashMap;
25use nautilus_common::cache::quote::QuoteCache;
26use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28 data::Data,
29 enums::{OrderStatus, OrderType},
30 identifiers::{AccountId, ClientOrderId},
31 instruments::{Instrument, InstrumentAny},
32 types::Price,
33};
34use nautilus_network::{
35 RECONNECTED,
36 retry::{RetryManager, create_websocket_retry_manager},
37 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
38};
39use tokio_tungstenite::tungstenite::Message;
40use ustr::Ustr;
41
42use super::{
43 enums::{BitmexAction, BitmexWsAuthAction, BitmexWsOperation, BitmexWsTopic},
44 error::BitmexWsError,
45 messages::{
46 BitmexExecutionMsg, BitmexFundingMsg, BitmexHttpRequest, BitmexInstrumentMsg,
47 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexPositionMsg, BitmexQuoteMsg,
48 BitmexTableMessage, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg, BitmexWsMessage,
49 NautilusWsMessage, OrderData,
50 },
51 parse::{
52 parse_book_msg_vec, parse_book10_msg_vec, parse_execution_msg, parse_funding_msg,
53 parse_instrument_msg, parse_order_msg, parse_order_update_msg, parse_position_msg,
54 parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg,
55 },
56};
57use crate::{
58 common::{enums::BitmexExecType, parse::parse_contracts_quantity},
59 http::parse::{InstrumentParseResult, parse_instrument_any},
60};
61
62#[derive(Debug)]
64#[allow(
65 clippy::large_enum_variant,
66 reason = "Commands are ephemeral and immediately consumed"
67)]
68pub enum HandlerCommand {
69 SetClient(WebSocketClient),
71 Disconnect,
73 Authenticate { payload: String },
75 Subscribe { topics: Vec<String> },
77 Unsubscribe { topics: Vec<String> },
79 InitializeInstruments(Vec<InstrumentAny>),
81 UpdateInstrument(InstrumentAny),
83}
84
85pub(super) struct FeedHandler {
86 account_id: AccountId,
87 signal: Arc<AtomicBool>,
88 client: Option<WebSocketClient>,
89 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
90 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
91 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
92 auth_tracker: AuthTracker,
93 subscriptions: SubscriptionState,
94 retry_manager: RetryManager<BitmexWsError>,
95 instruments_cache: AHashMap<Ustr, InstrumentAny>,
96 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
97 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
98 quote_cache: QuoteCache,
99}
100
101impl FeedHandler {
102 #[allow(clippy::too_many_arguments)]
104 pub(super) fn new(
105 signal: Arc<AtomicBool>,
106 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
107 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
108 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
109 account_id: AccountId,
110 auth_tracker: AuthTracker,
111 subscriptions: SubscriptionState,
112 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
113 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
114 ) -> Self {
115 Self {
116 account_id,
117 signal,
118 client: None,
119 cmd_rx,
120 raw_rx,
121 out_tx,
122 auth_tracker,
123 subscriptions,
124 retry_manager: create_websocket_retry_manager(),
125 instruments_cache: AHashMap::new(),
126 order_type_cache,
127 order_symbol_cache,
128 quote_cache: QuoteCache::new(),
129 }
130 }
131
132 pub(super) fn is_stopped(&self) -> bool {
133 self.signal.load(Ordering::Relaxed)
134 }
135
136 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
137 self.out_tx.send(msg).map_err(|_| ())
138 }
139
140 async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
142 if let Some(client) = &self.client {
143 self.retry_manager
144 .execute_with_retry(
145 "websocket_send",
146 || {
147 let payload = payload.clone();
148 async move {
149 client.send_text(payload, None).await.map_err(|e| {
150 BitmexWsError::ClientError(format!("Send failed: {e}"))
151 })
152 }
153 },
154 should_retry_bitmex_error,
155 create_bitmex_timeout_error,
156 )
157 .await
158 .map_err(|e| anyhow::anyhow!("{e}"))
159 } else {
160 Err(anyhow::anyhow!("No active WebSocket client"))
161 }
162 }
163
164 #[inline]
165 fn get_instrument(
166 cache: &AHashMap<Ustr, InstrumentAny>,
167 symbol: &Ustr,
168 ) -> Option<InstrumentAny> {
169 cache.get(symbol).cloned()
170 }
171
172 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
173 let clock = get_atomic_clock_realtime();
174
175 loop {
176 tokio::select! {
177 Some(cmd) = self.cmd_rx.recv() => {
178 match cmd {
179 HandlerCommand::SetClient(client) => {
180 log::debug!("WebSocketClient received by handler");
181 self.client = Some(client);
182 }
183 HandlerCommand::Disconnect => {
184 log::debug!("Disconnect command received");
185 if let Some(client) = self.client.take() {
186 client.disconnect().await;
187 }
188 }
189 HandlerCommand::Authenticate { payload } => {
190 log::debug!("Authenticate command received");
191 if let Err(e) = self.send_with_retry(payload).await {
192 log::error!("Failed to send authentication after retries: {e}");
193 }
194 }
195 HandlerCommand::Subscribe { topics } => {
196 for topic in topics {
197 log::debug!("Subscribing to topic: {topic}");
198 if let Err(e) = self.send_with_retry(topic.clone()).await {
199 log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
200 }
201 }
202 }
203 HandlerCommand::Unsubscribe { topics } => {
204 for topic in topics {
205 log::debug!("Unsubscribing from topic: {topic}");
206 if let Err(e) = self.send_with_retry(topic.clone()).await {
207 log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
208 }
209 }
210 }
211 HandlerCommand::InitializeInstruments(instruments) => {
212 for inst in instruments {
213 self.instruments_cache.insert(inst.symbol().inner(), inst);
214 }
215 }
216 HandlerCommand::UpdateInstrument(inst) => {
217 self.instruments_cache.insert(inst.symbol().inner(), inst);
218 }
219 }
220 continue;
222 }
223
224 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
225 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
226 log::debug!("Stop signal received during idle period");
227 return None;
228 }
229 continue;
230 }
231
232 msg = self.raw_rx.recv() => {
233 let msg = match msg {
234 Some(msg) => msg,
235 None => {
236 log::debug!("WebSocket stream closed");
237 return None;
238 }
239 };
240
241 if let Message::Ping(data) = &msg {
243 log::trace!("Received ping frame with {} bytes", data.len());
244 if let Some(client) = &self.client
245 && let Err(e) = client.send_pong(data.to_vec()).await
246 {
247 log::warn!("Failed to send pong frame: {e}");
248 }
249 continue;
250 }
251
252 let event = match Self::parse_raw_message(msg) {
253 Some(event) => event,
254 None => continue,
255 };
256
257 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
258 log::debug!("Stop signal received");
259 return None;
260 }
261
262 match event {
263 BitmexWsMessage::Reconnected => {
264 self.quote_cache.clear();
265 return Some(NautilusWsMessage::Reconnected);
266 }
267 BitmexWsMessage::Subscription {
268 success,
269 subscribe,
270 request,
271 error,
272 } => {
273 if let Some(msg) = self.handle_subscription_message(
274 success,
275 subscribe.as_ref(),
276 request.as_ref(),
277 error.as_deref(),
278 ) {
279 return Some(msg);
280 }
281 continue;
282 }
283 BitmexWsMessage::Table(table_msg) => {
284 let ts_init = clock.get_time_ns();
285
286 let msg = match table_msg {
287 BitmexTableMessage::OrderBookL2 { action, data } => {
288 self.handle_orderbook_l2(action, data, ts_init)
289 }
290 BitmexTableMessage::OrderBookL2_25 { action, data } => {
291 self.handle_orderbook_l2(action, data, ts_init)
292 }
293 BitmexTableMessage::OrderBook10 { data, .. } => {
294 self.handle_orderbook_10(data, ts_init)
295 }
296 BitmexTableMessage::Quote { data, .. } => {
297 self.handle_quote(data, ts_init)
298 }
299 BitmexTableMessage::Trade { data, .. } => {
300 self.handle_trade(data, ts_init)
301 }
302 BitmexTableMessage::TradeBin1m { action, data } => {
303 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1m, ts_init)
304 }
305 BitmexTableMessage::TradeBin5m { action, data } => {
306 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin5m, ts_init)
307 }
308 BitmexTableMessage::TradeBin1h { action, data } => {
309 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1h, ts_init)
310 }
311 BitmexTableMessage::TradeBin1d { action, data } => {
312 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1d, ts_init)
313 }
314 BitmexTableMessage::Order { data, .. } => {
318 self.handle_order(data)
319 }
320 BitmexTableMessage::Execution { data, .. } => {
321 self.handle_execution(data)
322 }
323 BitmexTableMessage::Position { data, .. } => {
324 self.handle_position(data)
325 }
326 BitmexTableMessage::Wallet { data, .. } => {
327 self.handle_wallet(data, ts_init)
328 }
329 BitmexTableMessage::Margin { .. } => {
330 None
333 }
334 BitmexTableMessage::Instrument { action, data } => {
335 self.handle_instrument(action, data, ts_init)
336 }
337 BitmexTableMessage::Funding { data, .. } => {
338 self.handle_funding(data, ts_init)
339 }
340 _ => {
341 log::warn!("Unhandled table message type: {table_msg:?}");
343 None
344 }
345 };
346
347 if let Some(msg) = msg {
348 return Some(msg);
349 }
350 continue;
351 }
352 BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
353 }
354 }
355
356 else => {
358 log::debug!("Handler shutting down: stream ended or command channel closed");
359 return None;
360 }
361 }
362 }
363 }
364
365 fn parse_raw_message(msg: Message) -> Option<BitmexWsMessage> {
366 match msg {
367 Message::Text(text) => {
368 if text == RECONNECTED {
369 log::info!("Received WebSocket reconnected signal");
370 return Some(BitmexWsMessage::Reconnected);
371 }
372
373 log::trace!("Raw websocket message: {text}");
374
375 if Self::is_heartbeat_message(&text) {
376 log::trace!("Ignoring heartbeat control message: {text}");
377 return None;
378 }
379
380 match serde_json::from_str(&text) {
381 Ok(msg) => match &msg {
382 BitmexWsMessage::Welcome {
383 version,
384 heartbeat_enabled,
385 limit,
386 ..
387 } => {
388 log::info!(
389 "Welcome to the BitMEX Realtime API: version={}, heartbeat={}, rate_limit={:?}",
390 version,
391 heartbeat_enabled,
392 limit.remaining,
393 );
394 }
395 BitmexWsMessage::Subscription { .. } => return Some(msg),
396 BitmexWsMessage::Error { status, error, .. } => {
397 log::error!(
398 "Received error from BitMEX: status={status}, error={error}",
399 );
400 }
401 _ => return Some(msg),
402 },
403 Err(e) => {
404 log::error!("Failed to parse WebSocket message: {e}: {text}");
405 }
406 }
407 }
408 Message::Binary(msg) => {
409 log::debug!("Raw binary: {msg:?}");
410 }
411 Message::Close(_) => {
412 log::debug!("Received close message, waiting for reconnection");
413 }
414 Message::Ping(data) => {
415 log::trace!("Ping frame with {} bytes (already handled)", data.len());
417 }
418 Message::Pong(data) => {
419 log::trace!("Received pong frame with {} bytes", data.len());
420 }
421 Message::Frame(frame) => {
422 log::debug!("Received raw frame: {frame:?}");
423 }
424 }
425
426 None
427 }
428
429 fn is_heartbeat_message(text: &str) -> bool {
430 let trimmed = text.trim();
431
432 if !trimmed.starts_with('{') || trimmed.len() > 64 {
433 return false;
434 }
435
436 trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
437 }
438
439 fn handle_subscription_ack(
440 &self,
441 success: bool,
442 request: Option<&BitmexHttpRequest>,
443 subscribe: Option<&String>,
444 error: Option<&str>,
445 ) {
446 let topics = Self::topics_from_request(request, subscribe);
447
448 if topics.is_empty() {
449 log::debug!("Subscription acknowledgement without topics");
450 return;
451 }
452
453 for topic in topics {
454 if success {
455 self.subscriptions.confirm_subscribe(topic);
456 log::debug!("Subscription confirmed: topic={topic}");
457 } else {
458 self.subscriptions.mark_failure(topic);
459 let reason = error.unwrap_or("Subscription rejected");
460 log::error!("Subscription failed: topic={topic}, error={reason}");
461 }
462 }
463 }
464
465 fn handle_unsubscribe_ack(
466 &self,
467 success: bool,
468 request: Option<&BitmexHttpRequest>,
469 subscribe: Option<&String>,
470 error: Option<&str>,
471 ) {
472 let topics = Self::topics_from_request(request, subscribe);
473
474 if topics.is_empty() {
475 log::debug!("Unsubscription acknowledgement without topics");
476 return;
477 }
478
479 for topic in topics {
480 if success {
481 log::debug!("Unsubscription confirmed: topic={topic}");
482 self.subscriptions.confirm_unsubscribe(topic);
483 } else {
484 let reason = error.unwrap_or("Unsubscription rejected");
485 log::error!(
486 "Unsubscription failed - restoring subscription: topic={topic}, error={reason}",
487 );
488 self.subscriptions.confirm_unsubscribe(topic); self.subscriptions.mark_subscribe(topic); self.subscriptions.confirm_subscribe(topic); }
493 }
494 }
495
496 fn topics_from_request<'a>(
497 request: Option<&'a BitmexHttpRequest>,
498 fallback: Option<&'a String>,
499 ) -> Vec<&'a str> {
500 if let Some(req) = request
501 && !req.args.is_empty()
502 {
503 return req.args.iter().filter_map(|arg| arg.as_str()).collect();
504 }
505
506 fallback.into_iter().map(|topic| topic.as_str()).collect()
507 }
508
509 fn handle_orderbook_l2(
510 &self,
511 action: BitmexAction,
512 data: Vec<BitmexOrderBookMsg>,
513 ts_init: UnixNanos,
514 ) -> Option<NautilusWsMessage> {
515 if data.is_empty() {
516 return None;
517 }
518 let data = parse_book_msg_vec(data, action, &self.instruments_cache, ts_init);
519 Some(NautilusWsMessage::Data(data))
520 }
521
522 fn handle_orderbook_10(
523 &self,
524 data: Vec<BitmexOrderBook10Msg>,
525 ts_init: UnixNanos,
526 ) -> Option<NautilusWsMessage> {
527 if data.is_empty() {
528 return None;
529 }
530 let data = parse_book10_msg_vec(data, &self.instruments_cache, ts_init);
531 Some(NautilusWsMessage::Data(data))
532 }
533
534 fn handle_quote(
535 &mut self,
536 mut data: Vec<BitmexQuoteMsg>,
537 ts_init: UnixNanos,
538 ) -> Option<NautilusWsMessage> {
539 if data.is_empty() {
541 return None;
542 }
543
544 let msg = data.remove(0);
545 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &msg.symbol) else {
546 log::error!(
547 "Instrument cache miss: quote message dropped for symbol={}",
548 msg.symbol
549 );
550 return None;
551 };
552
553 let instrument_id = instrument.id();
554 let price_precision = instrument.price_precision();
555
556 let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
557 let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
558 let bid_size = msg
559 .bid_size
560 .map(|s| parse_contracts_quantity(s, &instrument));
561 let ask_size = msg
562 .ask_size
563 .map(|s| parse_contracts_quantity(s, &instrument));
564 let ts_event = UnixNanos::from(msg.timestamp);
565
566 match self.quote_cache.process(
567 instrument_id,
568 bid_price,
569 ask_price,
570 bid_size,
571 ask_size,
572 ts_event,
573 ts_init,
574 ) {
575 Ok(quote) => Some(NautilusWsMessage::Data(vec![Data::Quote(quote)])),
576 Err(e) => {
577 log::warn!("Failed to process quote: {e}");
578 None
579 }
580 }
581 }
582
583 fn handle_trade(
584 &self,
585 data: Vec<BitmexTradeMsg>,
586 ts_init: UnixNanos,
587 ) -> Option<NautilusWsMessage> {
588 if data.is_empty() {
589 return None;
590 }
591 let data = parse_trade_msg_vec(data, &self.instruments_cache, ts_init);
592 Some(NautilusWsMessage::Data(data))
593 }
594
595 fn handle_trade_bin(
596 &self,
597 action: BitmexAction,
598 data: Vec<BitmexTradeBinMsg>,
599 topic: BitmexWsTopic,
600 ts_init: UnixNanos,
601 ) -> Option<NautilusWsMessage> {
602 if action == BitmexAction::Partial || data.is_empty() {
603 return None;
604 }
605 let data = parse_trade_bin_msg_vec(data, topic, &self.instruments_cache, ts_init);
606 Some(NautilusWsMessage::Data(data))
607 }
608
609 fn handle_order(&mut self, data: Vec<OrderData>) -> Option<NautilusWsMessage> {
610 let mut reports = Vec::with_capacity(data.len());
612
613 for order_data in data {
614 match order_data {
615 OrderData::Full(order_msg) => {
616 let Some(instrument) =
617 Self::get_instrument(&self.instruments_cache, &order_msg.symbol)
618 else {
619 log::error!(
620 "Instrument cache miss: order message dropped for symbol={}, order_id={}",
621 order_msg.symbol,
622 order_msg.order_id
623 );
624 continue;
625 };
626
627 match parse_order_msg(&order_msg, &instrument, &self.order_type_cache) {
628 Ok(report) => {
629 if let Some(client_order_id) = &order_msg.cl_ord_id {
631 let client_order_id = ClientOrderId::new(client_order_id);
632
633 if let Some(ord_type) = &order_msg.ord_type {
634 let order_type: OrderType = (*ord_type).into();
635 self.order_type_cache.insert(client_order_id, order_type);
636 }
637
638 self.order_symbol_cache
640 .insert(client_order_id, order_msg.symbol);
641 }
642
643 if is_terminal_order_status(report.order_status)
644 && let Some(client_id) = report.client_order_id
645 {
646 self.order_type_cache.remove(&client_id);
647 self.order_symbol_cache.remove(&client_id);
648 }
649
650 reports.push(report);
651 }
652 Err(e) => {
653 log::error!(
654 "Failed to parse full order message - potential data loss: \
655 error={e}, symbol={}, order_id={}, time_in_force={:?}",
656 order_msg.symbol,
657 order_msg.order_id,
658 order_msg.time_in_force,
659 );
660 continue;
662 }
663 }
664 }
665 OrderData::Update(msg) => {
666 let Some(instrument) =
667 Self::get_instrument(&self.instruments_cache, &msg.symbol)
668 else {
669 log::error!(
670 "Instrument cache miss: order update dropped for symbol={}, order_id={}",
671 msg.symbol,
672 msg.order_id
673 );
674 continue;
675 };
676
677 if let Some(cl_ord_id) = &msg.cl_ord_id {
679 let client_order_id = ClientOrderId::new(cl_ord_id);
680 self.order_symbol_cache.insert(client_order_id, msg.symbol);
681 }
682
683 if let Some(event) = parse_order_update_msg(&msg, &instrument, self.account_id)
684 {
685 return Some(NautilusWsMessage::OrderUpdated(event));
686 } else {
687 log::warn!(
688 "Skipped order update message (insufficient data): \
689 order_id={}, price={:?}",
690 msg.order_id,
691 msg.price,
692 );
693 }
694 }
695 }
696 }
697
698 if reports.is_empty() {
699 return None;
700 }
701
702 Some(NautilusWsMessage::OrderStatusReports(reports))
703 }
704
705 fn handle_execution(&mut self, data: Vec<BitmexExecutionMsg>) -> Option<NautilusWsMessage> {
706 let mut fills = Vec::with_capacity(data.len());
707
708 for exec_msg in data {
709 let symbol_opt = if let Some(sym) = &exec_msg.symbol {
711 Some(*sym)
712 } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
713 let client_order_id = ClientOrderId::new(cl_ord_id);
715 self.order_symbol_cache
716 .get(&client_order_id)
717 .map(|r| *r.value())
718 } else {
719 None
720 };
721
722 let Some(symbol) = symbol_opt else {
723 if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
725 if exec_msg.exec_type == Some(BitmexExecType::Trade) {
726 log::warn!(
727 "Execution message missing symbol and not found in cache: \
728 cl_ord_id={cl_ord_id}, exec_id={:?}, ord_rej_reason={:?}, text={:?}",
729 exec_msg.exec_id,
730 exec_msg.ord_rej_reason,
731 exec_msg.text,
732 );
733 } else {
734 log::debug!(
735 "Execution message missing symbol and not found in cache: \
736 cl_ord_id={cl_ord_id}, exec_id={:?}, exec_type={:?}, \
737 ord_rej_reason={:?}, text={:?}",
738 exec_msg.exec_id,
739 exec_msg.exec_type,
740 exec_msg.ord_rej_reason,
741 exec_msg.text,
742 );
743 }
744 } else {
745 if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
749 log::debug!(
750 "CancelReject message missing symbol/clOrdID (expected with redundant cancels): \
751 exec_id={:?}, order_id={:?}",
752 exec_msg.exec_id,
753 exec_msg.order_id,
754 );
755 } else {
756 log::warn!(
757 "Execution message missing both symbol and clOrdID, cannot process: \
758 exec_id={:?}, order_id={:?}, exec_type={:?}, \
759 ord_rej_reason={:?}, text={:?}",
760 exec_msg.exec_id,
761 exec_msg.order_id,
762 exec_msg.exec_type,
763 exec_msg.ord_rej_reason,
764 exec_msg.text,
765 );
766 }
767 }
768 continue;
769 };
770
771 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &symbol) else {
772 log::error!(
773 "Instrument cache miss: execution message dropped for symbol={}, exec_id={:?}, exec_type={:?}, Liquidation/ADL fills may be lost",
774 symbol,
775 exec_msg.exec_id,
776 exec_msg.exec_type
777 );
778 continue;
779 };
780
781 if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
782 fills.push(fill);
783 }
784 }
785
786 if fills.is_empty() {
787 return None;
788 }
789 Some(NautilusWsMessage::FillReports(fills))
790 }
791
792 fn handle_position(&self, data: Vec<BitmexPositionMsg>) -> Option<NautilusWsMessage> {
793 if let Some(pos_msg) = data.into_iter().next() {
794 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &pos_msg.symbol)
795 else {
796 log::error!(
797 "Instrument cache miss: position message dropped for symbol={}, account={}",
798 pos_msg.symbol,
799 pos_msg.account
800 );
801 return None;
802 };
803 let report = parse_position_msg(pos_msg, &instrument);
804 Some(NautilusWsMessage::PositionStatusReport(report))
805 } else {
806 None
807 }
808 }
809
810 fn handle_wallet(
811 &self,
812 data: Vec<BitmexWalletMsg>,
813 ts_init: UnixNanos,
814 ) -> Option<NautilusWsMessage> {
815 if let Some(wallet_msg) = data.into_iter().next() {
816 let account_state = parse_wallet_msg(wallet_msg, ts_init);
817 Some(NautilusWsMessage::AccountState(account_state))
818 } else {
819 None
820 }
821 }
822
823 fn handle_instrument(
824 &mut self,
825 action: BitmexAction,
826 data: Vec<BitmexInstrumentMsg>,
827 ts_init: UnixNanos,
828 ) -> Option<NautilusWsMessage> {
829 match action {
830 BitmexAction::Partial | BitmexAction::Insert => {
831 let mut instruments = Vec::with_capacity(data.len());
832 let mut temp_cache = AHashMap::new();
833
834 let data_for_prices = data.clone();
835
836 for msg in data {
837 match msg.try_into() {
838 Ok(http_inst) => {
839 match parse_instrument_any(&http_inst, ts_init) {
840 InstrumentParseResult::Ok(boxed) => {
841 let instrument_any = *boxed;
842 let symbol = instrument_any.symbol().inner();
843 temp_cache.insert(symbol, instrument_any.clone());
844 instruments.push(instrument_any);
845 }
846 InstrumentParseResult::Unsupported { .. }
847 | InstrumentParseResult::Inactive { .. } => {
848 }
850 InstrumentParseResult::Failed {
851 symbol,
852 instrument_type,
853 error,
854 } => {
855 log::warn!(
856 "Failed to parse instrument {symbol} ({instrument_type:?}): {error}"
857 );
858 }
859 }
860 }
861 Err(e) => {
862 log::debug!("Skipping instrument (missing required fields): {e}");
863 }
864 }
865 }
866
867 for (symbol, instrument) in &temp_cache {
869 self.instruments_cache.insert(*symbol, instrument.clone());
870 }
871
872 if !instruments.is_empty()
873 && let Err(e) = self
874 .out_tx
875 .send(NautilusWsMessage::Instruments(instruments))
876 {
877 log::error!("Error sending instruments: {e}");
878 }
879
880 let mut data_msgs = Vec::with_capacity(data_for_prices.len());
881
882 for msg in data_for_prices {
883 let parsed = parse_instrument_msg(msg, &temp_cache, ts_init);
884 data_msgs.extend(parsed);
885 }
886
887 if data_msgs.is_empty() {
888 return None;
889 }
890 Some(NautilusWsMessage::Data(data_msgs))
891 }
892 BitmexAction::Update => {
893 let mut data_msgs = Vec::with_capacity(data.len());
894
895 for msg in data {
896 let parsed = parse_instrument_msg(msg, &self.instruments_cache, ts_init);
897 data_msgs.extend(parsed);
898 }
899
900 if data_msgs.is_empty() {
901 return None;
902 }
903 Some(NautilusWsMessage::Data(data_msgs))
904 }
905 BitmexAction::Delete => {
906 log::info!(
907 "Received instrument delete action for {} instrument(s)",
908 data.len()
909 );
910 None
911 }
912 }
913 }
914
915 fn handle_funding(
916 &self,
917 data: Vec<BitmexFundingMsg>,
918 ts_init: UnixNanos,
919 ) -> Option<NautilusWsMessage> {
920 let mut funding_updates = Vec::with_capacity(data.len());
921
922 for msg in data {
923 if let Some(parsed) = parse_funding_msg(msg, ts_init) {
924 funding_updates.push(parsed);
925 }
926 }
927
928 if funding_updates.is_empty() {
929 None
930 } else {
931 Some(NautilusWsMessage::FundingRateUpdates(funding_updates))
932 }
933 }
934
935 fn handle_subscription_message(
936 &self,
937 success: bool,
938 subscribe: Option<&String>,
939 request: Option<&BitmexHttpRequest>,
940 error: Option<&str>,
941 ) -> Option<NautilusWsMessage> {
942 if let Some(req) = request {
943 if req
944 .op
945 .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
946 {
947 if success {
948 log::info!("WebSocket authenticated");
949 self.auth_tracker.succeed();
950 return Some(NautilusWsMessage::Authenticated);
951 } else {
952 let reason = error.unwrap_or("Authentication rejected").to_string();
953 log::error!("WebSocket authentication failed: {reason}");
954 self.auth_tracker.fail(reason);
955 }
956 return None;
957 }
958
959 if req
960 .op
961 .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
962 {
963 self.handle_subscription_ack(success, request, subscribe, error);
964 return None;
965 }
966
967 if req
968 .op
969 .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
970 {
971 self.handle_unsubscribe_ack(success, request, subscribe, error);
972 return None;
973 }
974 }
975
976 if subscribe.is_some() {
977 self.handle_subscription_ack(success, request, subscribe, error);
978 return None;
979 }
980
981 if let Some(error) = error {
982 log::warn!("Unhandled subscription control message: success={success}, error={error}");
983 }
984
985 None
986 }
987}
988
989fn is_terminal_order_status(status: OrderStatus) -> bool {
990 matches!(
991 status,
992 OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
993 )
994}
995
996pub(crate) fn should_retry_bitmex_error(error: &BitmexWsError) -> bool {
998 match error {
999 BitmexWsError::TungsteniteError(_) => true, BitmexWsError::ClientError(msg) => {
1001 let msg_lower = msg.to_lowercase();
1003 msg_lower.contains("timeout")
1004 || msg_lower.contains("timed out")
1005 || msg_lower.contains("connection")
1006 || msg_lower.contains("network")
1007 }
1008 _ => false,
1009 }
1010}
1011
1012pub(crate) fn create_bitmex_timeout_error(msg: String) -> BitmexWsError {
1014 BitmexWsError::ClientError(msg)
1015}
1016
1017#[cfg(test)]
1018mod tests {
1019 use rstest::rstest;
1020
1021 use super::*;
1022
1023 #[rstest]
1024 fn test_is_heartbeat_message_detection() {
1025 assert!(FeedHandler::is_heartbeat_message("{\"op\":\"ping\"}"));
1026 assert!(FeedHandler::is_heartbeat_message("{\"op\":\"pong\"}"));
1027 assert!(!FeedHandler::is_heartbeat_message(
1028 "{\"op\":\"subscribe\",\"args\":[\"trade:XBTUSD\"]}"
1029 ));
1030 }
1031}