1use std::{
17 collections::HashSet,
18 str::FromStr,
19 sync::{
20 Arc,
21 atomic::{AtomicBool, AtomicU8, Ordering},
22 },
23};
24
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_common::live::get_runtime;
29use nautilus_model::{
30 data::BarType,
31 identifiers::{AccountId, InstrumentId},
32 instruments::{Instrument, InstrumentAny},
33};
34use nautilus_network::{
35 mode::ConnectionMode,
36 websocket::{
37 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
38 },
39};
40use ustr::Ustr;
41
42use crate::{
43 common::{HyperliquidProductType, enums::HyperliquidBarInterval, parse::bar_type_to_interval},
44 websocket::{
45 handler::{FeedHandler, HandlerCommand},
46 messages::{NautilusWsMessage, SubscriptionRequest},
47 },
48};
49
/// JSON payload passed as `heartbeat_msg` in the `WebSocketConfig` built by
/// `HyperliquidWebSocketClient::connect`.
const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
51
/// Kinds of per-coin data that can be requested through the asset-context subscription
/// helpers on `HyperliquidWebSocketClient`.
///
/// The client tracks a set of these per coin and sends the set to the handler via
/// `HandlerCommand::UpdateAssetContextSubs`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(super) enum AssetContextDataType {
    /// Requested by `subscribe_mark_prices`.
    MarkPrice,
    /// Requested by `subscribe_index_prices`.
    IndexPrice,
    /// Requested by `subscribe_funding_rates`.
    FundingRate,
}
59
/// WebSocket client for Hyperliquid.
///
/// The client talks to a background handler task through a command channel (`cmd_tx`) and
/// receives processed messages from it through `out_rx`. Clones share the connection
/// state, command channel, subscription state and caches, but do not receive the output
/// receiver or the task handle (see the `Clone` implementation).
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.hyperliquid")
)]
pub struct HyperliquidWebSocketClient {
    /// WebSocket endpoint used by `connect`.
    url: String,
    /// Product type supplied at construction.
    product_type: HyperliquidProductType,
    /// Connection mode; swapped to the underlying client's atomic on `connect` and read by
    /// `is_active`.
    connection_mode: Arc<ArcSwap<AtomicU8>>,
    /// Flag set to `true` by `disconnect`; a clone of it is handed to the handler.
    signal: Arc<AtomicBool>,
    /// Sender for commands to the handler task. Starts as a placeholder whose receiver has
    /// been dropped and is replaced by `connect`.
    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
    /// Receiver for messages forwarded by the handler task; `None` until `connect` and in
    /// clones.
    out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
    /// Authentication tracker created in `new`.
    auth_tracker: AuthTracker,
    /// Subscription state shared with the handler; its topics are replayed after a
    /// reconnection.
    subscriptions: SubscriptionState,
    /// Instrument cache keyed by instrument symbol.
    instruments: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Bar types keyed by `candle:{coin}:{interval}`.
    bar_types: Arc<DashMap<String, BarType>>,
    /// Asset-context data types currently requested, per coin.
    asset_context_subs: Arc<DashMap<Ustr, HashSet<AssetContextDataType>>>,
    /// Handle of the spawned handler task; `None` until `connect` and in clones.
    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
    /// Optional account identifier passed to the handler.
    account_id: Option<AccountId>,
}
84
85impl Clone for HyperliquidWebSocketClient {
86 fn clone(&self) -> Self {
87 Self {
88 url: self.url.clone(),
89 product_type: self.product_type,
90 connection_mode: Arc::clone(&self.connection_mode),
91 signal: Arc::clone(&self.signal),
92 cmd_tx: Arc::clone(&self.cmd_tx),
93 out_rx: None,
94 auth_tracker: self.auth_tracker.clone(),
95 subscriptions: self.subscriptions.clone(),
96 instruments: Arc::clone(&self.instruments),
97 bar_types: Arc::clone(&self.bar_types),
98 asset_context_subs: Arc::clone(&self.asset_context_subs),
99 task_handle: None,
100 account_id: self.account_id,
101 }
102 }
103}
104
105impl HyperliquidWebSocketClient {
106 pub fn new(
114 url: Option<String>,
115 testnet: bool,
116 product_type: HyperliquidProductType,
117 account_id: Option<AccountId>,
118 ) -> Self {
119 let url = url.unwrap_or_else(|| {
120 if testnet {
121 "wss://api.hyperliquid-testnet.xyz/ws".to_string()
122 } else {
123 "wss://api.hyperliquid.xyz/ws".to_string()
124 }
125 });
126 let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
127 ConnectionMode::Closed as u8,
128 ))));
129 Self {
130 url,
131 product_type,
132 connection_mode,
133 signal: Arc::new(AtomicBool::new(false)),
134 auth_tracker: AuthTracker::new(),
135 subscriptions: SubscriptionState::new(':'),
136 instruments: Arc::new(DashMap::new()),
137 bar_types: Arc::new(DashMap::new()),
138 asset_context_subs: Arc::new(DashMap::new()),
139 cmd_tx: {
140 let (tx, _) = tokio::sync::mpsc::unbounded_channel();
142 Arc::new(tokio::sync::RwLock::new(tx))
143 },
144 out_rx: None,
145 task_handle: None,
146 account_id,
147 }
148 }
149
150 pub async fn connect(&mut self) -> anyhow::Result<()> {
152 if self.is_active() {
153 log::warn!("WebSocket already connected");
154 return Ok(());
155 }
156 let (message_handler, raw_rx) = channel_message_handler();
157 let cfg = WebSocketConfig {
158 url: self.url.clone(),
159 headers: vec![],
160 heartbeat: Some(30),
161 heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
162 reconnect_timeout_ms: Some(15_000),
163 reconnect_delay_initial_ms: Some(250),
164 reconnect_delay_max_ms: Some(5_000),
165 reconnect_backoff_factor: Some(2.0),
166 reconnect_jitter_ms: Some(200),
167 reconnect_max_attempts: None,
168 };
169 let client =
170 WebSocketClient::connect(cfg, Some(message_handler), None, None, vec![], None).await?;
171
172 self.connection_mode.store(client.connection_mode_atomic());
174 log::info!("Hyperliquid WebSocket connected: {}", self.url);
175
176 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
178 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
179
180 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
182 anyhow::bail!("Failed to send SetClient command: {e}");
183 }
184
185 let instruments_vec: Vec<InstrumentAny> = self
187 .instruments
188 .iter()
189 .map(|entry| entry.value().clone())
190 .collect();
191 if !instruments_vec.is_empty()
192 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
193 {
194 log::error!("Failed to send InitializeInstruments: {e}");
195 }
196
197 let signal = Arc::clone(&self.signal);
199 let account_id = self.account_id;
200 let subscriptions = self.subscriptions.clone();
201 let cmd_tx_for_reconnect = cmd_tx.clone();
202
203 let stream_handle = get_runtime().spawn(async move {
204 let mut handler = FeedHandler::new(
205 signal,
206 cmd_rx,
207 raw_rx,
208 out_tx,
209 account_id,
210 subscriptions.clone(),
211 );
212
213 let resubscribe_all = || {
214 let topics = subscriptions.all_topics();
215 if topics.is_empty() {
216 log::debug!("No active subscriptions to restore after reconnection");
217 return;
218 }
219
220 log::info!(
221 "Resubscribing to {} active subscriptions after reconnection",
222 topics.len()
223 );
224 for topic in topics {
225 match subscription_from_topic(&topic) {
226 Ok(subscription) => {
227 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
228 subscriptions: vec![subscription],
229 }) {
230 log::error!("Failed to send resubscribe command: {e}");
231 }
232 }
233 Err(e) => {
234 log::error!(
235 "Failed to reconstruct subscription from topic: topic={topic}, {e}"
236 );
237 }
238 }
239 }
240 };
241 loop {
242 match handler.next().await {
243 Some(NautilusWsMessage::Reconnected) => {
244 log::info!("WebSocket reconnected");
245 resubscribe_all();
246 continue;
247 }
248 Some(msg) => {
249 if handler.send(msg).is_err() {
250 log::error!("Failed to send message (receiver dropped)");
251 break;
252 }
253 }
254 None => {
255 if handler.is_stopped() {
256 log::debug!("Stop signal received, ending message processing");
257 break;
258 }
259 log::warn!("WebSocket stream ended unexpectedly");
260 break;
261 }
262 }
263 }
264 log::debug!("Handler task completed");
265 });
266 self.task_handle = Some(Arc::new(stream_handle));
267 *self.cmd_tx.write().await = cmd_tx;
268 self.out_rx = Some(out_rx);
269 Ok(())
270 }
271
272 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
274 log::info!("Disconnecting Hyperliquid WebSocket");
275 self.signal.store(true, Ordering::Relaxed);
276 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
277 log::debug!(
278 "Failed to send disconnect command (handler may already be shut down): {e}"
279 );
280 }
281 if let Some(task_handle) = self.task_handle.take() {
282 match Arc::try_unwrap(task_handle) {
283 Ok(handle) => {
284 log::debug!("Waiting for task handle to complete");
285 match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
286 Ok(Ok(())) => log::debug!("Task handle completed successfully"),
287 Ok(Err(e)) => log::error!("Task handle encountered an error: {e:?}"),
288 Err(_) => {
289 log::warn!(
290 "Timeout waiting for task handle, task may still be running"
291 );
292 }
293 }
294 }
295 Err(arc_handle) => {
296 log::debug!(
297 "Cannot take ownership of task handle - other references exist, aborting task"
298 );
299 arc_handle.abort();
300 }
301 }
302 } else {
303 log::debug!("No task handle to await");
304 }
305 log::debug!("Disconnected");
306 Ok(())
307 }
308
309 pub fn is_active(&self) -> bool {
311 let mode = self.connection_mode.load();
312 mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
313 }
314
315 pub fn url(&self) -> &str {
317 &self.url
318 }
319
320 pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
322 let subscription = SubscriptionRequest::OrderUpdates {
323 user: user.to_string(),
324 };
325 self.cmd_tx
326 .read()
327 .await
328 .send(HandlerCommand::Subscribe {
329 subscriptions: vec![subscription],
330 })
331 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
332 Ok(())
333 }
334
335 pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
337 let subscription = SubscriptionRequest::UserEvents {
338 user: user.to_string(),
339 };
340 self.cmd_tx
341 .read()
342 .await
343 .send(HandlerCommand::Subscribe {
344 subscriptions: vec![subscription],
345 })
346 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
347 Ok(())
348 }
349
350 pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
352 self.subscribe_order_updates(user).await?;
353 self.subscribe_user_events(user).await?;
354 Ok(())
355 }
356
357 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
359 let instrument = self
360 .get_instrument(&instrument_id)
361 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
362 let coin = instrument.raw_symbol().inner();
363
364 let cmd_tx = self.cmd_tx.read().await;
365
366 cmd_tx
368 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
369 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
370
371 let subscription = SubscriptionRequest::Trades { coin };
372
373 cmd_tx
374 .send(HandlerCommand::Subscribe {
375 subscriptions: vec![subscription],
376 })
377 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
378 Ok(())
379 }
380
381 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
383 let instrument = self
384 .get_instrument(&instrument_id)
385 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
386 let coin = instrument.raw_symbol().inner();
387
388 let subscription = SubscriptionRequest::Trades { coin };
389
390 self.cmd_tx
391 .read()
392 .await
393 .send(HandlerCommand::Unsubscribe {
394 subscriptions: vec![subscription],
395 })
396 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
397 Ok(())
398 }
399
400 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
402 let instrument = self
403 .get_instrument(&instrument_id)
404 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
405 let coin = instrument.raw_symbol().inner();
406
407 let cmd_tx = self.cmd_tx.read().await;
408
409 cmd_tx
411 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
412 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
413
414 let subscription = SubscriptionRequest::Bbo { coin };
415
416 cmd_tx
417 .send(HandlerCommand::Subscribe {
418 subscriptions: vec![subscription],
419 })
420 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
421 Ok(())
422 }
423
424 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
426 let instrument = self
427 .get_instrument(&instrument_id)
428 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
429 let coin = instrument.raw_symbol().inner();
430
431 let subscription = SubscriptionRequest::Bbo { coin };
432
433 self.cmd_tx
434 .read()
435 .await
436 .send(HandlerCommand::Unsubscribe {
437 subscriptions: vec![subscription],
438 })
439 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
440 Ok(())
441 }
442
443 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
445 let instrument = self
446 .get_instrument(&instrument_id)
447 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
448 let coin = instrument.raw_symbol().inner();
449
450 let cmd_tx = self.cmd_tx.read().await;
451
452 cmd_tx
454 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
455 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
456
457 let subscription = SubscriptionRequest::L2Book {
458 coin,
459 mantissa: None,
460 n_sig_figs: None,
461 };
462
463 cmd_tx
464 .send(HandlerCommand::Subscribe {
465 subscriptions: vec![subscription],
466 })
467 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
468 Ok(())
469 }
470
471 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
473 let instrument = self
474 .get_instrument(&instrument_id)
475 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
476 let coin = instrument.raw_symbol().inner();
477
478 let subscription = SubscriptionRequest::L2Book {
479 coin,
480 mantissa: None,
481 n_sig_figs: None,
482 };
483
484 self.cmd_tx
485 .read()
486 .await
487 .send(HandlerCommand::Unsubscribe {
488 subscriptions: vec![subscription],
489 })
490 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
491 Ok(())
492 }
493
494 pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
496 let instrument = self
498 .get_instrument(&bar_type.instrument_id())
499 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
500 let coin = instrument.raw_symbol().inner();
501 let interval = bar_type_to_interval(&bar_type)?;
502 let subscription = SubscriptionRequest::Candle { coin, interval };
503
504 let key = format!("candle:{coin}:{interval}");
506 self.bar_types.insert(key.clone(), bar_type);
507
508 let cmd_tx = self.cmd_tx.read().await;
509
510 cmd_tx
511 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
512 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
513
514 cmd_tx
515 .send(HandlerCommand::AddBarType { key, bar_type })
516 .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
517
518 cmd_tx
519 .send(HandlerCommand::Subscribe {
520 subscriptions: vec![subscription],
521 })
522 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
523 Ok(())
524 }
525
526 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
528 let instrument = self
530 .get_instrument(&bar_type.instrument_id())
531 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
532 let coin = instrument.raw_symbol().inner();
533 let interval = bar_type_to_interval(&bar_type)?;
534 let subscription = SubscriptionRequest::Candle { coin, interval };
535
536 let key = format!("candle:{coin}:{interval}");
537 self.bar_types.remove(&key);
538
539 let cmd_tx = self.cmd_tx.read().await;
540
541 cmd_tx
542 .send(HandlerCommand::RemoveBarType { key })
543 .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
544
545 cmd_tx
546 .send(HandlerCommand::Unsubscribe {
547 subscriptions: vec![subscription],
548 })
549 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
550 Ok(())
551 }
552
553 pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
555 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
556 .await
557 }
558
559 pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
561 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
562 .await
563 }
564
565 pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
567 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
568 .await
569 }
570
571 pub async fn unsubscribe_index_prices(
573 &self,
574 instrument_id: InstrumentId,
575 ) -> anyhow::Result<()> {
576 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
577 .await
578 }
579
580 pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
582 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
583 .await
584 }
585
586 pub async fn unsubscribe_funding_rates(
588 &self,
589 instrument_id: InstrumentId,
590 ) -> anyhow::Result<()> {
591 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
592 .await
593 }
594
595 async fn subscribe_asset_context_data(
596 &self,
597 instrument_id: InstrumentId,
598 data_type: AssetContextDataType,
599 ) -> anyhow::Result<()> {
600 let instrument = self
601 .get_instrument(&instrument_id)
602 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
603 let coin = instrument.raw_symbol().inner();
604
605 let mut entry = self.asset_context_subs.entry(coin).or_default();
606 let is_first_subscription = entry.is_empty();
607 entry.insert(data_type);
608 let data_types = entry.clone();
609 drop(entry);
610
611 let cmd_tx = self.cmd_tx.read().await;
612
613 cmd_tx
614 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
615 .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
616
617 if is_first_subscription {
618 log::debug!(
619 "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
620 );
621 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
622
623 cmd_tx
624 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
625 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
626
627 cmd_tx
628 .send(HandlerCommand::Subscribe {
629 subscriptions: vec![subscription],
630 })
631 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
632 } else {
633 log::debug!(
634 "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
635 );
636 }
637
638 Ok(())
639 }
640
641 async fn unsubscribe_asset_context_data(
642 &self,
643 instrument_id: InstrumentId,
644 data_type: AssetContextDataType,
645 ) -> anyhow::Result<()> {
646 let instrument = self
647 .get_instrument(&instrument_id)
648 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
649 let coin = instrument.raw_symbol().inner();
650
651 if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
652 entry.remove(&data_type);
653 let should_unsubscribe = entry.is_empty();
654 let data_types = entry.clone();
655 drop(entry);
656
657 let cmd_tx = self.cmd_tx.read().await;
658
659 if should_unsubscribe {
660 self.asset_context_subs.remove(&coin);
661
662 log::debug!(
663 "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
664 );
665 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
666
667 cmd_tx
668 .send(HandlerCommand::UpdateAssetContextSubs {
669 coin,
670 data_types: HashSet::new(),
671 })
672 .map_err(|e| {
673 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
674 })?;
675
676 cmd_tx
677 .send(HandlerCommand::Unsubscribe {
678 subscriptions: vec![subscription],
679 })
680 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
681 } else {
682 log::debug!(
683 "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
684 );
685
686 cmd_tx
687 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
688 .map_err(|e| {
689 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
690 })?;
691 }
692 }
693
694 Ok(())
695 }
696
697 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
702 self.instruments.clear();
703 for inst in instruments {
704 let symbol = inst.symbol().inner();
705 self.instruments.insert(symbol, inst.clone());
706 }
707 log::info!(
708 "Hyperliquid instrument cache initialized with {} instruments",
709 self.instruments.len()
710 );
711 }
712
713 pub fn cache_instrument(&self, instrument: InstrumentAny) {
717 let symbol = instrument.symbol().inner();
718 self.instruments.insert(symbol, instrument.clone());
719
720 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
723 let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
724 }
725 }
726
727 pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
731 self.instruments
732 .get(&id.symbol.inner())
733 .map(|e| e.value().clone())
734 }
735
736 pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
738 self.instruments.get(symbol).map(|e| e.value().clone())
739 }
740
741 pub fn subscription_count(&self) -> usize {
743 self.subscriptions.len()
744 }
745
746 pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
750 let key = format!("candle:{coin}:{interval}");
752 self.bar_types.get(&key).map(|entry| *entry.value())
753 }
754
755 pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
759 if let Some(ref mut rx) = self.out_rx {
760 rx.recv().await
761 } else {
762 None
763 }
764 }
765}
766
767fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
769 let parts: Vec<&str> = topic.split(':').collect();
770
771 match parts.first() {
772 Some(&"allMids") => {
773 let dex = parts.get(1).map(|s| (*s).to_string());
774 Ok(SubscriptionRequest::AllMids { dex })
775 }
776 Some(&"notification") => Ok(SubscriptionRequest::Notification {
777 user: (*parts.get(1).context("Missing user")?).to_string(),
778 }),
779 Some(&"webData2") => Ok(SubscriptionRequest::WebData2 {
780 user: (*parts.get(1).context("Missing user")?).to_string(),
781 }),
782 Some(&"candle") => {
783 let coin = Ustr::from(parts.get(1).context("Missing coin")?);
784 let interval_str = parts.get(2).context("Missing interval")?;
785 let interval = HyperliquidBarInterval::from_str(interval_str)?;
786 Ok(SubscriptionRequest::Candle { coin, interval })
787 }
788 Some(&"l2Book") => Ok(SubscriptionRequest::L2Book {
789 coin: Ustr::from(parts.get(1).context("Missing coin")?),
790 mantissa: None,
791 n_sig_figs: None,
792 }),
793 Some(&"trades") => Ok(SubscriptionRequest::Trades {
794 coin: Ustr::from(parts.get(1).context("Missing coin")?),
795 }),
796 Some(&"orderUpdates") => Ok(SubscriptionRequest::OrderUpdates {
797 user: (*parts.get(1).context("Missing user")?).to_string(),
798 }),
799 Some(&"userEvents") => Ok(SubscriptionRequest::UserEvents {
800 user: (*parts.get(1).context("Missing user")?).to_string(),
801 }),
802 Some(&"userFills") => Ok(SubscriptionRequest::UserFills {
803 user: (*parts.get(1).context("Missing user")?).to_string(),
804 aggregate_by_time: None,
805 }),
806 Some(&"userFundings") => Ok(SubscriptionRequest::UserFundings {
807 user: (*parts.get(1).context("Missing user")?).to_string(),
808 }),
809 Some(&"userNonFundingLedgerUpdates") => {
810 Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
811 user: (*parts.get(1).context("Missing user")?).to_string(),
812 })
813 }
814 Some(&"activeAssetCtx") => Ok(SubscriptionRequest::ActiveAssetCtx {
815 coin: Ustr::from(parts.get(1).context("Missing coin")?),
816 }),
817 Some(&"activeSpotAssetCtx") => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
818 coin: Ustr::from(parts.get(1).context("Missing coin")?),
819 }),
820 Some(&"activeAssetData") => Ok(SubscriptionRequest::ActiveAssetData {
821 user: (*parts.get(1).context("Missing user")?).to_string(),
822 coin: (*parts.get(2).context("Missing coin")?).to_string(),
823 }),
824 Some(&"userTwapSliceFills") => Ok(SubscriptionRequest::UserTwapSliceFills {
825 user: (*parts.get(1).context("Missing user")?).to_string(),
826 }),
827 Some(&"userTwapHistory") => Ok(SubscriptionRequest::UserTwapHistory {
828 user: (*parts.get(1).context("Missing user")?).to_string(),
829 }),
830 Some(&"bbo") => Ok(SubscriptionRequest::Bbo {
831 coin: Ustr::from(parts.get(1).context("Missing coin")?),
832 }),
833 Some(channel) => anyhow::bail!("Unknown subscription channel: {channel}"),
834 None => anyhow::bail!("Empty topic string"),
835 }
836}
837
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::enums::HyperliquidBarInterval;

    /// Test-only helper that renders a subscription as a `channel[:payload]` topic string.
    fn subscription_topic(sub: &SubscriptionRequest) -> String {
        match sub {
            SubscriptionRequest::AllMids { dex: Some(dex) } => format!("allMids:{dex}"),
            SubscriptionRequest::AllMids { dex: None } => "allMids".to_string(),
            SubscriptionRequest::Notification { user } => format!("notification:{user}"),
            SubscriptionRequest::WebData2 { user } => format!("webData2:{user}"),
            SubscriptionRequest::Candle { coin, interval } => {
                let interval = interval.as_str();
                format!("candle:{coin}:{interval}")
            }
            SubscriptionRequest::L2Book { coin, .. } => format!("l2Book:{coin}"),
            SubscriptionRequest::Trades { coin } => format!("trades:{coin}"),
            SubscriptionRequest::OrderUpdates { user } => format!("orderUpdates:{user}"),
            SubscriptionRequest::UserEvents { user } => format!("userEvents:{user}"),
            SubscriptionRequest::UserFills { user, .. } => format!("userFills:{user}"),
            SubscriptionRequest::UserFundings { user } => format!("userFundings:{user}"),
            SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
                format!("userNonFundingLedgerUpdates:{user}")
            }
            SubscriptionRequest::ActiveAssetCtx { coin } => format!("activeAssetCtx:{coin}"),
            SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
                format!("activeSpotAssetCtx:{coin}")
            }
            SubscriptionRequest::ActiveAssetData { user, coin } => {
                format!("activeAssetData:{user}:{coin}")
            }
            SubscriptionRequest::UserTwapSliceFills { user } => {
                format!("userTwapSliceFills:{user}")
            }
            SubscriptionRequest::UserTwapHistory { user } => format!("userTwapHistory:{user}"),
            SubscriptionRequest::Bbo { coin } => format!("bbo:{coin}"),
        }
    }

    /// Each subscription renders to the expected topic string.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
    #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
    #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
    fn test_subscription_topic_generation(
        #[case] subscription: SubscriptionRequest,
        #[case] expected_topic: &str,
    ) {
        let rendered = subscription_topic(&subscription);
        assert_eq!(rendered, expected_topic);
    }

    /// Different channels for the same coin must not share a topic.
    #[rstest]
    fn test_subscription_topics_unique() {
        let trades_topic = subscription_topic(&SubscriptionRequest::Trades { coin: "BTC".into() });
        let bbo_topic = subscription_topic(&SubscriptionRequest::Bbo { coin: "BTC".into() });

        assert_ne!(trades_topic, bbo_topic);
    }

    /// A topic parsed back into a subscription renders to the same topic again.
    #[rstest]
    #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
    #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
    #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
    #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
    fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
        let original_topic = subscription_topic(&subscription);
        let rebuilt = subscription_from_topic(&original_topic).expect("Failed to reconstruct");
        assert_eq!(subscription_topic(&rebuilt), original_topic);
    }

    /// Candle topics carry both the coin and the interval string.
    #[rstest]
    fn test_subscription_topic_candle() {
        let candle = SubscriptionRequest::Candle {
            coin: "BTC".into(),
            interval: HyperliquidBarInterval::OneHour,
        };

        assert_eq!(subscription_topic(&candle), "candle:BTC:1h");
    }
}