1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use dashmap::DashMap;
25use nautilus_common::cache::quote::QuoteCache;
26use nautilus_core::{UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28 data::Data,
29 enums::{OrderStatus, OrderType},
30 identifiers::{AccountId, ClientOrderId},
31 instruments::{Instrument, InstrumentAny},
32 types::Price,
33};
34use nautilus_network::{
35 RECONNECTED,
36 retry::{RetryManager, create_websocket_retry_manager},
37 websocket::{AuthTracker, SubscriptionState, WebSocketClient},
38};
39use tokio_tungstenite::tungstenite::Message;
40use ustr::Ustr;
41
42use super::{
43 enums::{BitmexAction, BitmexWsAuthAction, BitmexWsOperation, BitmexWsTopic},
44 error::BitmexWsError,
45 messages::{
46 BitmexExecutionMsg, BitmexFundingMsg, BitmexHttpRequest, BitmexInstrumentMsg,
47 BitmexOrderBook10Msg, BitmexOrderBookMsg, BitmexPositionMsg, BitmexQuoteMsg,
48 BitmexTableMessage, BitmexTradeBinMsg, BitmexTradeMsg, BitmexWalletMsg, BitmexWsMessage,
49 NautilusWsMessage, OrderData,
50 },
51 parse::{
52 parse_book_msg_vec, parse_book10_msg_vec, parse_execution_msg, parse_funding_msg,
53 parse_instrument_msg, parse_order_msg, parse_order_update_msg, parse_position_msg,
54 parse_trade_bin_msg_vec, parse_trade_msg_vec, parse_wallet_msg,
55 },
56};
57use crate::{
58 common::{enums::BitmexExecType, parse::parse_contracts_quantity},
59 http::parse::{InstrumentParseResult, parse_instrument_any},
60};
61
62#[derive(Debug)]
64#[allow(
65 clippy::large_enum_variant,
66 reason = "Commands are ephemeral and immediately consumed"
67)]
68pub enum HandlerCommand {
69 SetClient(WebSocketClient),
71 Disconnect,
73 Authenticate { payload: String },
75 Subscribe { topics: Vec<String> },
77 Unsubscribe { topics: Vec<String> },
79 InitializeInstruments(Vec<InstrumentAny>),
81 UpdateInstrument(InstrumentAny),
83}
84
85pub(super) struct FeedHandler {
86 account_id: AccountId,
87 signal: Arc<AtomicBool>,
88 client: Option<WebSocketClient>,
89 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
90 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
91 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
92 auth_tracker: AuthTracker,
93 subscriptions: SubscriptionState,
94 retry_manager: RetryManager<BitmexWsError>,
95 instruments_cache: AHashMap<Ustr, InstrumentAny>,
96 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
97 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
98 quote_cache: QuoteCache,
99}
100
101impl FeedHandler {
102 #[allow(clippy::too_many_arguments)]
104 pub(super) fn new(
105 signal: Arc<AtomicBool>,
106 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
107 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
108 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
109 account_id: AccountId,
110 auth_tracker: AuthTracker,
111 subscriptions: SubscriptionState,
112 order_type_cache: Arc<DashMap<ClientOrderId, OrderType>>,
113 order_symbol_cache: Arc<DashMap<ClientOrderId, Ustr>>,
114 ) -> Self {
115 Self {
116 account_id,
117 signal,
118 client: None,
119 cmd_rx,
120 raw_rx,
121 out_tx,
122 auth_tracker,
123 subscriptions,
124 retry_manager: create_websocket_retry_manager(),
125 instruments_cache: AHashMap::new(),
126 order_type_cache,
127 order_symbol_cache,
128 quote_cache: QuoteCache::new(),
129 }
130 }
131
132 pub(super) fn is_stopped(&self) -> bool {
133 self.signal.load(Ordering::Relaxed)
134 }
135
136 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), ()> {
137 self.out_tx.send(msg).map_err(|_| ())
138 }
139
140 async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
142 if let Some(client) = &self.client {
143 self.retry_manager
144 .execute_with_retry(
145 "websocket_send",
146 || {
147 let payload = payload.clone();
148 async move {
149 client.send_text(payload, None).await.map_err(|e| {
150 BitmexWsError::ClientError(format!("Send failed: {e}"))
151 })
152 }
153 },
154 should_retry_bitmex_error,
155 create_bitmex_timeout_error,
156 )
157 .await
158 .map_err(|e| anyhow::anyhow!("{e}"))
159 } else {
160 Err(anyhow::anyhow!("No active WebSocket client"))
161 }
162 }
163
164 #[inline]
165 fn get_instrument(
166 cache: &AHashMap<Ustr, InstrumentAny>,
167 symbol: &Ustr,
168 ) -> Option<InstrumentAny> {
169 cache.get(symbol).cloned()
170 }
171
172 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
173 let clock = get_atomic_clock_realtime();
174
175 loop {
176 tokio::select! {
177 Some(cmd) = self.cmd_rx.recv() => {
178 match cmd {
179 HandlerCommand::SetClient(client) => {
180 tracing::debug!("WebSocketClient received by handler");
181 self.client = Some(client);
182 }
183 HandlerCommand::Disconnect => {
184 tracing::debug!("Disconnect command received");
185 if let Some(client) = self.client.take() {
186 client.disconnect().await;
187 }
188 }
189 HandlerCommand::Authenticate { payload } => {
190 tracing::debug!("Authenticate command received");
191 if let Err(e) = self.send_with_retry(payload).await {
192 tracing::error!(error = %e, "Failed to send authentication after retries");
193 }
194 }
195 HandlerCommand::Subscribe { topics } => {
196 for topic in topics {
197 tracing::debug!(topic = %topic, "Subscribing to topic");
198 if let Err(e) = self.send_with_retry(topic.clone()).await {
199 tracing::error!(topic = %topic, error = %e, "Failed to send subscription after retries");
200 }
201 }
202 }
203 HandlerCommand::Unsubscribe { topics } => {
204 for topic in topics {
205 tracing::debug!(topic = %topic, "Unsubscribing from topic");
206 if let Err(e) = self.send_with_retry(topic.clone()).await {
207 tracing::error!(topic = %topic, error = %e, "Failed to send unsubscription after retries");
208 }
209 }
210 }
211 HandlerCommand::InitializeInstruments(instruments) => {
212 for inst in instruments {
213 self.instruments_cache.insert(inst.symbol().inner(), inst);
214 }
215 }
216 HandlerCommand::UpdateInstrument(inst) => {
217 self.instruments_cache.insert(inst.symbol().inner(), inst);
218 }
219 }
220 continue;
222 }
223
224 _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {
225 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
226 tracing::debug!("Stop signal received during idle period");
227 return None;
228 }
229 continue;
230 }
231
232 msg = self.raw_rx.recv() => {
233 let msg = match msg {
234 Some(msg) => msg,
235 None => {
236 tracing::debug!("WebSocket stream closed");
237 return None;
238 }
239 };
240
241 if let Message::Ping(data) = &msg {
243 tracing::trace!("Received ping frame with {} bytes", data.len());
244 if let Some(client) = &self.client
245 && let Err(e) = client.send_pong(data.to_vec()).await
246 {
247 tracing::warn!(error = %e, "Failed to send pong frame");
248 }
249 continue;
250 }
251
252 let event = match Self::parse_raw_message(msg) {
253 Some(event) => event,
254 None => continue,
255 };
256
257 if self.signal.load(std::sync::atomic::Ordering::Relaxed) {
258 tracing::debug!("Stop signal received");
259 return None;
260 }
261
262 match event {
263 BitmexWsMessage::Reconnected => {
264 self.quote_cache.clear();
265 return Some(NautilusWsMessage::Reconnected);
266 }
267 BitmexWsMessage::Subscription {
268 success,
269 subscribe,
270 request,
271 error,
272 } => {
273 if let Some(msg) = self.handle_subscription_message(
274 success,
275 subscribe.as_ref(),
276 request.as_ref(),
277 error.as_deref(),
278 ) {
279 return Some(msg);
280 }
281 continue;
282 }
283 BitmexWsMessage::Table(table_msg) => {
284 let ts_init = clock.get_time_ns();
285
286 let msg = match table_msg {
287 BitmexTableMessage::OrderBookL2 { action, data } => {
288 self.handle_orderbook_l2(action, data, ts_init)
289 }
290 BitmexTableMessage::OrderBookL2_25 { action, data } => {
291 self.handle_orderbook_l2(action, data, ts_init)
292 }
293 BitmexTableMessage::OrderBook10 { data, .. } => {
294 self.handle_orderbook_10(data, ts_init)
295 }
296 BitmexTableMessage::Quote { data, .. } => {
297 self.handle_quote(data, ts_init)
298 }
299 BitmexTableMessage::Trade { data, .. } => {
300 self.handle_trade(data, ts_init)
301 }
302 BitmexTableMessage::TradeBin1m { action, data } => {
303 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1m, ts_init)
304 }
305 BitmexTableMessage::TradeBin5m { action, data } => {
306 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin5m, ts_init)
307 }
308 BitmexTableMessage::TradeBin1h { action, data } => {
309 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1h, ts_init)
310 }
311 BitmexTableMessage::TradeBin1d { action, data } => {
312 self.handle_trade_bin(action, data, BitmexWsTopic::TradeBin1d, ts_init)
313 }
314 BitmexTableMessage::Order { data, .. } => {
318 self.handle_order(data)
319 }
320 BitmexTableMessage::Execution { data, .. } => {
321 self.handle_execution(data)
322 }
323 BitmexTableMessage::Position { data, .. } => {
324 self.handle_position(data)
325 }
326 BitmexTableMessage::Wallet { data, .. } => {
327 self.handle_wallet(data, ts_init)
328 }
329 BitmexTableMessage::Margin { .. } => {
330 None
333 }
334 BitmexTableMessage::Instrument { action, data } => {
335 self.handle_instrument(action, data, ts_init)
336 }
337 BitmexTableMessage::Funding { data, .. } => {
338 self.handle_funding(data, ts_init)
339 }
340 _ => {
341 tracing::warn!("Unhandled table message type: {table_msg:?}");
343 None
344 }
345 };
346
347 if let Some(msg) = msg {
348 return Some(msg);
349 }
350 continue;
351 }
352 BitmexWsMessage::Welcome { .. } | BitmexWsMessage::Error { .. } => continue,
353 }
354 }
355
356 else => {
358 tracing::debug!("Handler shutting down: stream ended or command channel closed");
359 return None;
360 }
361 }
362 }
363 }
364
365 fn parse_raw_message(msg: Message) -> Option<BitmexWsMessage> {
366 match msg {
367 Message::Text(text) => {
368 if text == RECONNECTED {
369 tracing::info!("Received WebSocket reconnected signal");
370 return Some(BitmexWsMessage::Reconnected);
371 }
372
373 tracing::trace!("Raw websocket message: {text}");
374
375 if Self::is_heartbeat_message(&text) {
376 tracing::trace!("Ignoring heartbeat control message: {text}");
377 return None;
378 }
379
380 match serde_json::from_str(&text) {
381 Ok(msg) => match &msg {
382 BitmexWsMessage::Welcome {
383 version,
384 heartbeat_enabled,
385 limit,
386 ..
387 } => {
388 tracing::info!(
389 version = version,
390 heartbeat = heartbeat_enabled,
391 rate_limit = ?limit.remaining,
392 "Welcome to the BitMEX Realtime API:",
393 );
394 }
395 BitmexWsMessage::Subscription { .. } => return Some(msg),
396 BitmexWsMessage::Error { status, error, .. } => {
397 tracing::error!(
398 status = status,
399 error = error,
400 "Received error from BitMEX"
401 );
402 }
403 _ => return Some(msg),
404 },
405 Err(e) => {
406 tracing::error!("Failed to parse WebSocket message: {e}: {text}");
407 }
408 }
409 }
410 Message::Binary(msg) => {
411 tracing::debug!("Raw binary: {msg:?}");
412 }
413 Message::Close(_) => {
414 tracing::debug!("Received close message, waiting for reconnection");
415 }
416 Message::Ping(data) => {
417 tracing::trace!("Ping frame with {} bytes (already handled)", data.len());
419 }
420 Message::Pong(data) => {
421 tracing::trace!("Received pong frame with {} bytes", data.len());
422 }
423 Message::Frame(frame) => {
424 tracing::debug!("Received raw frame: {frame:?}");
425 }
426 }
427
428 None
429 }
430
/// Returns `true` when `text` looks like a JSON heartbeat control message.
///
/// After trimming surrounding whitespace the text must start with `{`, be
/// at most 64 bytes long, and contain either `"op":"ping"` or `"op":"pong"`
/// (exact spelling, no whitespace around the colon).
fn is_heartbeat_message(text: &str) -> bool {
    const MAX_HEARTBEAT_LEN: usize = 64;
    const HEARTBEAT_MARKERS: [&str; 2] = ["\"op\":\"ping\"", "\"op\":\"pong\""];

    let candidate = text.trim();

    candidate.starts_with('{')
        && candidate.len() <= MAX_HEARTBEAT_LEN
        && HEARTBEAT_MARKERS
            .iter()
            .any(|marker| candidate.contains(*marker))
}
440
441 fn handle_subscription_ack(
442 &self,
443 success: bool,
444 request: Option<&BitmexHttpRequest>,
445 subscribe: Option<&String>,
446 error: Option<&str>,
447 ) {
448 let topics = Self::topics_from_request(request, subscribe);
449
450 if topics.is_empty() {
451 tracing::debug!("Subscription acknowledgement without topics");
452 return;
453 }
454
455 for topic in topics {
456 if success {
457 self.subscriptions.confirm_subscribe(topic);
458 tracing::debug!(topic = topic, "Subscription confirmed");
459 } else {
460 self.subscriptions.mark_failure(topic);
461 let reason = error.unwrap_or("Subscription rejected");
462 tracing::error!(topic = topic, error = reason, "Subscription failed");
463 }
464 }
465 }
466
467 fn handle_unsubscribe_ack(
468 &self,
469 success: bool,
470 request: Option<&BitmexHttpRequest>,
471 subscribe: Option<&String>,
472 error: Option<&str>,
473 ) {
474 let topics = Self::topics_from_request(request, subscribe);
475
476 if topics.is_empty() {
477 tracing::debug!("Unsubscription acknowledgement without topics");
478 return;
479 }
480
481 for topic in topics {
482 if success {
483 tracing::debug!(topic = topic, "Unsubscription confirmed");
484 self.subscriptions.confirm_unsubscribe(topic);
485 } else {
486 let reason = error.unwrap_or("Unsubscription rejected");
487 tracing::error!(
488 topic = topic,
489 error = reason,
490 "Unsubscription failed - restoring subscription"
491 );
492 self.subscriptions.confirm_unsubscribe(topic); self.subscriptions.mark_subscribe(topic); self.subscriptions.confirm_subscribe(topic); }
497 }
498 }
499
500 fn topics_from_request<'a>(
501 request: Option<&'a BitmexHttpRequest>,
502 fallback: Option<&'a String>,
503 ) -> Vec<&'a str> {
504 if let Some(req) = request
505 && !req.args.is_empty()
506 {
507 return req.args.iter().filter_map(|arg| arg.as_str()).collect();
508 }
509
510 fallback.into_iter().map(|topic| topic.as_str()).collect()
511 }
512
513 fn handle_orderbook_l2(
514 &self,
515 action: BitmexAction,
516 data: Vec<BitmexOrderBookMsg>,
517 ts_init: UnixNanos,
518 ) -> Option<NautilusWsMessage> {
519 if data.is_empty() {
520 return None;
521 }
522 let data = parse_book_msg_vec(data, action, &self.instruments_cache, ts_init);
523 Some(NautilusWsMessage::Data(data))
524 }
525
526 fn handle_orderbook_10(
527 &self,
528 data: Vec<BitmexOrderBook10Msg>,
529 ts_init: UnixNanos,
530 ) -> Option<NautilusWsMessage> {
531 if data.is_empty() {
532 return None;
533 }
534 let data = parse_book10_msg_vec(data, &self.instruments_cache, ts_init);
535 Some(NautilusWsMessage::Data(data))
536 }
537
538 fn handle_quote(
539 &mut self,
540 mut data: Vec<BitmexQuoteMsg>,
541 ts_init: UnixNanos,
542 ) -> Option<NautilusWsMessage> {
543 if data.is_empty() {
545 return None;
546 }
547
548 let msg = data.remove(0);
549 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &msg.symbol) else {
550 tracing::error!(
551 "Instrument cache miss: quote message dropped for symbol={}",
552 msg.symbol
553 );
554 return None;
555 };
556
557 let instrument_id = instrument.id();
558 let price_precision = instrument.price_precision();
559
560 let bid_price = msg.bid_price.map(|p| Price::new(p, price_precision));
561 let ask_price = msg.ask_price.map(|p| Price::new(p, price_precision));
562 let bid_size = msg
563 .bid_size
564 .map(|s| parse_contracts_quantity(s, &instrument));
565 let ask_size = msg
566 .ask_size
567 .map(|s| parse_contracts_quantity(s, &instrument));
568 let ts_event = UnixNanos::from(msg.timestamp);
569
570 match self.quote_cache.process(
571 instrument_id,
572 bid_price,
573 ask_price,
574 bid_size,
575 ask_size,
576 ts_event,
577 ts_init,
578 ) {
579 Ok(quote) => Some(NautilusWsMessage::Data(vec![Data::Quote(quote)])),
580 Err(e) => {
581 tracing::warn!(error = %e, "Failed to process quote");
582 None
583 }
584 }
585 }
586
587 fn handle_trade(
588 &self,
589 data: Vec<BitmexTradeMsg>,
590 ts_init: UnixNanos,
591 ) -> Option<NautilusWsMessage> {
592 if data.is_empty() {
593 return None;
594 }
595 let data = parse_trade_msg_vec(data, &self.instruments_cache, ts_init);
596 Some(NautilusWsMessage::Data(data))
597 }
598
599 fn handle_trade_bin(
600 &self,
601 action: BitmexAction,
602 data: Vec<BitmexTradeBinMsg>,
603 topic: BitmexWsTopic,
604 ts_init: UnixNanos,
605 ) -> Option<NautilusWsMessage> {
606 if action == BitmexAction::Partial || data.is_empty() {
607 return None;
608 }
609 let data = parse_trade_bin_msg_vec(data, topic, &self.instruments_cache, ts_init);
610 Some(NautilusWsMessage::Data(data))
611 }
612
613 fn handle_order(&mut self, data: Vec<OrderData>) -> Option<NautilusWsMessage> {
614 let mut reports = Vec::with_capacity(data.len());
616
617 for order_data in data {
618 match order_data {
619 OrderData::Full(order_msg) => {
620 let Some(instrument) =
621 Self::get_instrument(&self.instruments_cache, &order_msg.symbol)
622 else {
623 tracing::error!(
624 "Instrument cache miss: order message dropped for symbol={}, order_id={}",
625 order_msg.symbol,
626 order_msg.order_id
627 );
628 continue;
629 };
630
631 match parse_order_msg(&order_msg, &instrument, &self.order_type_cache) {
632 Ok(report) => {
633 if let Some(client_order_id) = &order_msg.cl_ord_id {
635 let client_order_id = ClientOrderId::new(client_order_id);
636
637 if let Some(ord_type) = &order_msg.ord_type {
638 let order_type: OrderType = (*ord_type).into();
639 self.order_type_cache.insert(client_order_id, order_type);
640 }
641
642 self.order_symbol_cache
644 .insert(client_order_id, order_msg.symbol);
645 }
646
647 if is_terminal_order_status(report.order_status)
648 && let Some(client_id) = report.client_order_id
649 {
650 self.order_type_cache.remove(&client_id);
651 self.order_symbol_cache.remove(&client_id);
652 }
653
654 reports.push(report);
655 }
656 Err(e) => {
657 tracing::error!(
658 error = %e,
659 symbol = %order_msg.symbol,
660 order_id = %order_msg.order_id,
661 time_in_force = ?order_msg.time_in_force,
662 "Failed to parse full order message - potential data loss"
663 );
664 continue;
666 }
667 }
668 }
669 OrderData::Update(msg) => {
670 let Some(instrument) =
671 Self::get_instrument(&self.instruments_cache, &msg.symbol)
672 else {
673 tracing::error!(
674 "Instrument cache miss: order update dropped for symbol={}, order_id={}",
675 msg.symbol,
676 msg.order_id
677 );
678 continue;
679 };
680
681 if let Some(cl_ord_id) = &msg.cl_ord_id {
683 let client_order_id = ClientOrderId::new(cl_ord_id);
684 self.order_symbol_cache.insert(client_order_id, msg.symbol);
685 }
686
687 if let Some(event) = parse_order_update_msg(&msg, &instrument, self.account_id)
688 {
689 return Some(NautilusWsMessage::OrderUpdated(event));
690 } else {
691 tracing::warn!(
692 order_id = %msg.order_id,
693 price = ?msg.price,
694 "Skipped order update message (insufficient data)"
695 );
696 }
697 }
698 }
699 }
700
701 if reports.is_empty() {
702 return None;
703 }
704
705 Some(NautilusWsMessage::OrderStatusReports(reports))
706 }
707
708 fn handle_execution(&mut self, data: Vec<BitmexExecutionMsg>) -> Option<NautilusWsMessage> {
709 let mut fills = Vec::with_capacity(data.len());
710
711 for exec_msg in data {
712 let symbol_opt = if let Some(sym) = &exec_msg.symbol {
714 Some(*sym)
715 } else if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
716 let client_order_id = ClientOrderId::new(cl_ord_id);
718 self.order_symbol_cache
719 .get(&client_order_id)
720 .map(|r| *r.value())
721 } else {
722 None
723 };
724
725 let Some(symbol) = symbol_opt else {
726 if let Some(cl_ord_id) = &exec_msg.cl_ord_id {
728 if exec_msg.exec_type == Some(BitmexExecType::Trade) {
729 tracing::warn!(
730 cl_ord_id = %cl_ord_id,
731 exec_id = ?exec_msg.exec_id,
732 ord_rej_reason = ?exec_msg.ord_rej_reason,
733 text = ?exec_msg.text,
734 "Execution message missing symbol and not found in cache"
735 );
736 } else {
737 tracing::debug!(
738 cl_ord_id = %cl_ord_id,
739 exec_id = ?exec_msg.exec_id,
740 exec_type = ?exec_msg.exec_type,
741 ord_rej_reason = ?exec_msg.ord_rej_reason,
742 text = ?exec_msg.text,
743 "Execution message missing symbol and not found in cache"
744 );
745 }
746 } else {
747 if exec_msg.exec_type == Some(BitmexExecType::CancelReject) {
751 tracing::debug!(
752 exec_id = ?exec_msg.exec_id,
753 order_id = ?exec_msg.order_id,
754 "CancelReject message missing symbol/clOrdID (expected with redundant cancels)"
755 );
756 } else {
757 tracing::warn!(
758 exec_id = ?exec_msg.exec_id,
759 order_id = ?exec_msg.order_id,
760 exec_type = ?exec_msg.exec_type,
761 ord_rej_reason = ?exec_msg.ord_rej_reason,
762 text = ?exec_msg.text,
763 "Execution message missing both symbol and clOrdID, cannot process"
764 );
765 }
766 }
767 continue;
768 };
769
770 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &symbol) else {
771 tracing::error!(
772 "Instrument cache miss: execution message dropped for symbol={}, exec_id={:?}, exec_type={:?}, Liquidation/ADL fills may be lost",
773 symbol,
774 exec_msg.exec_id,
775 exec_msg.exec_type
776 );
777 continue;
778 };
779
780 if let Some(fill) = parse_execution_msg(exec_msg, &instrument) {
781 fills.push(fill);
782 }
783 }
784
785 if fills.is_empty() {
786 return None;
787 }
788 Some(NautilusWsMessage::FillReports(fills))
789 }
790
791 fn handle_position(&self, data: Vec<BitmexPositionMsg>) -> Option<NautilusWsMessage> {
792 if let Some(pos_msg) = data.into_iter().next() {
793 let Some(instrument) = Self::get_instrument(&self.instruments_cache, &pos_msg.symbol)
794 else {
795 tracing::error!(
796 "Instrument cache miss: position message dropped for symbol={}, account={}",
797 pos_msg.symbol,
798 pos_msg.account
799 );
800 return None;
801 };
802 let report = parse_position_msg(pos_msg, &instrument);
803 Some(NautilusWsMessage::PositionStatusReport(report))
804 } else {
805 None
806 }
807 }
808
809 fn handle_wallet(
810 &self,
811 data: Vec<BitmexWalletMsg>,
812 ts_init: UnixNanos,
813 ) -> Option<NautilusWsMessage> {
814 if let Some(wallet_msg) = data.into_iter().next() {
815 let account_state = parse_wallet_msg(wallet_msg, ts_init);
816 Some(NautilusWsMessage::AccountState(account_state))
817 } else {
818 None
819 }
820 }
821
822 fn handle_instrument(
823 &mut self,
824 action: BitmexAction,
825 data: Vec<BitmexInstrumentMsg>,
826 ts_init: UnixNanos,
827 ) -> Option<NautilusWsMessage> {
828 match action {
829 BitmexAction::Partial | BitmexAction::Insert => {
830 let mut instruments = Vec::with_capacity(data.len());
831 let mut temp_cache = AHashMap::new();
832
833 let data_for_prices = data.clone();
834
835 for msg in data {
836 match msg.try_into() {
837 Ok(http_inst) => {
838 match parse_instrument_any(&http_inst, ts_init) {
839 InstrumentParseResult::Ok(boxed) => {
840 let instrument_any = *boxed;
841 let symbol = instrument_any.symbol().inner();
842 temp_cache.insert(symbol, instrument_any.clone());
843 instruments.push(instrument_any);
844 }
845 InstrumentParseResult::Unsupported { .. } => {
846 }
848 InstrumentParseResult::Failed {
849 symbol,
850 instrument_type,
851 error,
852 } => {
853 log::warn!(
854 "Failed to parse instrument {symbol} ({instrument_type:?}): {error}"
855 );
856 }
857 }
858 }
859 Err(e) => {
860 log::debug!("Skipping instrument (missing required fields): {e}");
861 }
862 }
863 }
864
865 for (symbol, instrument) in &temp_cache {
867 self.instruments_cache.insert(*symbol, instrument.clone());
868 }
869
870 if !instruments.is_empty()
871 && let Err(e) = self
872 .out_tx
873 .send(NautilusWsMessage::Instruments(instruments))
874 {
875 tracing::error!("Error sending instruments: {e}");
876 }
877
878 let mut data_msgs = Vec::with_capacity(data_for_prices.len());
879
880 for msg in data_for_prices {
881 let parsed = parse_instrument_msg(msg, &temp_cache, ts_init);
882 data_msgs.extend(parsed);
883 }
884
885 if data_msgs.is_empty() {
886 return None;
887 }
888 Some(NautilusWsMessage::Data(data_msgs))
889 }
890 BitmexAction::Update => {
891 let mut data_msgs = Vec::with_capacity(data.len());
892
893 for msg in data {
894 let parsed = parse_instrument_msg(msg, &self.instruments_cache, ts_init);
895 data_msgs.extend(parsed);
896 }
897
898 if data_msgs.is_empty() {
899 return None;
900 }
901 Some(NautilusWsMessage::Data(data_msgs))
902 }
903 BitmexAction::Delete => {
904 log::info!(
905 "Received instrument delete action for {} instrument(s)",
906 data.len()
907 );
908 None
909 }
910 }
911 }
912
913 fn handle_funding(
914 &self,
915 data: Vec<BitmexFundingMsg>,
916 ts_init: UnixNanos,
917 ) -> Option<NautilusWsMessage> {
918 let mut funding_updates = Vec::with_capacity(data.len());
919
920 for msg in data {
921 if let Some(parsed) = parse_funding_msg(msg, ts_init) {
922 funding_updates.push(parsed);
923 }
924 }
925
926 if !funding_updates.is_empty() {
927 Some(NautilusWsMessage::FundingRateUpdates(funding_updates))
928 } else {
929 None
930 }
931 }
932
933 fn handle_subscription_message(
934 &self,
935 success: bool,
936 subscribe: Option<&String>,
937 request: Option<&BitmexHttpRequest>,
938 error: Option<&str>,
939 ) -> Option<NautilusWsMessage> {
940 if let Some(req) = request {
941 if req
942 .op
943 .eq_ignore_ascii_case(BitmexWsAuthAction::AuthKeyExpires.as_ref())
944 {
945 if success {
946 tracing::info!("WebSocket authenticated");
947 self.auth_tracker.succeed();
948 return Some(NautilusWsMessage::Authenticated);
949 } else {
950 let reason = error.unwrap_or("Authentication rejected").to_string();
951 tracing::error!(error = %reason, "WebSocket authentication failed");
952 self.auth_tracker.fail(reason);
953 }
954 return None;
955 }
956
957 if req
958 .op
959 .eq_ignore_ascii_case(BitmexWsOperation::Subscribe.as_ref())
960 {
961 self.handle_subscription_ack(success, request, subscribe, error);
962 return None;
963 }
964
965 if req
966 .op
967 .eq_ignore_ascii_case(BitmexWsOperation::Unsubscribe.as_ref())
968 {
969 self.handle_unsubscribe_ack(success, request, subscribe, error);
970 return None;
971 }
972 }
973
974 if subscribe.is_some() {
975 self.handle_subscription_ack(success, request, subscribe, error);
976 return None;
977 }
978
979 if let Some(error) = error {
980 tracing::warn!(
981 success = success,
982 error = error,
983 "Unhandled subscription control message"
984 );
985 }
986
987 None
988 }
989}
990
991fn is_terminal_order_status(status: OrderStatus) -> bool {
992 matches!(
993 status,
994 OrderStatus::Canceled | OrderStatus::Expired | OrderStatus::Rejected | OrderStatus::Filled,
995 )
996}
997
998pub(crate) fn should_retry_bitmex_error(error: &BitmexWsError) -> bool {
1000 match error {
1001 BitmexWsError::TungsteniteError(_) => true, BitmexWsError::ClientError(msg) => {
1003 let msg_lower = msg.to_lowercase();
1005 msg_lower.contains("timeout")
1006 || msg_lower.contains("timed out")
1007 || msg_lower.contains("connection")
1008 || msg_lower.contains("network")
1009 }
1010 _ => false,
1011 }
1012}
1013
1014pub(crate) fn create_bitmex_timeout_error(msg: String) -> BitmexWsError {
1016 BitmexWsError::ClientError(msg)
1017}
1018
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Ping and pong control messages are heartbeats; a subscribe request is not.
    #[rstest]
    fn test_is_heartbeat_message_detection() {
        let ping = "{\"op\":\"ping\"}";
        let pong = "{\"op\":\"pong\"}";
        let subscribe = "{\"op\":\"subscribe\",\"args\":[\"trade:XBTUSD\"]}";

        assert!(FeedHandler::is_heartbeat_message(ping));
        assert!(FeedHandler::is_heartbeat_message(pong));
        assert!(!FeedHandler::is_heartbeat_message(subscribe));
    }
}