1use std::{
19 collections::VecDeque,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use dashmap::DashMap;
28use nautilus_common::cache::quote::QuoteCache;
29use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
30use nautilus_model::{
31 data::Data,
32 enums::{OrderStatus, OrderType},
33 identifiers::{AccountId, ClientOrderId},
34 instruments::{Instrument, InstrumentAny},
35 types::Price,
36};
37use nautilus_network::{
38 RECONNECTED,
39 retry::{RetryManager, create_websocket_retry_manager},
40 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
41};
42use tokio_tungstenite::tungstenite::Message;
43use ustr::Ustr;
44
45use super::{
46 enums::{BitmexAction, BitmexWsAuthAction, BitmexWsOperation, BitmexWsTopic},
47 error::BitmexWsError,
48 messages::{
49 BitmexExecutionMsg, BitmexFundingMsg, BitmexHttpRequest, BitmexInstrumentMsg,
50 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexPositionMsg, BitmexQuoteMsg,
51 BitmexTableMessage, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg, BitmexWsMessage,
52 NautilusWsMessage, OrderData,
53 },
54 parse::{
55 parse_book_msg_vec, parse_book10_msg_vec, parse_execution_msg, parse_funding_msg,
56 parse_instrument_msg, parse_order_msg, parse_order_update_msg, parse_position_msg,
57 parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg,
58 },
59};
60use crate::{
61 common::{
62 enums::{BitmexExecType, BitmexOrderType, BitmexPegPriceType},
63 parse::parse_contracts_quantity,
64 },
65 http::parse::{InstrumentParseResult, parse_instrument_any},
66};
67
68#[derive(Debug)]
70#[allow(
71 clippy::large_enum_variant,
72 reason = "Commands are ephemeral and immediately consumed"
73)]
74pub enum HandlerCommand {
75 SetClient(WebSocketClient),
77 Disconnect,
79 Authenticate { payload: String },
81 Subscribe { topics: Vec<String> },
83 Unsubscribe { topics: Vec<String> },
85 InitializeInstruments(Vec<InstrumentAny>),
87 UpdateInstrument(InstrumentAny),
89}
90
91pub(super) struct FeedHandler {
92 account_id: AccountId,
93 signal: Arc<AtomicBool>,
94 client: Option<WebSocketClient>,
95 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
96 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
97 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
98 auth_tracker: AuthTracker,
99 subscriptions: SubscriptionState,
100 retry_manager: RetryManager<BitmexWsError>,
101 instruments_cache: AHashMap<Ustr, InstrumentAny>,
102 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
103 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
104 quote_cache: QuoteCache,
105 pending_msgs: VecDeque<NautilusWsMessage>,
106}
107
108impl FeedHandler {
109 #[allow(clippy::too_many_arguments)]
111 pub(super) fn new(
112 signal: Arc<AtomicBool>,
113 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
114 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
115 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
116 account_id: AccountId,
117 auth_tracker: AuthTracker,
118 subscriptions: SubscriptionState,
119 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
120 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
121 ) -> Self {
122 Self {
123 account_id,
124 signal,
125 client: None,
126 cmd_rx,
127 raw_rx,
128 out_tx,
129 auth_tracker,
130 subscriptions,
131 retry_manager: create_websocket_retry_manager(),
132 instruments_cache: AHashMap::new(),
133 order_type_cache,
134 order_symbol_cache,
135 quote_cache: QuoteCache::new(),
136 pending_msgs: VecDeque::new(),
137 }
138 }
139
140 pub(super) fn is_stopped(&self) -> bool {
141 self.signal.load(Ordering::Relaxed)
142 }
143
144 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
145 self.out_tx.send(msg).map_err(|_| ())
146 }
147
148 async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
150 if let Some(client) = &self.client {
151 self.retry_manager
152 .execute_with_retry(
153 "websocket_send",
154 || {
155 let payload = payload.clone();
156 async move {
157 client.send_text(payload, None).await.map_err(|e| {
158 BitmexWsError::ClientError(format!("Send failed: {e}"))
159 })
160 }
161 },
162 should_retry_bitmex_error,
163 create_bitmex_timeout_error,
164 )
165 .await
166 .map_err(|e| anyhow::anyhow!("{e}"))
167 } else {
168 Err(anyhow::anyhow!("No active WebSocket client"))
169 }
170 }
171
172 #[inline]
173 fn get_instrument(
174 cache: &AHashMap<Ustr, InstrumentAny>,
175 symbol: &Ustr,
176 ) -> Option<InstrumentAny> {
177 cache.get(symbol).cloned()
178 }
179
180 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
181 if let Some(msg) = self.pending_msgs.pop_front() {
182 return Some(msg);
183 }
184
185 let clock = get_atomic_clock_realtime();
186
187 loop {
188 tokio::select! {
189 Some(cmd) = self.cmd_rx.recv() => {
190 match cmd {
191 HandlerCommand::SetClient(client) => {
192 log::debug!("WebSocketClient received by handler");
193 self.client = Some(client);
194 }
195 HandlerCommand::Disconnect => {
196 log::debug!("Disconnect command received");
197 if let Some(client) = self.client.take() {
198 client.disconnect().await;
199 }
200 }
201 HandlerCommand::Authenticate { payload } => {
202 log::debug!("Authenticate command received");
203 if let Err(e) = self.send_with_retry(payload).await {
204 log::error!("Failed to send authentication after retries: {e}");
205 }
206 }
207 HandlerCommand::Subscribe { topics } => {
208 for topic in topics {
209 log::debug!("Subscribing to topic: {topic}");
210 if let Err(e) = self.send_with_retry(topic.clone()).await {
211 log::error!("Failed to send subscription after retries: topic={topic}, error={e}");
212 }
213 }
214 }
215 HandlerCommand::Unsubscribe { topics } => {
216 for topic in topics {
217 log::debug!("Unsubscribing from topic: {topic}");
218 if let Err(e) = self.send_with_retry(topic.clone()).await {
219 log::error!("Failed to send unsubscription after retries: topic={topic}, error={e}");
220 }
221 }
222 }
223 HandlerCommand::InitializeInstruments(instruments) => {
224 for inst in instruments {
225 self.instruments_cache.insert(inst.symbol().inner(), inst);
226 }
227 }
228 HandlerCommand::UpdateInstrument(inst) => {
229 self.instruments_cache.insert(inst.symbol().inner(), inst);
230 }
231 }
232 continue;
234 }
235
236 () = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
237 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
238 log::debug!("Stop signal received during idle period");
239 return None;
240 }
241 continue;
242 }
243
244 msg = self.raw_rx.recv() => {
245 let msg = match msg {
246 Some(msg) => msg,
247 None => {
248 log::debug!("WebSocket stream closed");
249 return None;
250 }
251 };
252
253 if let Message::Ping(data) = &msg {
255 log::trace!("Received ping frame with {} bytes", data.len());
256 if let Some(client) = &self.client
257 && let Err(e) = client.send_pong(data.to_vec()).await
258 {
259 log::warn!("Failed to send pong frame: {e}");
260 }
261 continue;
262 }
263
264 let event = match Self::parse_raw_message(msg) {
265 Some(event) => event,
266 None => continue,
267 };
268
269 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
270 log::debug!("Stop signal received");
271 return None;
272 }
273
274 match event {
275 BitmexWsMessage::Reconnected => {
276 self.quote_cache.clear();
277 self.order_type_cache.clear();
278 self.order_symbol_cache.clear();
279 return Some(NautilusWsMessage::Reconnected);
280 }
281 BitmexWsMessage::Subscription {
282 success,
283 subscribe,
284 request,
285 error,
286 } => {
287 if let Some(msg) = self.handle_subscription_message(
288 success,
289 subscribe.as_ref(),
290 request.as_ref(),
291 error.as_deref(),
292 ) {
293 return Some(msg);
294 }
295 continue;
296 }
297 BitmexWsMessage::Table(table_msg) => {
298 let ts_init = clock.get_time_ns();
299
300 let msg = match table_msg {
301 BitmexTableMessage::OrderBookL2 { action, data } => {
302 self.handle_orderbook_l2(action, data, ts_init)
303 }
304 BitmexTableMessage::OrderBookL2_25 { action, data } => {
305 self.handle_orderbook_l2(action, data, ts_init)
306 }
307 BitmexTableMessage::OrderBook10 { data, .. } => {
308 self.handle_orderbook_10(data, ts_init)
309 }
310 BitmexTableMessage::Quote { data, .. } => {
311 self.handle_quote(data, ts_init)
312 }
313 BitmexTableMessage::Trade { data, .. } => {
314 self.handle_trade(data, ts_init)
315 }
316 BitmexTableMessage::TradeBin1m { action, data } => {
317 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1m, ts_init)
318 }
319 BitmexTableMessage::TradeBin5m { action, data } => {
320 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin5m, ts_init)
321 }
322 BitmexTableMessage::TradeBin1h { action, data } => {
323 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1h, ts_init)
324 }
325 BitmexTableMessage::TradeBin1d { action, data } => {
326 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1d, ts_init)
327 }
328 BitmexTableMessage::Order { data, .. } => {
332 let mut msgs = self.handle_order(data);
333 if msgs.is_empty() {
334 None
335 } else {
336 let first = msgs.remove(0);
338 self.pending_msgs.extend(msgs);
339 Some(first)
340 }
341 }
342 BitmexTableMessage::Execution { data, .. } => {
343 self.handle_execution(data)
344 }
345 BitmexTableMessage::Position { data, .. } => {
346 self.handle_position(data)
347 }
348 BitmexTableMessage::Wallet { data, .. } => {
349 self.handle_wallet(data, ts_init)
350 }
351 BitmexTableMessage::Margin { .. } => {
352 None
355 }
356 BitmexTableMessage::Instrument { action, data } => {
357 self.handle_instrument(action, data, ts_init)
358 }
359 BitmexTableMessage::Funding { data, .. } => {
360 self.handle_funding(data, ts_init)
361 }
362 _ => {
363 log::warn!("Unhandled table message type: {table_msg:?}");
365 None
366 }
367 };
368
369 if let Some(msg) = msg {
370 return Some(msg);
371 }
372 continue;
373 }
374 BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
375 }
376 }
377
378 else => {
380 log::debug!("Handler shutting down: stream ended or command channel closed");
381 return None;
382 }
383 }
384 }
385 }
386
387 fn parse_raw_message(msg: Message) -> Option<BitmexWsMessage> {
388 match msg {
389 Message::Text(text) => {
390 if text == RECONNECTED {
391 log::info!("Received WebSocket reconnected signal");
392 return Some(BitmexWsMessage::Reconnected);
393 }
394
395 log::trace!("Raw websocket message: {text}");
396
397 if Self::is_heartbeat_message(&text) {
398 log::trace!("Ignoring heartbeat control message: {text}");
399 return None;
400 }
401
402 match serde_json::from_str(&text) {
403 Ok(msg) => match &msg {
404 BitmexWsMessage::Welcome {
405 version,
406 heartbeat_enabled,
407 limit,
408 ..
409 } => {
410 log::info!(
411 "Welcome to the BitMEX Realtime API: version={}, heartbeat={}, rate_limit={:?}",
412 version,
413 heartbeat_enabled,
414 limit.remaining,
415 );
416 }
417 BitmexWsMessage::Subscription { .. } => return Some(msg),
418 BitmexWsMessage::Error { status, error, .. } => {
419 log::error!(
420 "Received error from BitMEX: status={status}, error={error}",
421 );
422 }
423 _ => return Some(msg),
424 },
425 Err(e) => {
426 log::error!("Failed to parse WebSocket message: {e}: {text}");
427 }
428 }
429 }
430 Message::Binary(msg) => {
431 log::debug!("Raw binary: {msg:?}");
432 }
433 Message::Close(_) => {
434 log::debug!("Received close message, waiting for reconnection");
435 }
436 Message::Ping(data) => {
437 log::trace!("Ping frame with {} bytes (already handled)", data.len());
439 }
440 Message::Pong(data) => {
441 log::trace!("Received pong frame with {} bytes", data.len());
442 }
443 Message::Frame(frame) => {
444 log::debug!("Received raw frame: {frame:?}");
445 }
446 }
447
448 None
449 }
450
451 fn is_heartbeat_message(text: &str) -> bool {
452 let trimmed = text.trim();
453
454 if !trimmed.starts_with('{') || trimmed.len() > 64 {
455 return false;
456 }
457
458 trimmed.contains("\"op\":\"ping\"") || trimmed.contains("\"op\":\"pong\"")
459 }
460
461 fn handle_subscription_ack(
462 &self,
463 success: bool,
464 request: Option<&BitmexHttpRequest>,
465 subscribe: Option<&String>,
466 error: Option<&str>,
467 ) {
468 let topics = Self::topics_from_request(request, subscribe);
469
470 if topics.is_empty() {
471 log::debug!("Subscription acknowledgement without topics");
472 return;
473 }
474
475 for topic in topics {
476 if success {
477 self.subscriptions.confirm_subscribe(topic);
478 log::debug!("Subscription confirmed: topic={topic}");
479 } else {
480 self.subscriptions.mark_failure(topic);
481 let reason = error.unwrap_or("Subscription rejected");
482 log::error!("Subscription failed: topic={topic}, error={reason}");
483 }
484 }
485 }
486
487 fn handle_unsubscribe_ack(
488 &self,
489 success: bool,
490 request: Option<&BitmexHttpRequest>,
491 subscribe: Option<&String>,
492 error: Option<&str>,
493 ) {
494 let topics = Self::topics_from_request(request, subscribe);
495
496 if topics.is_empty() {
497 log::debug!("Unsubscription acknowledgement without topics");
498 return;
499 }
500
501 for topic in topics {
502 if success {
503 log::debug!("Unsubscription confirmed: topic={topic}");
504 self.subscriptions.confirm_unsubscribe(topic);
505 } else {
506 let reason = error.unwrap_or("Unsubscription rejected");
507 log::error!(
508 "Unsubscription failed - restoring subscription: topic={topic}, error={reason}",
509 );
510 self.subscriptions.confirm_unsubscribe(topic); self.subscriptions.mark_subscribe(topic); self.subscriptions.confirm_subscribe(topic); }
515 }
516 }
517
518 fn topics_from_request<'a>(
519 request: Option<&'a BitmexHttpRequest>,
520 fallback: Option<&'a String>,
521 ) -> Vec<&'a str> {
522 if let Some(req) = request
523 && !req.args.is_empty()
524 {
525 return req.args.iter().filter_map(|arg| arg.as_str()).collect();
526 }
527
528 fallback.into_iter().map(|topic| topic.as_str()).collect()
529 }
530
531 fn handle_orderbook_l2(
532 &self,
533 action: BitmexAction,
534 data: Vec<BitmexOrderBookMsg>,
535 ts_init: UnixNanos,
536 ) -> Option<NautilusWsMessage> {
537 if data.is_empty() {
538 return None;
539 }
540 let data = parse_book_msg_vec(data, action, &self.instruments_cache, ts_init);
541 Some(NautilusWsMessage::Data(data))
542 }
543
544 fn handle_orderbook_10(
545 &self,
546 data: Vec<BitmexOrderBook10Msg>,
547 ts_init: UnixNanos,
548 ) -> Option<NautilusWsMessage> {
549 if data.is_empty() {
550 return None;
551 }
552 let data = parse_book10_msg_vec(data, &self.instruments_cache, ts_init);
553 Some(NautilusWsMessage::Data(data))
554 }
555
556 fn handle_quote(
557 &mut self,
558 data: Vec<BitmexQuoteMsg>,
559 ts_init: UnixNanos,
560 ) -> Option<NautilusWsMessage> {
561 if data.is_empty() {
563 return None;
564 }
565
566 let mut quotes = Vec::with_capacity(data.len());
567
568 for msg in data {
569 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &msg.symbol)
570 else {
571 log::error!(
572 "Instrument cache miss: quote message dropped for symbol={}",
573 msg.symbol
574 );
575 continue;
576 };
577
578 let instrument_id = instrument.id();
579 let price_precision = instrument.price_precision();
580
581 let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
582 let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
583 let bid_size = msg
584 .bid_size
585 .map(|s| parse_contracts_quantity(s, &instrument));
586 let ask_size = msg
587 .ask_size
588 .map(|s| parse_contracts_quantity(s, &instrument));
589 let ts_event = UnixNanos::from(msg.timestamp);
590
591 match self.quote_cache.process(
592 instrument_id,
593 bid_price,
594 ask_price,
595 bid_size,
596 ask_size,
597 ts_event,
598 ts_init,
599 ) {
600 Ok(quote) => quotes.push(Data::Quote(quote)),
601 Err(e) => {
602 log::warn!("Failed to process quote for {}: {e}", msg.symbol);
603 }
604 }
605 }
606
607 if quotes.is_empty() {
608 return None;
609 }
610
611 Some(NautilusWsMessage::Data(quotes))
612 }
613
614 fn handle_trade(
615 &self,
616 data: Vec<BitmexTradeMsg>,
617 ts_init: UnixNanos,
618 ) -> Option<NautilusWsMessage> {
619 if data.is_empty() {
620 return None;
621 }
622 let data = parse_trade_msg_vec(data, &self.instruments_cache, ts_init);
623 Some(NautilusWsMessage::Data(data))
624 }
625
626 fn handle_trade_bin(
627 &self,
628 action: BitmexAction,
629 data: Vec<BitmexTradeBinMsg>,
630 topic: BitmexWsTopic,
631 ts_init: UnixNanos,
632 ) -> Option<NautilusWsMessage> {
633 if action == BitmexAction::Partial || data.is_empty() {
634 return None;
635 }
636 let data = parse_trade_bin_msg_vec(data, topic, &self.instruments_cache, ts_init);
637 Some(NautilusWsMessage::Data(data))
638 }
639
640 fn handle_order(&mut self, data: Vec<OrderData>) -> Vec<NautilusWsMessage> {
641 let mut reports = Vec::with_capacity(data.len());
642 let mut updates = Vec::new();
643
644 for order_data in data {
645 match order_data {
646 OrderData::Full(order_msg) => {
647 let Some(instrument) =
648 Self::get_instrument(&self.instruments_cache, &order_msg.symbol)
649 else {
650 log::error!(
651 "Instrument cache miss: order message dropped for symbol={}, order_id={}",
652 order_msg.symbol,
653 order_msg.order_id
654 );
655 continue;
656 };
657
658 match parse_order_msg(&order_msg, &instrument, &self.order_type_cache) {
659 Ok(report) => {
660 if let Some(client_order_id) = &order_msg.cl_ord_id {
662 let client_order_id = ClientOrderId::new(client_order_id);
663
664 if let Some(ord_type) = &order_msg.ord_type {
665 let order_type: OrderType = if *ord_type
667 == BitmexOrderType::Pegged
668 && order_msg.peg_price_type
669 == Some(BitmexPegPriceType::TrailingStopPeg)
670 {
671 if order_msg.price.is_some() {
672 OrderType::TrailingStopLimit
673 } else {
674 OrderType::TrailingStopMarket
675 }
676 } else {
677 (*ord_type).into()
678 };
679 self.order_type_cache.insert(client_order_id, order_type);
680 }
681
682 self.order_symbol_cache
684 .insert(client_order_id, order_msg.symbol);
685 }
686
687 if is_terminal_order_status(report.order_status)
688 && let Some(client_id) = report.client_order_id
689 {
690 self.order_type_cache.remove(&client_id);
691 self.order_symbol_cache.remove(&client_id);
692 }
693
694 reports.push(report);
695 }
696 Err(e) => {
697 log::error!(
698 "Failed to parse full order message - potential data loss: \
699 error={e}, symbol={}, order_id={}, time_in_force={:?}",
700 order_msg.symbol,
701 order_msg.order_id,
702 order_msg.time_in_force,
703 );
704 continue;
706 }
707 }
708 }
709 OrderData::Update(msg) => {
710 let Some(instrument) =
711 Self::get_instrument(&self.instruments_cache, &msg.symbol)
712 else {
713 log::error!(
714 "Instrument cache miss: order update dropped for symbol={}, order_id={}",
715 msg.symbol,
716 msg.order_id
717 );
718 continue;
719 };
720
721 if let Some(cl_ord_id) = &msg.cl_ord_id {
723 let client_order_id = ClientOrderId::new(cl_ord_id);
724 self.order_symbol_cache.insert(client_order_id, msg.symbol);
725 }
726
727 if let Some(event) = parse_order_update_msg(&msg, &instrument, self.account_id)
728 {
729 updates.push(event);
730 } else {
731 log::warn!(
732 "Skipped order update message (insufficient data): \
733 order_id={}, price={:?}",
734 msg.order_id,
735 msg.price,
736 );
737 }
738 }
739 }
740 }
741
742 let mut msgs = Vec::new();
743
744 if !reports.is_empty() {
745 msgs.push(NautilusWsMessage::OrderStatusReports(reports));
746 }
747
748 if !updates.is_empty() {
749 msgs.push(NautilusWsMessage::OrderUpdates(updates));
750 }
751
752 msgs
753 }
754
755 fn handle_execution(&mut self, data: Vec<BitmexExecutionMsg>) -> Option<NautilusWsMessage> {
756 let mut fills = Vec::with_capacity(data.len());
757
758 for exec_msg in data {
759 let symbol_opt = if let Some(sym) = &exec_msg.symbol {
761 Some(*sym)
762 } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
763 let client_order_id = ClientOrderId::new(cl_ord_id);
765 self.order_symbol_cache
766 .get(&client_order_id)
767 .map(|r| *r.value())
768 } else {
769 None
770 };
771
772 let Some(symbol) = symbol_opt else {
773 if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
775 if exec_msg.exec_type == Some(BitmexExecType::Trade) {
776 log::warn!(
777 "Execution message missing symbol and not found in cache: \
778 cl_ord_id={cl_ord_id}, exec_id={:?}, ord_rej_reason={:?}, text={:?}",
779 exec_msg.exec_id,
780 exec_msg.ord_rej_reason,
781 exec_msg.text,
782 );
783 } else {
784 log::debug!(
785 "Execution message missing symbol and not found in cache: \
786 cl_ord_id={cl_ord_id}, exec_id={:?}, exec_type={:?}, \
787 ord_rej_reason={:?}, text={:?}",
788 exec_msg.exec_id,
789 exec_msg.exec_type,
790 exec_msg.ord_rej_reason,
791 exec_msg.text,
792 );
793 }
794 } else {
795 if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
799 log::debug!(
800 "CancelReject message missing symbol/clOrdID (expected with redundant cancels): \
801 exec_id={:?}, order_id={:?}",
802 exec_msg.exec_id,
803 exec_msg.order_id,
804 );
805 } else {
806 log::warn!(
807 "Execution message missing both symbol and clOrdID, cannot process: \
808 exec_id={:?}, order_id={:?}, exec_type={:?}, \
809 ord_rej_reason={:?}, text={:?}",
810 exec_msg.exec_id,
811 exec_msg.order_id,
812 exec_msg.exec_type,
813 exec_msg.ord_rej_reason,
814 exec_msg.text,
815 );
816 }
817 }
818 continue;
819 };
820
821 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &symbol) else {
822 log::error!(
823 "Instrument cache miss: execution message dropped for symbol={}, exec_id={:?}, exec_type={:?}, Liquidation/ADL fills may be lost",
824 symbol,
825 exec_msg.exec_id,
826 exec_msg.exec_type
827 );
828 continue;
829 };
830
831 if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
832 fills.push(fill);
833 }
834 }
835
836 if fills.is_empty() {
837 return None;
838 }
839 Some(NautilusWsMessage::FillReports(fills))
840 }
841
842 fn handle_position(&self, data: Vec<BitmexPositionMsg>) -> Option<NautilusWsMessage> {
843 if data.is_empty() {
844 return None;
845 }
846
847 let mut reports = Vec::with_capacity(data.len());
848
849 for pos_msg in data {
850 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &pos_msg.symbol)
851 else {
852 log::error!(
853 "Instrument cache miss: position message dropped for symbol={}, account={}",
854 pos_msg.symbol,
855 pos_msg.account
856 );
857 continue;
858 };
859 reports.push(parse_position_msg(pos_msg, &instrument));
860 }
861
862 if reports.is_empty() {
863 return None;
864 }
865
866 Some(NautilusWsMessage::PositionStatusReports(reports))
867 }
868
869 fn handle_wallet(
870 &self,
871 data: Vec<BitmexWalletMsg>,
872 ts_init: UnixNanos,
873 ) -> Option<NautilusWsMessage> {
874 if data.is_empty() {
875 return None;
876 }
877
878 let states: Vec<_> = data
879 .into_iter()
880 .map(|wallet_msg| parse_wallet_msg(wallet_msg, ts_init))
881 .collect();
882
883 Some(NautilusWsMessage::AccountStates(states))
884 }
885
886 fn handle_instrument(
887 &mut self,
888 action: BitmexAction,
889 data: Vec<BitmexInstrumentMsg>,
890 ts_init: UnixNanos,
891 ) -> Option<NautilusWsMessage> {
892 match action {
893 BitmexAction::Partial | BitmexAction::Insert => {
894 let mut instruments = Vec::with_capacity(data.len());
895 let mut temp_cache = AHashMap::new();
896
897 let data_for_prices = data.clone();
898
899 for msg in data {
900 match msg.try_into() {
901 Ok(http_inst) => {
902 match parse_instrument_any(&http_inst, ts_init) {
903 InstrumentParseResult::Ok(boxed) => {
904 let instrument_any = *boxed;
905 let symbol = instrument_any.symbol().inner();
906 temp_cache.insert(symbol, instrument_any.clone());
907 instruments.push(instrument_any);
908 }
909 InstrumentParseResult::Unsupported { .. }
910 | InstrumentParseResult::Inactive { .. } => {
911 }
913 InstrumentParseResult::Failed {
914 symbol,
915 instrument_type,
916 error,
917 } => {
918 log::warn!(
919 "Failed to parse instrument {symbol} ({instrument_type:?}): {error}"
920 );
921 }
922 }
923 }
924 Err(e) => {
925 log::debug!("Skipping instrument (missing required fields): {e}");
926 }
927 }
928 }
929
930 for (symbol, instrument) in &temp_cache {
932 self.instruments_cache.insert(*symbol, instrument.clone());
933 }
934
935 if !instruments.is_empty()
936 && let Err(e) = self
937 .out_tx
938 .send(NautilusWsMessage::Instruments(instruments))
939 {
940 log::error!("Error sending instruments: {e}");
941 }
942
943 let mut data_msgs = Vec::with_capacity(data_for_prices.len());
944
945 for msg in data_for_prices {
946 let parsed = parse_instrument_msg(msg, &temp_cache, ts_init);
947 data_msgs.extend(parsed);
948 }
949
950 if data_msgs.is_empty() {
951 return None;
952 }
953 Some(NautilusWsMessage::Data(data_msgs))
954 }
955 BitmexAction::Update => {
956 let mut data_msgs = Vec::with_capacity(data.len());
957
958 for msg in data {
959 let parsed = parse_instrument_msg(msg, &self.instruments_cache, ts_init);
960 data_msgs.extend(parsed);
961 }
962
963 if data_msgs.is_empty() {
964 return None;
965 }
966 Some(NautilusWsMessage::Data(data_msgs))
967 }
968 BitmexAction::Delete => {
969 log::info!(
970 "Received instrument delete action for {} instrument(s)",
971 data.len()
972 );
973 None
974 }
975 }
976 }
977
978 fn handle_funding(
979 &self,
980 data: Vec<BitmexFundingMsg>,
981 ts_init: UnixNanos,
982 ) -> Option<NautilusWsMessage> {
983 if data.is_empty() {
984 return None;
985 }
986
987 let funding_updates: Vec<_> = data
988 .into_iter()
989 .map(|msg| parse_funding_msg(msg, ts_init))
990 .collect();
991
992 Some(NautilusWsMessage::FundingRateUpdates(funding_updates))
993 }
994
995 fn handle_subscription_message(
996 &self,
997 success: bool,
998 subscribe: Option<&String>,
999 request: Option<&BitmexHttpRequest>,
1000 error: Option<&str>,
1001 ) -> Option<NautilusWsMessage> {
1002 if let Some(req) = request {
1003 if req
1004 .op
1005 .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
1006 {
1007 if success {
1008 log::info!("WebSocket authenticated");
1009 self.auth_tracker.succeed();
1010 return Some(NautilusWsMessage::Authenticated);
1011 } else {
1012 let reason = error.unwrap_or("Authentication rejected").to_string();
1013 log::error!("WebSocket authentication failed: {reason}");
1014 self.auth_tracker.fail(reason);
1015 }
1016 return None;
1017 }
1018
1019 if req
1020 .op
1021 .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
1022 {
1023 self.handle_subscription_ack(success, request, subscribe, error);
1024 return None;
1025 }
1026
1027 if req
1028 .op
1029 .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
1030 {
1031 self.handle_unsubscribe_ack(success, request, subscribe, error);
1032 return None;
1033 }
1034 }
1035
1036 if subscribe.is_some() {
1037 self.handle_subscription_ack(success, request, subscribe, error);
1038 return None;
1039 }
1040
1041 if let Some(error) = error {
1042 log::warn!("Unhandled subscription control message: success={success}, error={error}");
1043 }
1044
1045 None
1046 }
1047}
1048
1049fn is_terminal_order_status(status: OrderStatus) -> bool {
1050 matches!(
1051 status,
1052 OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
1053 )
1054}
1055
1056pub(crate) fn should_retry_bitmex_error(error: &BitmexWsError) -> bool {
1058 match error {
1059 BitmexWsError::TungsteniteError(_) => true, BitmexWsError::ClientError(msg) => {
1061 let msg_lower = msg.to_lowercase();
1063 msg_lower.contains("timeout")
1064 || msg_lower.contains("timed out")
1065 || msg_lower.contains("connection")
1066 || msg_lower.contains("network")
1067 }
1068 _ => false,
1069 }
1070}
1071
1072pub(crate) fn create_bitmex_timeout_error(msg: String) -> BitmexWsError {
1074 BitmexWsError::ClientError(msg)
1075}
1076
1077#[cfg(test)]
1078mod tests {
1079 use rstest::rstest;
1080
1081 use super::*;
1082
1083 #[rstest]
1084 fn test_is_heartbeat_message_detection() {
1085 assert!(FeedHandler::is_heartbeat_message("{\"op\":\"ping\"}"));
1086 assert!(FeedHandler::is_heartbeat_message("{\"op\":\"pong\"}"));
1087 assert!(!FeedHandler::is_heartbeat_message(
1088 "{\"op\":\"subscribe\",\"args\":[\"trade:XBTUSD\"]}"
1089 ));
1090 }
1091}