1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::{AHashMap, AHashSet};
24use dashmap::DashMap;
25use nautilus_common::cache::fifo::FifoCache;
26use nautilus_core::{AtomicTime, nanos::UnixNanos, time::get_atomic_clock_realtime};
27use nautilus_model::{
28 data::BarType,
29 identifiers::{AccountId, ClientOrderId},
30 instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::{
33 RECONNECTED,
34 retry::{RetryManager, create_websocket_retry_manager},
35 websocket::{SubscriptionState, WebSocketClient},
36};
37use tokio_tungstenite::tungstenite::Message;
38use ustr::Ustr;
39
40use super::{
41 client::AssetContextDataType,
42 error::HyperliquidWsError,
43 messages::{
44 CandleData, ExecutionReport, HyperliquidWsMessage, HyperliquidWsRequest, NautilusWsMessage,
45 SubscriptionRequest, WsActiveAssetCtxData, WsUserEventData,
46 },
47 parse::{
48 parse_ws_asset_context, parse_ws_candle, parse_ws_fill_report, parse_ws_order_book_deltas,
49 parse_ws_order_status_report, parse_ws_quote_tick, parse_ws_trade_tick,
50 },
51};
52
53#[derive(Debug)]
55#[allow(
56 clippy::large_enum_variant,
57 reason = "Commands are ephemeral and immediately consumed"
58)]
59#[allow(private_interfaces)]
60pub enum HandlerCommand {
61 SetClient(WebSocketClient),
63 Disconnect,
65 Subscribe {
67 subscriptions: Vec<SubscriptionRequest>,
68 },
69 Unsubscribe {
71 subscriptions: Vec<SubscriptionRequest>,
72 },
73 InitializeInstruments(Vec<InstrumentAny>),
75 UpdateInstrument(InstrumentAny),
77 AddBarType { key: String, bar_type: BarType },
79 RemoveBarType { key: String },
81 UpdateAssetContextSubs {
83 coin: Ustr,
84 data_types: AHashSet<AssetContextDataType>,
85 },
86 CacheSpotFillCoins(AHashMap<Ustr, Ustr>),
88}
89
90pub(super) struct FeedHandler {
91 clock: &'static AtomicTime,
92 signal: Arc<AtomicBool>,
93 client: Option<WebSocketClient>,
94 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
95 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
96 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
97 account_id: Option<AccountId>,
98 subscriptions: SubscriptionState,
99 retry_manager: RetryManager<HyperliquidWsError>,
100 message_buffer: Vec<NautilusWsMessage>,
101 instruments: AHashMap<Ustr, InstrumentAny>,
102 cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
103 bar_types_cache: AHashMap<String, BarType>,
104 bar_cache: AHashMap<String, CandleData>,
105 asset_context_subs: AHashMap<Ustr, AHashSet<AssetContextDataType>>,
106 processed_trade_ids: FifoCache<u64, 10_000>,
107 mark_price_cache: AHashMap<Ustr, String>,
108 index_price_cache: AHashMap<Ustr, String>,
109 funding_rate_cache: AHashMap<Ustr, String>,
110}
111
112impl FeedHandler {
113 pub(super) fn new(
115 signal: Arc<AtomicBool>,
116 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
117 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
118 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
119 account_id: Option<AccountId>,
120 subscriptions: SubscriptionState,
121 cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
122 ) -> Self {
123 Self {
124 clock: get_atomic_clock_realtime(),
125 signal,
126 client: None,
127 cmd_rx,
128 raw_rx,
129 out_tx,
130 account_id,
131 subscriptions,
132 retry_manager: create_websocket_retry_manager(),
133 message_buffer: Vec::new(),
134 instruments: AHashMap::new(),
135 cloid_cache,
136 bar_types_cache: AHashMap::new(),
137 bar_cache: AHashMap::new(),
138 asset_context_subs: AHashMap::new(),
139 processed_trade_ids: FifoCache::new(),
140 mark_price_cache: AHashMap::new(),
141 index_price_cache: AHashMap::new(),
142 funding_rate_cache: AHashMap::new(),
143 }
144 }
145
146 pub(super) fn send(&self, msg: NautilusWsMessage) -> Result<(), String> {
148 self.out_tx
149 .send(msg)
150 .map_err(|e| format!("Failed to send message: {e}"))
151 }
152
153 pub(super) fn is_stopped(&self) -> bool {
155 self.signal.load(Ordering::Relaxed)
156 }
157
158 async fn send_with_retry(&self, payload: String) -> anyhow::Result<()> {
160 if let Some(client) = &self.client {
161 self.retry_manager
162 .execute_with_retry(
163 "websocket_send",
164 || {
165 let payload = payload.clone();
166 async move {
167 client.send_text(payload, None).await.map_err(|e| {
168 HyperliquidWsError::ClientError(format!("Send failed: {e}"))
169 })
170 }
171 },
172 should_retry_hyperliquid_error,
173 create_hyperliquid_timeout_error,
174 )
175 .await
176 .map_err(|e| anyhow::anyhow!("{e}"))
177 } else {
178 Err(anyhow::anyhow!("No WebSocket client available"))
179 }
180 }
181
182 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
183 if !self.message_buffer.is_empty() {
184 return Some(self.message_buffer.remove(0));
185 }
186
187 loop {
188 tokio::select! {
189 Some(cmd) = self.cmd_rx.recv() => {
190 match cmd {
191 HandlerCommand::SetClient(client) => {
192 log::debug!("Setting WebSocket client in handler");
193 self.client = Some(client);
194 }
195 HandlerCommand::Disconnect => {
196 log::debug!("Handler received disconnect command");
197 if let Some(ref client) = self.client {
198 client.disconnect().await;
199 }
200 self.signal.store(true, Ordering::SeqCst);
201 return None;
202 }
203 HandlerCommand::Subscribe { subscriptions } => {
204 for subscription in subscriptions {
205 let key = subscription_to_key(&subscription);
206 self.subscriptions.mark_subscribe(&key);
207
208 let request = HyperliquidWsRequest::Subscribe { subscription };
209 match serde_json::to_string(&request) {
210 Ok(payload) => {
211 log::debug!("Sending subscribe payload: {payload}");
212 if let Err(e) = self.send_with_retry(payload).await {
213 log::error!("Error subscribing to {key}: {e}");
214 self.subscriptions.mark_failure(&key);
215 }
216 }
217 Err(e) => {
218 log::error!("Error serializing subscription for {key}: {e}");
219 self.subscriptions.mark_failure(&key);
220 }
221 }
222 }
223 }
224 HandlerCommand::Unsubscribe { subscriptions } => {
225 for subscription in subscriptions {
226 let key = subscription_to_key(&subscription);
227 self.subscriptions.mark_unsubscribe(&key);
228
229 let request = HyperliquidWsRequest::Unsubscribe { subscription };
230 match serde_json::to_string(&request) {
231 Ok(payload) => {
232 log::debug!("Sending unsubscribe payload: {payload}");
233 if let Err(e) = self.send_with_retry(payload).await {
234 log::error!("Error unsubscribing from {key}: {e}");
235 }
236 }
237 Err(e) => {
238 log::error!("Error serializing unsubscription for {key}: {e}");
239 }
240 }
241 }
242 }
243 HandlerCommand::InitializeInstruments(instruments) => {
244 for inst in instruments {
245 let coin = inst.raw_symbol().inner();
246 self.instruments.insert(coin, inst);
247 }
248 }
249 HandlerCommand::UpdateInstrument(inst) => {
250 let coin = inst.raw_symbol().inner();
251 self.instruments.insert(coin, inst);
252 }
253 HandlerCommand::AddBarType { key, bar_type } => {
254 self.bar_types_cache.insert(key, bar_type);
255 }
256 HandlerCommand::RemoveBarType { key } => {
257 self.bar_types_cache.remove(&key);
258 self.bar_cache.remove(&key);
259 }
260 HandlerCommand::UpdateAssetContextSubs { coin, data_types } => {
261 if data_types.is_empty() {
262 self.asset_context_subs.remove(&coin);
263 } else {
264 self.asset_context_subs.insert(coin, data_types);
265 }
266 }
267 HandlerCommand::CacheSpotFillCoins(_) => {
268 }
270 }
271 continue;
272 }
273
274 Some(raw_msg) = self.raw_rx.recv() => {
275 match raw_msg {
276 Message::Text(text) => {
277 if text == RECONNECTED {
278 log::info!("Received RECONNECTED sentinel");
279 return Some(NautilusWsMessage::Reconnected);
280 }
281
282 match serde_json::from_str::<HyperliquidWsMessage>(&text) {
283 Ok(msg) => {
284 let ts_init = self.clock.get_time_ns();
285
286 let nautilus_msgs = Self::parse_to_nautilus_messages(
287 msg,
288 &self.instruments,
289 &self.cloid_cache,
290 &self.bar_types_cache,
291 self.account_id,
292 ts_init,
293 &self.asset_context_subs,
294 &mut self.processed_trade_ids,
295 &mut self.mark_price_cache,
296 &mut self.index_price_cache,
297 &mut self.funding_rate_cache,
298 &mut self.bar_cache,
299 );
300
301 if !nautilus_msgs.is_empty() {
302 let mut iter = nautilus_msgs.into_iter();
303 let first = iter.next().unwrap();
304 self.message_buffer.extend(iter);
305 return Some(first);
306 }
307 }
308 Err(e) => {
309 log::error!("Error parsing WebSocket message: {e}, text: {text}");
310 }
311 }
312 }
313 Message::Ping(data) => {
314 if let Some(ref client) = self.client
315 && let Err(e) = client.send_pong(data.to_vec()).await {
316 log::error!("Error sending pong: {e}");
317 }
318 }
319 Message::Close(_) => {
320 log::info!("Received WebSocket close frame");
321 return None;
322 }
323 _ => {}
324 }
325 }
326
327 else => {
328 log::debug!("Handler shutting down: stream ended or command channel closed");
329 return None;
330 }
331 }
332 }
333 }
334
335 #[allow(clippy::too_many_arguments)]
336 fn parse_to_nautilus_messages(
337 msg: HyperliquidWsMessage,
338 instruments: &AHashMap<Ustr, InstrumentAny>,
339 cloid_cache: &DashMap<Ustr, ClientOrderId>,
340 bar_types: &AHashMap<String, BarType>,
341 account_id: Option<AccountId>,
342 ts_init: UnixNanos,
343 asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
344 processed_trade_ids: &mut FifoCache<u64, 10_000>,
345 mark_price_cache: &mut AHashMap<Ustr, String>,
346 index_price_cache: &mut AHashMap<Ustr, String>,
347 funding_rate_cache: &mut AHashMap<Ustr, String>,
348 bar_cache: &mut AHashMap<String, CandleData>,
349 ) -> Vec<NautilusWsMessage> {
350 let mut result = Vec::new();
351
352 match msg {
353 HyperliquidWsMessage::OrderUpdates { data } => {
354 if let Some(account_id) = account_id
355 && let Some(msg) = Self::handle_order_updates(
356 &data,
357 instruments,
358 cloid_cache,
359 account_id,
360 ts_init,
361 )
362 {
363 result.push(msg);
364 }
365 }
366 HyperliquidWsMessage::UserEvents { data } | HyperliquidWsMessage::User { data } => {
367 match data {
369 WsUserEventData::Fills { fills } => {
370 log::debug!("Received {} fill(s) from userEvents channel", fills.len());
371 for fill in &fills {
372 log::debug!(
373 "Fill: oid={}, coin={}, side={:?}, sz={}, px={}",
374 fill.oid,
375 fill.coin,
376 fill.side,
377 fill.sz,
378 fill.px
379 );
380 }
381 if let Some(account_id) = account_id {
382 log::debug!("Processing fills with account_id={account_id}");
383 if let Some(msg) = Self::handle_user_fills(
384 &fills,
385 instruments,
386 cloid_cache,
387 account_id,
388 ts_init,
389 processed_trade_ids,
390 ) {
391 log::debug!("Successfully created fill message");
392 result.push(msg);
393 } else {
394 log::debug!("handle_user_fills returned None (no new fills)");
395 }
396 } else {
397 log::warn!("Cannot process fills: account_id is None");
398 }
399 }
400 _ => {
401 log::debug!("Received non-fill user event: {data:?}");
402 }
403 }
404 }
405 HyperliquidWsMessage::UserFills { data } => {
406 if let Some(account_id) = account_id
409 && let Some(msg) = Self::handle_user_fills(
410 &data.fills,
411 instruments,
412 cloid_cache,
413 account_id,
414 ts_init,
415 processed_trade_ids,
416 )
417 {
418 result.push(msg);
419 }
420 }
421 HyperliquidWsMessage::Trades { data } => {
422 if let Some(msg) = Self::handle_trades(&data, instruments, ts_init) {
423 result.push(msg);
424 }
425 }
426 HyperliquidWsMessage::Bbo { data } => {
427 if let Some(msg) = Self::handle_bbo(&data, instruments, ts_init) {
428 result.push(msg);
429 }
430 }
431 HyperliquidWsMessage::L2Book { data } => {
432 if let Some(msg) = Self::handle_l2_book(&data, instruments, ts_init) {
433 result.push(msg);
434 }
435 }
436 HyperliquidWsMessage::Candle { data } => {
437 if let Some(msg) =
438 Self::handle_candle(&data, instruments, bar_types, bar_cache, ts_init)
439 {
440 result.push(msg);
441 }
442 }
443 HyperliquidWsMessage::ActiveAssetCtx { data }
444 | HyperliquidWsMessage::ActiveSpotAssetCtx { data } => {
445 result.extend(Self::handle_asset_context(
446 &data,
447 instruments,
448 asset_context_subs,
449 mark_price_cache,
450 index_price_cache,
451 funding_rate_cache,
452 ts_init,
453 ));
454 }
455 HyperliquidWsMessage::Error { data } => {
456 log::warn!("Received error from Hyperliquid WebSocket: {data}");
457 }
458 _ => {}
460 }
461
462 result
463 }
464
465 fn handle_order_updates(
466 data: &[super::messages::WsOrderData],
467 instruments: &AHashMap<Ustr, InstrumentAny>,
468 cloid_cache: &DashMap<Ustr, ClientOrderId>,
469 account_id: AccountId,
470 ts_init: UnixNanos,
471 ) -> Option<NautilusWsMessage> {
472 let mut exec_reports = Vec::new();
473
474 for order_update in data {
475 let instrument = instruments.get(&order_update.order.coin);
476
477 if let Some(instrument) = instrument {
478 match parse_ws_order_status_report(order_update, instrument, account_id, ts_init) {
479 Ok(mut report) => {
480 if let Some(cloid) = &order_update.order.cloid {
482 let cloid_ustr = Ustr::from(cloid.as_str());
483 if let Some(entry) = cloid_cache.get(&cloid_ustr) {
484 let real_client_order_id = *entry.value();
485 log::debug!("Resolved cloid {cloid} -> {real_client_order_id}");
486 report.client_order_id = Some(real_client_order_id);
487 }
488 }
489 exec_reports.push(ExecutionReport::Order(report));
490 }
491 Err(e) => {
492 log::error!("Error parsing order update: {e}");
493 }
494 }
495 } else {
496 log::debug!("No instrument found for coin: {}", order_update.order.coin);
497 }
498 }
499
500 if exec_reports.is_empty() {
501 None
502 } else {
503 Some(NautilusWsMessage::ExecutionReports(exec_reports))
504 }
505 }
506
507 fn handle_user_fills(
508 fills: &[super::messages::WsFillData],
509 instruments: &AHashMap<Ustr, InstrumentAny>,
510 cloid_cache: &DashMap<Ustr, ClientOrderId>,
511 account_id: AccountId,
512 ts_init: UnixNanos,
513 processed_trade_ids: &mut FifoCache<u64, 10_000>,
514 ) -> Option<NautilusWsMessage> {
515 let mut exec_reports = Vec::new();
516
517 for fill in fills {
518 if processed_trade_ids.contains(&fill.tid) {
520 log::debug!("Skipping duplicate fill: tid={}", fill.tid);
521 continue;
522 }
523 processed_trade_ids.add(fill.tid);
524
525 let instrument = instruments.get(&fill.coin);
526
527 if let Some(instrument) = instrument {
528 log::debug!("Found instrument for fill coin={}", fill.coin);
529 match parse_ws_fill_report(fill, instrument, account_id, ts_init) {
530 Ok(mut report) => {
531 if let Some(cloid) = &fill.cloid {
533 let cloid_ustr = Ustr::from(cloid.as_str());
534 if let Some(entry) = cloid_cache.get(&cloid_ustr) {
535 let real_client_order_id = *entry.value();
536 log::debug!(
537 "Resolved fill cloid {cloid} -> {real_client_order_id}"
538 );
539 report.client_order_id = Some(real_client_order_id);
540 }
541 }
542 log::debug!(
543 "Parsed fill report: venue_order_id={:?}, trade_id={:?}",
544 report.venue_order_id,
545 report.trade_id
546 );
547 exec_reports.push(ExecutionReport::Fill(report));
548 }
549 Err(e) => {
550 log::error!("Error parsing fill: {e}");
551 }
552 }
553 } else {
554 log::warn!(
555 "No instrument found for fill coin={}. Keys: {:?}",
556 fill.coin,
557 instruments.keys().collect::<Vec<_>>()
558 );
559 }
560 }
561
562 if exec_reports.is_empty() {
563 None
564 } else {
565 Some(NautilusWsMessage::ExecutionReports(exec_reports))
566 }
567 }
568
569 fn handle_trades(
570 data: &[super::messages::WsTradeData],
571 instruments: &AHashMap<Ustr, InstrumentAny>,
572 ts_init: UnixNanos,
573 ) -> Option<NautilusWsMessage> {
574 let mut trade_ticks = Vec::new();
575
576 for trade in data {
577 if let Some(instrument) = instruments.get(&trade.coin) {
578 match parse_ws_trade_tick(trade, instrument, ts_init) {
579 Ok(tick) => trade_ticks.push(tick),
580 Err(e) => {
581 log::error!("Error parsing trade tick: {e}");
582 }
583 }
584 } else {
585 log::debug!("No instrument found for coin: {}", trade.coin);
586 }
587 }
588
589 if trade_ticks.is_empty() {
590 None
591 } else {
592 Some(NautilusWsMessage::Trades(trade_ticks))
593 }
594 }
595
596 fn handle_bbo(
597 data: &super::messages::WsBboData,
598 instruments: &AHashMap<Ustr, InstrumentAny>,
599 ts_init: UnixNanos,
600 ) -> Option<NautilusWsMessage> {
601 if let Some(instrument) = instruments.get(&data.coin) {
602 match parse_ws_quote_tick(data, instrument, ts_init) {
603 Ok(quote_tick) => Some(NautilusWsMessage::Quote(quote_tick)),
604 Err(e) => {
605 log::error!("Error parsing quote tick: {e}");
606 None
607 }
608 }
609 } else {
610 log::debug!("No instrument found for coin: {}", data.coin);
611 None
612 }
613 }
614
615 fn handle_l2_book(
616 data: &super::messages::WsBookData,
617 instruments: &AHashMap<Ustr, InstrumentAny>,
618 ts_init: UnixNanos,
619 ) -> Option<NautilusWsMessage> {
620 if let Some(instrument) = instruments.get(&data.coin) {
621 match parse_ws_order_book_deltas(data, instrument, ts_init) {
622 Ok(deltas) => Some(NautilusWsMessage::Deltas(deltas)),
623 Err(e) => {
624 log::error!("Error parsing order book deltas: {e}");
625 None
626 }
627 }
628 } else {
629 log::debug!("No instrument found for coin: {}", data.coin);
630 None
631 }
632 }
633
634 fn handle_candle(
635 data: &CandleData,
636 instruments: &AHashMap<Ustr, InstrumentAny>,
637 bar_types: &AHashMap<String, BarType>,
638 bar_cache: &mut AHashMap<String, CandleData>,
639 ts_init: UnixNanos,
640 ) -> Option<NautilusWsMessage> {
641 let key = format!("candle:{}:{}", data.s, data.i);
642
643 let mut closed_bar = None;
644 if let Some(cached) = bar_cache.get(&key) {
645 if cached.close_time != data.close_time {
647 log::debug!(
648 "Bar period changed for {}: prev_close_time={}, new_close_time={}",
649 data.s,
650 cached.close_time,
651 data.close_time
652 );
653 closed_bar = Some(cached.clone());
654 }
655 }
656
657 bar_cache.insert(key.clone(), data.clone());
658
659 if let Some(closed_data) = closed_bar {
660 if let Some(bar_type) = bar_types.get(&key) {
661 if let Some(instrument) = instruments.get(&data.s) {
662 match parse_ws_candle(&closed_data, instrument, bar_type, ts_init) {
663 Ok(bar) => return Some(NautilusWsMessage::Candle(bar)),
664 Err(e) => {
665 log::error!("Error parsing closed candle: {e}");
666 }
667 }
668 } else {
669 log::debug!("No instrument found for coin: {}", data.s);
670 }
671 } else {
672 log::debug!("No bar type found for key: {key}");
673 }
674 }
675
676 None
677 }
678
679 fn handle_asset_context(
680 data: &WsActiveAssetCtxData,
681 instruments: &AHashMap<Ustr, InstrumentAny>,
682 asset_context_subs: &AHashMap<Ustr, AHashSet<AssetContextDataType>>,
683 mark_price_cache: &mut AHashMap<Ustr, String>,
684 index_price_cache: &mut AHashMap<Ustr, String>,
685 funding_rate_cache: &mut AHashMap<Ustr, String>,
686 ts_init: UnixNanos,
687 ) -> Vec<NautilusWsMessage> {
688 let mut result = Vec::new();
689
690 let coin = match data {
691 WsActiveAssetCtxData::Perp { coin, .. } => coin,
692 WsActiveAssetCtxData::Spot { coin, .. } => coin,
693 };
694
695 if let Some(instrument) = instruments.get(coin) {
696 let (mark_px, oracle_px, funding) = match data {
697 WsActiveAssetCtxData::Perp { ctx, .. } => (
698 &ctx.shared.mark_px,
699 Some(&ctx.oracle_px),
700 Some(&ctx.funding),
701 ),
702 WsActiveAssetCtxData::Spot { ctx, .. } => (&ctx.shared.mark_px, None, None),
703 };
704
705 let mark_changed = mark_price_cache.get(coin) != Some(mark_px);
706 let index_changed = oracle_px.is_some_and(|px| index_price_cache.get(coin) != Some(px));
707 let funding_changed =
708 funding.is_some_and(|rate| funding_rate_cache.get(coin) != Some(rate));
709
710 let subscribed_types = asset_context_subs.get(coin);
711
712 if mark_changed || index_changed || funding_changed {
713 match parse_ws_asset_context(data, instrument, ts_init) {
714 Ok((mark_price, index_price, funding_rate)) => {
715 if mark_changed
716 && subscribed_types
717 .is_some_and(|s| s.contains(&AssetContextDataType::MarkPrice))
718 {
719 mark_price_cache.insert(*coin, mark_px.clone());
720 result.push(NautilusWsMessage::MarkPrice(mark_price));
721 }
722 if index_changed
723 && subscribed_types
724 .is_some_and(|s| s.contains(&AssetContextDataType::IndexPrice))
725 {
726 if let Some(px) = oracle_px {
727 index_price_cache.insert(*coin, px.clone());
728 }
729 if let Some(index) = index_price {
730 result.push(NautilusWsMessage::IndexPrice(index));
731 }
732 }
733 if funding_changed
734 && subscribed_types
735 .is_some_and(|s| s.contains(&AssetContextDataType::FundingRate))
736 {
737 if let Some(rate) = funding {
738 funding_rate_cache.insert(*coin, rate.clone());
739 }
740 if let Some(funding) = funding_rate {
741 result.push(NautilusWsMessage::FundingRate(funding));
742 }
743 }
744 }
745 Err(e) => {
746 log::error!("Error parsing asset context: {e}");
747 }
748 }
749 }
750 } else {
751 log::debug!("No instrument found for coin: {coin}");
752 }
753
754 result
755 }
756}
757
758fn subscription_to_key(sub: &SubscriptionRequest) -> String {
760 match sub {
761 SubscriptionRequest::AllMids { dex } => {
762 if let Some(dex_name) = dex {
763 format!("allMids:{dex_name}")
764 } else {
765 "allMids".to_string()
766 }
767 }
768 SubscriptionRequest::Notification { user } => format!("notification:{user}"),
769 SubscriptionRequest::WebData2 { user } => format!("webData2:{user}"),
770 SubscriptionRequest::Candle { coin, interval } => {
771 format!("candle:{coin}:{}", interval.as_str())
772 }
773 SubscriptionRequest::L2Book { coin, .. } => format!("l2Book:{coin}"),
774 SubscriptionRequest::Trades { coin } => format!("trades:{coin}"),
775 SubscriptionRequest::OrderUpdates { user } => format!("orderUpdates:{user}"),
776 SubscriptionRequest::UserEvents { user } => format!("userEvents:{user}"),
777 SubscriptionRequest::UserFills { user, .. } => format!("userFills:{user}"),
778 SubscriptionRequest::UserFundings { user } => format!("userFundings:{user}"),
779 SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
780 format!("userNonFundingLedgerUpdates:{user}")
781 }
782 SubscriptionRequest::ActiveAssetCtx { coin } => format!("activeAssetCtx:{coin}"),
783 SubscriptionRequest::ActiveSpotAssetCtx { coin } => format!("activeSpotAssetCtx:{coin}"),
784 SubscriptionRequest::ActiveAssetData { user, coin } => {
785 format!("activeAssetData:{user}:{coin}")
786 }
787 SubscriptionRequest::UserTwapSliceFills { user } => format!("userTwapSliceFills:{user}"),
788 SubscriptionRequest::UserTwapHistory { user } => format!("userTwapHistory:{user}"),
789 SubscriptionRequest::Bbo { coin } => format!("bbo:{coin}"),
790 }
791}
792
793pub(crate) fn should_retry_hyperliquid_error(error: &HyperliquidWsError) -> bool {
795 match error {
796 HyperliquidWsError::TungsteniteError(_) => true,
797 HyperliquidWsError::ClientError(msg) => {
798 let msg_lower = msg.to_lowercase();
799 msg_lower.contains("timeout")
800 || msg_lower.contains("timed out")
801 || msg_lower.contains("connection")
802 || msg_lower.contains("network")
803 }
804 _ => false,
805 }
806}
807
808pub(crate) fn create_hyperliquid_timeout_error(msg: String) -> HyperliquidWsError {
810 HyperliquidWsError::ClientError(msg)
811}