1use std::{
17 str::FromStr,
18 sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU8, Ordering},
21 },
22};
23
24use ahash::{AHashMap, AHashSet};
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::live::get_runtime;
29use nautilus_model::{
30 data::BarType,
31 identifiers::{AccountId, ClientOrderId, InstrumentId},
32 instruments::{Instrument, InstrumentAny},
33};
34use nautilus_network::{
35 mode::ConnectionMode,
36 websocket::{
37 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
38 },
39};
40use ustr::Ustr;
41
42use crate::{
43 common::{enums::HyperliquidBarInterval, parse::bar_type_to_interval},
44 websocket::{
45 handler::{FeedHandler, HandlerCommand},
46 messages::{NautilusWsMessage, SubscriptionRequest},
47 },
48};
49
/// JSON ping payload configured as the heartbeat message of the WebSocket connection.
const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
51
/// Kinds of data a caller can request from a coin's asset-context feed.
///
/// The client tracks the set of requested kinds per coin so that one
/// `ActiveAssetCtx` subscription can be shared between them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum AssetContextDataType {
    /// Requested through `subscribe_mark_prices`.
    MarkPrice,
    /// Requested through `subscribe_index_prices`.
    IndexPrice,
    /// Requested through `subscribe_funding_rates`.
    FundingRate,
}
59
/// WebSocket client for Hyperliquid that forwards commands to a background feed handler task.
///
/// Clones share the command channel, the caches and the connection state, but a
/// clone never carries the output receiver or the task handle (both are `None`
/// in the `Clone` implementation).
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.hyperliquid")
)]
pub struct HyperliquidWebSocketClient {
    /// Endpoint URL used by `connect`.
    url: String,
    /// Connection mode atomic; swapped in `connect` for the one owned by the underlying client.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Stop flag handed to the feed handler and set by `disconnect`.
    signal: Arc<AtomicBool>,
    /// Sender for handler commands; replaced with a fresh channel on every `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver drained by `next_event`; `None` until `connect` has run.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// Authentication tracker (not used by the code visible in this file section).
    auth_tracker: AuthTracker,
    /// Subscription state shared with the feed handler; source of topics for resubscription.
    subscriptions: SubscriptionState,
    /// Instrument cache keyed by raw symbol (coin).
    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Bar types keyed by `candle:{coin}:{interval}`.
    bar_types: Arc<DashMap<String, BarType>>,
    /// Requested asset-context data kinds per coin.
    asset_context_subs: Arc<DashMap<Ustr, AHashSet<AssetContextDataType>>>,
    /// Mapping from exchange cloid to client order ID, shared with the feed handler.
    cloid_cache: Arc<DashMap<Ustr, ClientOrderId>>,
    /// Handle of the task spawned by `connect`.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Account identifier passed to the feed handler.
    account_id: Option<AccountId>,
}
84
85impl Clone for HyperliquidWebSocketClient {
86 fn clone(&self) -> Self {
87 Self {
88 url: self.url.clone(),
89 connection_mode: Arc::clone(&self.connection_mode),
90 signal: Arc::clone(&self.signal),
91 cmd_tx: Arc::clone(&self.cmd_tx),
92 out_rx: None,
93 auth_tracker: self.auth_tracker.clone(),
94 subscriptions: self.subscriptions.clone(),
95 instruments: Arc::clone(&self.instruments),
96 bar_types: Arc::clone(&self.bar_types),
97 asset_context_subs: Arc::clone(&self.asset_context_subs),
98 cloid_cache: Arc::clone(&self.cloid_cache),
99 task_handle: None,
100 account_id: self.account_id,
101 }
102 }
103}
104
105impl HyperliquidWebSocketClient {
106 pub fn new(url: Option<String>, testnet: bool, account_id: Option<AccountId>) -> Self {
114 let url = url.unwrap_or_else(|| {
115 if testnet {
116 "wss://api.hyperliquid-testnet.xyz/ws".to_string()
117 } else {
118 "wss://api.hyperliquid.xyz/ws".to_string()
119 }
120 });
121 let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
122 ConnectionMode::Closed as u8,
123 ))));
124 Self {
125 url,
126 connection_mode,
127 signal: Arc::new(AtomicBool::new(false)),
128 auth_tracker: AuthTracker::new(),
129 subscriptions: SubscriptionState::new(':'),
130 instruments: Arc::new(DashMap::new()),
131 bar_types: Arc::new(DashMap::new()),
132 asset_context_subs: Arc::new(DashMap::new()),
133 cloid_cache: Arc::new(DashMap::new()),
134 cmd_tx: {
135 let (tx, _) = tokio::sync::mpsc::unbounded_channel();
137 Arc::new(tokio::sync::RwLock::new(tx))
138 },
139 out_rx: None,
140 task_handle: None,
141 account_id,
142 }
143 }
144
145 pub async fn connect(&mut self) -> anyhow::Result<()> {
147 if self.is_active() {
148 log::warn!("WebSocket already connected");
149 return Ok(());
150 }
151 let (message_handler, raw_rx) = channel_message_handler();
152 let cfg = WebSocketConfig {
153 url: self.url.clone(),
154 headers: vec![],
155 heartbeat: Some(30),
156 heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
157 reconnect_timeout_ms: Some(15_000),
158 reconnect_delay_initial_ms: Some(250),
159 reconnect_delay_max_ms: Some(5_000),
160 reconnect_backoff_factor: Some(2.0),
161 reconnect_jitter_ms: Some(200),
162 reconnect_max_attempts: None,
163 };
164 let client =
165 WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
166
167 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
169 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
170
171 *self.cmd_tx.write().await = cmd_tx.clone();
174 self.out_rx = Some(out_rx);
175
176 self.connection_mode.store(client.connection_mode_atomic());
177 log::info!("Hyperliquid WebSocket connected: {}", self.url);
178
179 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
181 anyhow::bail!("Failed to send SetClient command: {e}");
182 }
183
184 let instruments_vec: Vec<InstrumentAny> = self
186 .instruments
187 .iter()
188 .map(|entry| entry.value().clone())
189 .collect();
190 if !instruments_vec.is_empty()
191 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
192 {
193 log::error!("Failed to send InitializeInstruments: {e}");
194 }
195
196 let signal = Arc::clone(&self.signal);
198 let account_id = self.account_id;
199 let subscriptions = self.subscriptions.clone();
200 let cmd_tx_for_reconnect = cmd_tx.clone();
201 let cloid_cache = Arc::clone(&self.cloid_cache);
202
203 let stream_handle = get_runtime().spawn(async move {
204 let mut handler = FeedHandler::new(
205 signal,
206 cmd_rx,
207 raw_rx,
208 out_tx,
209 account_id,
210 subscriptions.clone(),
211 cloid_cache,
212 );
213
214 let resubscribe_all = || {
215 let topics = subscriptions.all_topics();
216 if topics.is_empty() {
217 log::debug!("No active subscriptions to restore after reconnection");
218 return;
219 }
220
221 log::info!(
222 "Resubscribing to {} active subscriptions after reconnection",
223 topics.len()
224 );
225 for topic in topics {
226 match subscription_from_topic(&topic) {
227 Ok(subscription) => {
228 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
229 subscriptions: vec![subscription],
230 }) {
231 log::error!("Failed to send resubscribe command: {e}");
232 }
233 }
234 Err(e) => {
235 log::error!(
236 "Failed to reconstruct subscription from topic: topic={topic}, {e}"
237 );
238 }
239 }
240 }
241 };
242 loop {
243 match handler.next().await {
244 Some(NautilusWsMessage::Reconnected) => {
245 log::info!("WebSocket reconnected");
246 resubscribe_all();
247 continue;
248 }
249 Some(msg) => {
250 if handler.send(msg).is_err() {
251 log::error!("Failed to send message (receiver dropped)");
252 break;
253 }
254 }
255 None => {
256 if handler.is_stopped() {
257 log::debug!("Stop signal received, ending message processing");
258 break;
259 }
260 log::warn!("WebSocket stream ended unexpectedly");
261 break;
262 }
263 }
264 }
265 log::debug!("Handler task completed");
266 });
267 self.task_handle = Some(Arc::new(stream_handle));
268 Ok(())
269 }
270
271 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
273 log::info!("Disconnecting Hyperliquid WebSocket");
274 self.signal.store(true, Ordering::Relaxed);
275 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
276 log::debug!(
277 "Failed to send disconnect command (handler may already be shut down): {e}"
278 );
279 }
280 if let Some(task_handle) = self.task_handle.take() {
281 match Arc::try_unwrap(task_handle) {
282 Ok(handle) => {
283 log::debug!("Waiting for task handle to complete");
284 let abort_handle = handle.abort_handle();
285 tokio::select! {
286 result = handle => {
287 match result {
288 Ok(()) => log::debug!("Task handle completed successfully"),
289 Err(e) if e.is_cancelled() => {
290 log::debug!("Task was cancelled");
291 }
292 Err(e) => log::error!("Task handle encountered an error: {e:?}"),
293 }
294 }
295 () = tokio::time::sleep(tokio::time::Duration::from_secs(2)) => {
296 log::warn!("Timeout waiting for task handle, aborting task");
297 abort_handle.abort();
298 }
299 }
300 }
301 Err(arc_handle) => {
302 log::debug!(
303 "Cannot take ownership of task handle - other references exist, aborting task"
304 );
305 arc_handle.abort();
306 }
307 }
308 } else {
309 log::debug!("No task handle to await");
310 }
311 log::debug!("Disconnected");
312 Ok(())
313 }
314
315 pub fn is_active(&self) -> bool {
317 let mode = self.connection_mode.load();
318 mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
319 }
320
321 pub fn url(&self) -> &str {
323 &self.url
324 }
325
326 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
333 self.instruments.clear();
334 for inst in instruments {
335 let coin = inst.raw_symbol().inner();
336 self.instruments.insert(coin, inst);
337 }
338 log::info!(
339 "Hyperliquid instrument cache initialized with {} instruments",
340 self.instruments.len()
341 );
342 }
343
344 pub fn cache_instrument(&self, instrument: InstrumentAny) {
348 let coin = instrument.raw_symbol().inner();
349 self.instruments.insert(coin, instrument.clone());
350
351 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
354 let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
355 }
356 }
357
358 pub fn cache_spot_fill_coins(&self, mapping: AHashMap<Ustr, Ustr>) {
364 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
365 let _ = cmd_tx.send(HandlerCommand::CacheSpotFillCoins(mapping));
366 }
367 }
368
369 pub fn cache_cloid_mapping(&self, cloid: Ustr, client_order_id: ClientOrderId) {
378 log::debug!("Caching cloid mapping: {cloid} -> {client_order_id}");
379 self.cloid_cache.insert(cloid, client_order_id);
380 }
381
382 pub fn remove_cloid_mapping(&self, cloid: &Ustr) {
387 if self.cloid_cache.remove(cloid).is_some() {
388 log::debug!("Removed cloid mapping: {cloid}");
389 }
390 }
391
392 pub fn clear_cloid_cache(&self) {
396 let count = self.cloid_cache.len();
397 self.cloid_cache.clear();
398 if count > 0 {
399 log::debug!("Cleared {count} cloid mappings from cache");
400 }
401 }
402
403 #[must_use]
405 pub fn cloid_cache_len(&self) -> usize {
406 self.cloid_cache.len()
407 }
408
409 #[must_use]
413 pub fn get_cloid_mapping(&self, cloid: &Ustr) -> Option<ClientOrderId> {
414 self.cloid_cache.get(cloid).map(|entry| *entry.value())
415 }
416
417 pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
421 self.instruments
422 .iter()
423 .find(|entry| entry.value().id() == *id)
424 .map(|entry| entry.value().clone())
425 }
426
427 pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
429 self.instruments.get(symbol).map(|e| e.value().clone())
430 }
431
432 pub fn subscription_count(&self) -> usize {
434 self.subscriptions.len()
435 }
436
437 pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
441 let key = format!("candle:{coin}:{interval}");
443 self.bar_types.get(&key).map(|entry| *entry.value())
444 }
445
446 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
448 let instrument = self
449 .get_instrument(&instrument_id)
450 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
451 let coin = instrument.raw_symbol().inner();
452
453 let cmd_tx = self.cmd_tx.read().await;
454
455 cmd_tx
457 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
458 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
459
460 let subscription = SubscriptionRequest::L2Book {
461 coin,
462 mantissa: None,
463 n_sig_figs: None,
464 };
465
466 cmd_tx
467 .send(HandlerCommand::Subscribe {
468 subscriptions: vec![subscription],
469 })
470 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
471 Ok(())
472 }
473
474 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
476 let instrument = self
477 .get_instrument(&instrument_id)
478 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
479 let coin = instrument.raw_symbol().inner();
480
481 let cmd_tx = self.cmd_tx.read().await;
482
483 cmd_tx
485 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
486 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
487
488 let subscription = SubscriptionRequest::Bbo { coin };
489
490 cmd_tx
491 .send(HandlerCommand::Subscribe {
492 subscriptions: vec![subscription],
493 })
494 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
495 Ok(())
496 }
497
498 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
500 let instrument = self
501 .get_instrument(&instrument_id)
502 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
503 let coin = instrument.raw_symbol().inner();
504
505 let cmd_tx = self.cmd_tx.read().await;
506
507 cmd_tx
509 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
510 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
511
512 let subscription = SubscriptionRequest::Trades { coin };
513
514 cmd_tx
515 .send(HandlerCommand::Subscribe {
516 subscriptions: vec![subscription],
517 })
518 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
519 Ok(())
520 }
521
522 pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
524 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
525 .await
526 }
527
528 pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
530 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
531 .await
532 }
533
534 pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
536 let instrument = self
538 .get_instrument(&bar_type.instrument_id())
539 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
540 let coin = instrument.raw_symbol().inner();
541 let interval = bar_type_to_interval(&bar_type)?;
542 let subscription = SubscriptionRequest::Candle { coin, interval };
543
544 let key = format!("candle:{coin}:{interval}");
546 self.bar_types.insert(key.clone(), bar_type);
547
548 let cmd_tx = self.cmd_tx.read().await;
549
550 cmd_tx
551 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
552 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
553
554 cmd_tx
555 .send(HandlerCommand::AddBarType { key, bar_type })
556 .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
557
558 cmd_tx
559 .send(HandlerCommand::Subscribe {
560 subscriptions: vec![subscription],
561 })
562 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
563 Ok(())
564 }
565
566 pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
568 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
569 .await
570 }
571
572 pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
574 let subscription = SubscriptionRequest::OrderUpdates {
575 user: user.to_string(),
576 };
577 self.cmd_tx
578 .read()
579 .await
580 .send(HandlerCommand::Subscribe {
581 subscriptions: vec![subscription],
582 })
583 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
584 Ok(())
585 }
586
587 pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
589 let subscription = SubscriptionRequest::UserEvents {
590 user: user.to_string(),
591 };
592 self.cmd_tx
593 .read()
594 .await
595 .send(HandlerCommand::Subscribe {
596 subscriptions: vec![subscription],
597 })
598 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
599 Ok(())
600 }
601
602 pub async fn subscribe_user_fills(&self, user: &str) -> anyhow::Result<()> {
607 let subscription = SubscriptionRequest::UserFills {
608 user: user.to_string(),
609 aggregate_by_time: None,
610 };
611 self.cmd_tx
612 .read()
613 .await
614 .send(HandlerCommand::Subscribe {
615 subscriptions: vec![subscription],
616 })
617 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
618 Ok(())
619 }
620
621 pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
626 self.subscribe_order_updates(user).await?;
627 self.subscribe_user_events(user).await?;
628 Ok(())
629 }
630
631 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
633 let instrument = self
634 .get_instrument(&instrument_id)
635 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
636 let coin = instrument.raw_symbol().inner();
637
638 let subscription = SubscriptionRequest::L2Book {
639 coin,
640 mantissa: None,
641 n_sig_figs: None,
642 };
643
644 self.cmd_tx
645 .read()
646 .await
647 .send(HandlerCommand::Unsubscribe {
648 subscriptions: vec![subscription],
649 })
650 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
651 Ok(())
652 }
653
654 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
656 let instrument = self
657 .get_instrument(&instrument_id)
658 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
659 let coin = instrument.raw_symbol().inner();
660
661 let subscription = SubscriptionRequest::Bbo { coin };
662
663 self.cmd_tx
664 .read()
665 .await
666 .send(HandlerCommand::Unsubscribe {
667 subscriptions: vec![subscription],
668 })
669 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
670 Ok(())
671 }
672
673 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
675 let instrument = self
676 .get_instrument(&instrument_id)
677 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
678 let coin = instrument.raw_symbol().inner();
679
680 let subscription = SubscriptionRequest::Trades { coin };
681
682 self.cmd_tx
683 .read()
684 .await
685 .send(HandlerCommand::Unsubscribe {
686 subscriptions: vec![subscription],
687 })
688 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
689 Ok(())
690 }
691
692 pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
694 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
695 .await
696 }
697
698 pub async fn unsubscribe_index_prices(
700 &self,
701 instrument_id: InstrumentId,
702 ) -> anyhow::Result<()> {
703 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
704 .await
705 }
706
707 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
709 let instrument = self
711 .get_instrument(&bar_type.instrument_id())
712 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
713 let coin = instrument.raw_symbol().inner();
714 let interval = bar_type_to_interval(&bar_type)?;
715 let subscription = SubscriptionRequest::Candle { coin, interval };
716
717 let key = format!("candle:{coin}:{interval}");
718 self.bar_types.remove(&key);
719
720 let cmd_tx = self.cmd_tx.read().await;
721
722 cmd_tx
723 .send(HandlerCommand::RemoveBarType { key })
724 .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
725
726 cmd_tx
727 .send(HandlerCommand::Unsubscribe {
728 subscriptions: vec![subscription],
729 })
730 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
731 Ok(())
732 }
733
734 pub async fn unsubscribe_funding_rates(
736 &self,
737 instrument_id: InstrumentId,
738 ) -> anyhow::Result<()> {
739 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
740 .await
741 }
742
743 async fn subscribe_asset_context_data(
744 &self,
745 instrument_id: InstrumentId,
746 data_type: AssetContextDataType,
747 ) -> anyhow::Result<()> {
748 let instrument = self
749 .get_instrument(&instrument_id)
750 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
751 let coin = instrument.raw_symbol().inner();
752
753 let mut entry = self.asset_context_subs.entry(coin).or_default();
754 let is_first_subscription = entry.is_empty();
755 entry.insert(data_type);
756 let data_types = entry.clone();
757 drop(entry);
758
759 let cmd_tx = self.cmd_tx.read().await;
760
761 cmd_tx
762 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
763 .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
764
765 if is_first_subscription {
766 log::debug!(
767 "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
768 );
769 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
770
771 cmd_tx
772 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
773 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
774
775 cmd_tx
776 .send(HandlerCommand::Subscribe {
777 subscriptions: vec![subscription],
778 })
779 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
780 } else {
781 log::debug!(
782 "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
783 );
784 }
785
786 Ok(())
787 }
788
789 async fn unsubscribe_asset_context_data(
790 &self,
791 instrument_id: InstrumentId,
792 data_type: AssetContextDataType,
793 ) -> anyhow::Result<()> {
794 let instrument = self
795 .get_instrument(&instrument_id)
796 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
797 let coin = instrument.raw_symbol().inner();
798
799 if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
800 entry.remove(&data_type);
801 let should_unsubscribe = entry.is_empty();
802 let data_types = entry.clone();
803 drop(entry);
804
805 let cmd_tx = self.cmd_tx.read().await;
806
807 if should_unsubscribe {
808 self.asset_context_subs.remove(&coin);
809
810 log::debug!(
811 "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
812 );
813 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
814
815 cmd_tx
816 .send(HandlerCommand::UpdateAssetContextSubs {
817 coin,
818 data_types: AHashSet::new(),
819 })
820 .map_err(|e| {
821 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
822 })?;
823
824 cmd_tx
825 .send(HandlerCommand::Unsubscribe {
826 subscriptions: vec![subscription],
827 })
828 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
829 } else {
830 log::debug!(
831 "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
832 );
833
834 cmd_tx
835 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
836 .map_err(|e| {
837 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
838 })?;
839 }
840 }
841
842 Ok(())
843 }
844
845 pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
849 if let Some(ref mut rx) = self.out_rx {
850 rx.recv().await
851 } else {
852 None
853 }
854 }
855}
856
857fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
859 let parts: Vec<&str> = topic.split(':').collect();
860
861 match parts.first() {
862 Some(&"allMids") => {
863 let dex = parts.get(1).map(|s| (*s).to_string());
864 Ok(SubscriptionRequest::AllMids { dex })
865 }
866 Some(&"notification") => Ok(SubscriptionRequest::Notification {
867 user: (*parts.get(1).context("Missing user")?).to_string(),
868 }),
869 Some(&"webData2") => Ok(SubscriptionRequest::WebData2 {
870 user: (*parts.get(1).context("Missing user")?).to_string(),
871 }),
872 Some(&"candle") => {
873 let coin = Ustr::from(parts.get(1).context("Missing coin")?);
874 let interval_str = parts.get(2).context("Missing interval")?;
875 let interval = HyperliquidBarInterval::from_str(interval_str)?;
876 Ok(SubscriptionRequest::Candle { coin, interval })
877 }
878 Some(&"l2Book") => Ok(SubscriptionRequest::L2Book {
879 coin: Ustr::from(parts.get(1).context("Missing coin")?),
880 mantissa: None,
881 n_sig_figs: None,
882 }),
883 Some(&"trades") => Ok(SubscriptionRequest::Trades {
884 coin: Ustr::from(parts.get(1).context("Missing coin")?),
885 }),
886 Some(&"orderUpdates") => Ok(SubscriptionRequest::OrderUpdates {
887 user: (*parts.get(1).context("Missing user")?).to_string(),
888 }),
889 Some(&"userEvents") => Ok(SubscriptionRequest::UserEvents {
890 user: (*parts.get(1).context("Missing user")?).to_string(),
891 }),
892 Some(&"userFills") => Ok(SubscriptionRequest::UserFills {
893 user: (*parts.get(1).context("Missing user")?).to_string(),
894 aggregate_by_time: None,
895 }),
896 Some(&"userFundings") => Ok(SubscriptionRequest::UserFundings {
897 user: (*parts.get(1).context("Missing user")?).to_string(),
898 }),
899 Some(&"userNonFundingLedgerUpdates") => {
900 Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
901 user: (*parts.get(1).context("Missing user")?).to_string(),
902 })
903 }
904 Some(&"activeAssetCtx") => Ok(SubscriptionRequest::ActiveAssetCtx {
905 coin: Ustr::from(parts.get(1).context("Missing coin")?),
906 }),
907 Some(&"activeSpotAssetCtx") => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
908 coin: Ustr::from(parts.get(1).context("Missing coin")?),
909 }),
910 Some(&"activeAssetData") => Ok(SubscriptionRequest::ActiveAssetData {
911 user: (*parts.get(1).context("Missing user")?).to_string(),
912 coin: (*parts.get(2).context("Missing coin")?).to_string(),
913 }),
914 Some(&"userTwapSliceFills") => Ok(SubscriptionRequest::UserTwapSliceFills {
915 user: (*parts.get(1).context("Missing user")?).to_string(),
916 }),
917 Some(&"userTwapHistory") => Ok(SubscriptionRequest::UserTwapHistory {
918 user: (*parts.get(1).context("Missing user")?).to_string(),
919 }),
920 Some(&"bbo") => Ok(SubscriptionRequest::Bbo {
921 coin: Ustr::from(parts.get(1).context("Missing coin")?),
922 }),
923 Some(channel) => anyhow::bail!("Unknown subscription channel: {channel}"),
924 None => anyhow::bail!("Empty topic string"),
925 }
926}
927
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::enums::HyperliquidBarInterval;

    /// Builds the colon-delimited topic string these tests use for a subscription request.
    fn subscription_topic(sub: &SubscriptionRequest) -> String {
        match sub {
            SubscriptionRequest::AllMids { dex: Some(dex) } => format!("allMids:{dex}"),
            SubscriptionRequest::AllMids { dex: None } => "allMids".to_string(),
            SubscriptionRequest::Notification { user } => format!("notification:{user}"),
            SubscriptionRequest::WebData2 { user } => format!("webData2:{user}"),
            SubscriptionRequest::Candle { coin, interval } => {
                format!("candle:{coin}:{}", interval.as_str())
            }
            SubscriptionRequest::L2Book { coin, .. } => format!("l2Book:{coin}"),
            SubscriptionRequest::Trades { coin } => format!("trades:{coin}"),
            SubscriptionRequest::OrderUpdates { user } => format!("orderUpdates:{user}"),
            SubscriptionRequest::UserEvents { user } => format!("userEvents:{user}"),
            SubscriptionRequest::UserFills { user, .. } => format!("userFills:{user}"),
            SubscriptionRequest::UserFundings { user } => format!("userFundings:{user}"),
            SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
                format!("userNonFundingLedgerUpdates:{user}")
            }
            SubscriptionRequest::ActiveAssetCtx { coin } => format!("activeAssetCtx:{coin}"),
            SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
                format!("activeSpotAssetCtx:{coin}")
            }
            SubscriptionRequest::ActiveAssetData { user, coin } => {
                format!("activeAssetData:{user}:{coin}")
            }
            SubscriptionRequest::UserTwapSliceFills { user } => {
                format!("userTwapSliceFills:{user}")
            }
            SubscriptionRequest::UserTwapHistory { user } => format!("userTwapHistory:{user}"),
            SubscriptionRequest::Bbo { coin } => format!("bbo:{coin}"),
        }
    }

    // Each request maps to the expected `channel:field` topic.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        let actual = subscription_topic(&subscription);
        assert_eq!(actual, expected_topic);
    }

    // Different channels for the same coin must not collide.
    #[rstest]
    fn test_subscription_topics_unique() {
        let trades_topic = subscription_topic(&SubscriptionRequest::Trades { coin: "BTC".into() });
        let bbo_topic = subscription_topic(&SubscriptionRequest::Bbo { coin: "BTC".into() });

        assert_ne!(trades_topic, bbo_topic);
    }

    // A topic parsed back into a request must produce the same topic again.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        let original_topic = subscription_topic(&subscription);
        let rebuilt = subscription_from_topic(&original_topic).expect("Failed to reconstruct");
        assert_eq!(subscription_topic(&rebuilt), original_topic);
    }

    // Candle topics carry both the coin and the interval string.
    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        assert_eq!(subscription_topic(&candle), "candle:BTC:1h");
    }
}