1use std::{
17 collections::HashSet,
18 str::FromStr,
19 sync::{
20 Arc,
21 atomic::{AtomicBool, AtomicU8, Ordering},
22 },
23};
24
25use anyhow::Context;
26use arc_swap::ArcSwap;
27use dashmap::DashMap;
28use nautilus_model::{
29 data::BarType,
30 identifiers::{AccountId, InstrumentId},
31 instruments::{Instrument, InstrumentAny},
32};
33use nautilus_network::{
34 mode::ConnectionMode,
35 websocket::{
36 AuthTracker, SubscriptionState, WebSocketClient, WebSocketConfig, channel_message_handler,
37 },
38};
39use ustr::Ustr;
40
41use crate::{
42 common::{HyperliquidProductType, enums::HyperliquidBarInterval, parse::bar_type_to_interval},
43 websocket::{
44 handler::{FeedHandler, HandlerCommand},
45 messages::{NautilusWsMessage, SubscriptionRequest},
46 },
47};
48
49const HYPERLIQUID_HEARTBEAT_MSG: &str = r#"{"method":"ping"}"#;
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
53pub(super) enum AssetContextDataType {
54 MarkPrice,
55 IndexPrice,
56 FundingRate,
57}
58
59#[derive(Debug)]
64#[cfg_attr(
65 feature = "python",
66 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
67)]
68pub struct HyperliquidWebSocketClient {
69 url: String,
70 product_type: HyperliquidProductType,
71 connection_mode: Arc<ArcSwap<AtomicU8>>,
72 signal: Arc<AtomicBool>,
73 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
74 out_rx: Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsMessage>>,
75 auth_tracker: AuthTracker,
76 subscriptions: SubscriptionState,
77 instruments: Arc<DashMap<Ustr, InstrumentAny>>,
78 bar_types: Arc<DashMap<String, BarType>>,
79 asset_context_subs: Arc<DashMap<Ustr, HashSet<AssetContextDataType>>>,
80 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
81 account_id: Option<AccountId>,
82}
83
84impl Clone for HyperliquidWebSocketClient {
85 fn clone(&self) -> Self {
86 Self {
87 url: self.url.clone(),
88 product_type: self.product_type,
89 connection_mode: Arc::clone(&self.connection_mode),
90 signal: Arc::clone(&self.signal),
91 cmd_tx: Arc::clone(&self.cmd_tx),
92 out_rx: None,
93 auth_tracker: self.auth_tracker.clone(),
94 subscriptions: self.subscriptions.clone(),
95 instruments: Arc::clone(&self.instruments),
96 bar_types: Arc::clone(&self.bar_types),
97 asset_context_subs: Arc::clone(&self.asset_context_subs),
98 task_handle: None,
99 account_id: self.account_id,
100 }
101 }
102}
103
104impl HyperliquidWebSocketClient {
105 pub fn new(
113 url: Option<String>,
114 testnet: bool,
115 product_type: HyperliquidProductType,
116 account_id: Option<AccountId>,
117 ) -> Self {
118 let url = url.unwrap_or_else(|| {
119 if testnet {
120 "wss://api.hyperliquid-testnet.xyz/ws".to_string()
121 } else {
122 "wss://api.hyperliquid.xyz/ws".to_string()
123 }
124 });
125 let connection_mode = Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
126 ConnectionMode::Closed as u8,
127 ))));
128 Self {
129 url,
130 product_type,
131 connection_mode,
132 signal: Arc::new(AtomicBool::new(false)),
133 auth_tracker: AuthTracker::new(),
134 subscriptions: SubscriptionState::new(':'),
135 instruments: Arc::new(DashMap::new()),
136 bar_types: Arc::new(DashMap::new()),
137 asset_context_subs: Arc::new(DashMap::new()),
138 cmd_tx: {
139 let (tx, _) = tokio::sync::mpsc::unbounded_channel();
141 Arc::new(tokio::sync::RwLock::new(tx))
142 },
143 out_rx: None,
144 task_handle: None,
145 account_id,
146 }
147 }
148
149 pub async fn connect(&mut self) -> anyhow::Result<()> {
151 if self.is_active() {
152 tracing::warn!("WebSocket already connected");
153 return Ok(());
154 }
155 let (message_handler, raw_rx) = channel_message_handler();
156 let cfg = WebSocketConfig {
157 url: self.url.clone(),
158 headers: vec![],
159 message_handler: Some(message_handler),
160 heartbeat: Some(30),
161 heartbeat_msg: Some(HYPERLIQUID_HEARTBEAT_MSG.to_string()),
162 ping_handler: None,
163 reconnect_timeout_ms: Some(15_000),
164 reconnect_delay_initial_ms: Some(250),
165 reconnect_delay_max_ms: Some(5_000),
166 reconnect_backoff_factor: Some(2.0),
167 reconnect_jitter_ms: Some(200),
168 reconnect_max_attempts: None,
169 };
170 let client = WebSocketClient::connect(cfg, None, vec![], None).await?;
171
172 self.connection_mode.store(client.connection_mode_atomic());
174 tracing::info!("Hyperliquid WebSocket connected: {}", self.url);
175
176 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel::<HandlerCommand>();
178 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel::<NautilusWsMessage>();
179
180 if let Err(e) = cmd_tx.send(HandlerCommand::SetClient(client)) {
182 anyhow::bail!("Failed to send SetClient command: {e}");
183 }
184
185 let instruments_vec: Vec<InstrumentAny> = self
187 .instruments
188 .iter()
189 .map(|entry| entry.value().clone())
190 .collect();
191 if !instruments_vec.is_empty()
192 && let Err(e) = cmd_tx.send(HandlerCommand::InitializeInstruments(instruments_vec))
193 {
194 tracing::error!("Failed to send InitializeInstruments: {e}");
195 }
196
197 let signal = Arc::clone(&self.signal);
199 let account_id = self.account_id;
200 let subscriptions = self.subscriptions.clone();
201 let cmd_tx_for_reconnect = cmd_tx.clone();
202
203 let stream_handle = tokio::spawn(async move {
204 let mut handler = FeedHandler::new(
205 signal,
206 cmd_rx,
207 raw_rx,
208 out_tx,
209 account_id,
210 subscriptions.clone(),
211 );
212
213 let resubscribe_all = || {
214 let topics = subscriptions.all_topics();
215 if topics.is_empty() {
216 tracing::debug!("No active subscriptions to restore after reconnection");
217 return;
218 }
219
220 tracing::info!(
221 "Resubscribing to {} active subscriptions after reconnection",
222 topics.len()
223 );
224 for topic in topics {
225 match subscription_from_topic(&topic) {
226 Ok(subscription) => {
227 if let Err(e) = cmd_tx_for_reconnect.send(HandlerCommand::Subscribe {
228 subscriptions: vec![subscription],
229 }) {
230 tracing::error!(error = %e, "Failed to send resubscribe command");
231 }
232 }
233 Err(e) => {
234 tracing::error!(
235 error = %e,
236 topic = %topic,
237 "Failed to reconstruct subscription from topic"
238 );
239 }
240 }
241 }
242 };
243 loop {
244 match handler.next().await {
245 Some(NautilusWsMessage::Reconnected) => {
246 tracing::info!("WebSocket reconnected");
247 resubscribe_all();
248 continue;
249 }
250 Some(msg) => {
251 if handler.send(msg).is_err() {
252 tracing::error!("Failed to send message (receiver dropped)");
253 break;
254 }
255 }
256 None => {
257 if handler.is_stopped() {
258 tracing::debug!("Stop signal received, ending message processing");
259 break;
260 }
261 tracing::warn!("WebSocket stream ended unexpectedly");
262 break;
263 }
264 }
265 }
266 tracing::debug!("Handler task completed");
267 });
268 self.task_handle = Some(Arc::new(stream_handle));
269 *self.cmd_tx.write().await = cmd_tx;
270 self.out_rx = Some(out_rx);
271 Ok(())
272 }
273
274 pub async fn disconnect(&mut self) -> anyhow::Result<()> {
276 tracing::info!("Disconnecting Hyperliquid WebSocket");
277 self.signal.store(true, Ordering::Relaxed);
278 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
279 tracing::debug!(
280 "Failed to send disconnect command (handler may already be shut down): {e}"
281 );
282 }
283 if let Some(task_handle) = self.task_handle.take() {
284 match Arc::try_unwrap(task_handle) {
285 Ok(handle) => {
286 tracing::debug!("Waiting for task handle to complete");
287 match tokio::time::timeout(tokio::time::Duration::from_secs(2), handle).await {
288 Ok(Ok(())) => tracing::debug!("Task handle completed successfully"),
289 Ok(Err(e)) => tracing::error!("Task handle encountered an error: {e:?}"),
290 Err(_) => {
291 tracing::warn!(
292 "Timeout waiting for task handle, task may still be running"
293 );
294 }
295 }
296 }
297 Err(arc_handle) => {
298 tracing::debug!(
299 "Cannot take ownership of task handle - other references exist, aborting task"
300 );
301 arc_handle.abort();
302 }
303 }
304 } else {
305 tracing::debug!("No task handle to await");
306 }
307 tracing::debug!("Disconnected");
308 Ok(())
309 }
310
311 pub fn is_active(&self) -> bool {
313 let mode = self.connection_mode.load();
314 mode.load(Ordering::Relaxed) == ConnectionMode::Active as u8
315 }
316
317 pub fn url(&self) -> &str {
319 &self.url
320 }
321
322 pub async fn subscribe_order_updates(&self, user: &str) -> anyhow::Result<()> {
324 let subscription = SubscriptionRequest::OrderUpdates {
325 user: user.to_string(),
326 };
327 self.cmd_tx
328 .read()
329 .await
330 .send(HandlerCommand::Subscribe {
331 subscriptions: vec![subscription],
332 })
333 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
334 Ok(())
335 }
336
337 pub async fn subscribe_user_events(&self, user: &str) -> anyhow::Result<()> {
339 let subscription = SubscriptionRequest::UserEvents {
340 user: user.to_string(),
341 };
342 self.cmd_tx
343 .read()
344 .await
345 .send(HandlerCommand::Subscribe {
346 subscriptions: vec![subscription],
347 })
348 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
349 Ok(())
350 }
351
352 pub async fn subscribe_all_user_channels(&self, user: &str) -> anyhow::Result<()> {
354 self.subscribe_order_updates(user).await?;
355 self.subscribe_user_events(user).await?;
356 Ok(())
357 }
358
359 pub async fn subscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
361 let instrument = self
362 .get_instrument(&instrument_id)
363 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
364 let coin = instrument.raw_symbol().inner();
365
366 let cmd_tx = self.cmd_tx.read().await;
367
368 cmd_tx
370 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
371 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
372
373 let subscription = SubscriptionRequest::Trades { coin };
374
375 cmd_tx
376 .send(HandlerCommand::Subscribe {
377 subscriptions: vec![subscription],
378 })
379 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
380 Ok(())
381 }
382
383 pub async fn unsubscribe_trades(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
385 let instrument = self
386 .get_instrument(&instrument_id)
387 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
388 let coin = instrument.raw_symbol().inner();
389
390 let subscription = SubscriptionRequest::Trades { coin };
391
392 self.cmd_tx
393 .read()
394 .await
395 .send(HandlerCommand::Unsubscribe {
396 subscriptions: vec![subscription],
397 })
398 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
399 Ok(())
400 }
401
402 pub async fn subscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
404 let instrument = self
405 .get_instrument(&instrument_id)
406 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
407 let coin = instrument.raw_symbol().inner();
408
409 let cmd_tx = self.cmd_tx.read().await;
410
411 cmd_tx
413 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
414 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
415
416 let subscription = SubscriptionRequest::Bbo { coin };
417
418 cmd_tx
419 .send(HandlerCommand::Subscribe {
420 subscriptions: vec![subscription],
421 })
422 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
423 Ok(())
424 }
425
426 pub async fn unsubscribe_quotes(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
428 let instrument = self
429 .get_instrument(&instrument_id)
430 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
431 let coin = instrument.raw_symbol().inner();
432
433 let subscription = SubscriptionRequest::Bbo { coin };
434
435 self.cmd_tx
436 .read()
437 .await
438 .send(HandlerCommand::Unsubscribe {
439 subscriptions: vec![subscription],
440 })
441 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
442 Ok(())
443 }
444
445 pub async fn subscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
447 let instrument = self
448 .get_instrument(&instrument_id)
449 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
450 let coin = instrument.raw_symbol().inner();
451
452 let cmd_tx = self.cmd_tx.read().await;
453
454 cmd_tx
456 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
457 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
458
459 let subscription = SubscriptionRequest::L2Book {
460 coin,
461 mantissa: None,
462 n_sig_figs: None,
463 };
464
465 cmd_tx
466 .send(HandlerCommand::Subscribe {
467 subscriptions: vec![subscription],
468 })
469 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
470 Ok(())
471 }
472
473 pub async fn unsubscribe_book(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
475 let instrument = self
476 .get_instrument(&instrument_id)
477 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
478 let coin = instrument.raw_symbol().inner();
479
480 let subscription = SubscriptionRequest::L2Book {
481 coin,
482 mantissa: None,
483 n_sig_figs: None,
484 };
485
486 self.cmd_tx
487 .read()
488 .await
489 .send(HandlerCommand::Unsubscribe {
490 subscriptions: vec![subscription],
491 })
492 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
493 Ok(())
494 }
495
496 pub async fn subscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
498 let instrument = self
500 .get_instrument(&bar_type.instrument_id())
501 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
502 let coin = instrument.raw_symbol().inner();
503 let interval = bar_type_to_interval(&bar_type)?;
504 let subscription = SubscriptionRequest::Candle { coin, interval };
505
506 let key = format!("candle:{coin}:{interval}");
508 self.bar_types.insert(key.clone(), bar_type);
509
510 let cmd_tx = self.cmd_tx.read().await;
511
512 cmd_tx
513 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
514 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
515
516 cmd_tx
517 .send(HandlerCommand::AddBarType { key, bar_type })
518 .map_err(|e| anyhow::anyhow!("Failed to send AddBarType command: {e}"))?;
519
520 cmd_tx
521 .send(HandlerCommand::Subscribe {
522 subscriptions: vec![subscription],
523 })
524 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
525 Ok(())
526 }
527
528 pub async fn unsubscribe_bars(&self, bar_type: BarType) -> anyhow::Result<()> {
530 let instrument = self
532 .get_instrument(&bar_type.instrument_id())
533 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {}", bar_type.instrument_id()))?;
534 let coin = instrument.raw_symbol().inner();
535 let interval = bar_type_to_interval(&bar_type)?;
536 let subscription = SubscriptionRequest::Candle { coin, interval };
537
538 let key = format!("candle:{coin}:{interval}");
539 self.bar_types.remove(&key);
540
541 let cmd_tx = self.cmd_tx.read().await;
542
543 cmd_tx
544 .send(HandlerCommand::RemoveBarType { key })
545 .map_err(|e| anyhow::anyhow!("Failed to send RemoveBarType command: {e}"))?;
546
547 cmd_tx
548 .send(HandlerCommand::Unsubscribe {
549 subscriptions: vec![subscription],
550 })
551 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
552 Ok(())
553 }
554
555 pub async fn subscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
557 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
558 .await
559 }
560
561 pub async fn unsubscribe_mark_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
563 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::MarkPrice)
564 .await
565 }
566
567 pub async fn subscribe_index_prices(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
569 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
570 .await
571 }
572
573 pub async fn unsubscribe_index_prices(
575 &self,
576 instrument_id: InstrumentId,
577 ) -> anyhow::Result<()> {
578 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::IndexPrice)
579 .await
580 }
581
582 pub async fn subscribe_funding_rates(&self, instrument_id: InstrumentId) -> anyhow::Result<()> {
584 self.subscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
585 .await
586 }
587
588 pub async fn unsubscribe_funding_rates(
590 &self,
591 instrument_id: InstrumentId,
592 ) -> anyhow::Result<()> {
593 self.unsubscribe_asset_context_data(instrument_id, AssetContextDataType::FundingRate)
594 .await
595 }
596
597 async fn subscribe_asset_context_data(
598 &self,
599 instrument_id: InstrumentId,
600 data_type: AssetContextDataType,
601 ) -> anyhow::Result<()> {
602 let instrument = self
603 .get_instrument(&instrument_id)
604 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
605 let coin = instrument.raw_symbol().inner();
606
607 let mut entry = self.asset_context_subs.entry(coin).or_default();
608 let is_first_subscription = entry.is_empty();
609 entry.insert(data_type);
610 let data_types = entry.clone();
611 drop(entry);
612
613 let cmd_tx = self.cmd_tx.read().await;
614
615 cmd_tx
616 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
617 .map_err(|e| anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}"))?;
618
619 if is_first_subscription {
620 tracing::debug!(
621 "First asset context subscription for coin '{coin}', subscribing to ActiveAssetCtx"
622 );
623 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
624
625 cmd_tx
626 .send(HandlerCommand::UpdateInstrument(instrument.clone()))
627 .map_err(|e| anyhow::anyhow!("Failed to send UpdateInstrument command: {e}"))?;
628
629 cmd_tx
630 .send(HandlerCommand::Subscribe {
631 subscriptions: vec![subscription],
632 })
633 .map_err(|e| anyhow::anyhow!("Failed to send subscribe command: {e}"))?;
634 } else {
635 tracing::debug!(
636 "Already subscribed to ActiveAssetCtx for coin '{coin}', adding {data_type:?} to tracked types"
637 );
638 }
639
640 Ok(())
641 }
642
643 async fn unsubscribe_asset_context_data(
644 &self,
645 instrument_id: InstrumentId,
646 data_type: AssetContextDataType,
647 ) -> anyhow::Result<()> {
648 let instrument = self
649 .get_instrument(&instrument_id)
650 .ok_or_else(|| anyhow::anyhow!("Instrument not found: {instrument_id}"))?;
651 let coin = instrument.raw_symbol().inner();
652
653 if let Some(mut entry) = self.asset_context_subs.get_mut(&coin) {
654 entry.remove(&data_type);
655 let should_unsubscribe = entry.is_empty();
656 let data_types = entry.clone();
657 drop(entry);
658
659 let cmd_tx = self.cmd_tx.read().await;
660
661 if should_unsubscribe {
662 self.asset_context_subs.remove(&coin);
663
664 tracing::debug!(
665 "Last asset context subscription removed for coin '{coin}', unsubscribing from ActiveAssetCtx"
666 );
667 let subscription = SubscriptionRequest::ActiveAssetCtx { coin };
668
669 cmd_tx
670 .send(HandlerCommand::UpdateAssetContextSubs {
671 coin,
672 data_types: HashSet::new(),
673 })
674 .map_err(|e| {
675 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
676 })?;
677
678 cmd_tx
679 .send(HandlerCommand::Unsubscribe {
680 subscriptions: vec![subscription],
681 })
682 .map_err(|e| anyhow::anyhow!("Failed to send unsubscribe command: {e}"))?;
683 } else {
684 tracing::debug!(
685 "Removed {data_type:?} from tracked types for coin '{coin}', but keeping ActiveAssetCtx subscription"
686 );
687
688 cmd_tx
689 .send(HandlerCommand::UpdateAssetContextSubs { coin, data_types })
690 .map_err(|e| {
691 anyhow::anyhow!("Failed to send UpdateAssetContextSubs command: {e}")
692 })?;
693 }
694 }
695
696 Ok(())
697 }
698
699 pub fn cache_instruments(&mut self, instruments: Vec<InstrumentAny>) {
704 self.instruments.clear();
705 for inst in instruments {
706 let symbol = inst.symbol().inner();
707 self.instruments.insert(symbol, inst.clone());
708 }
709 tracing::info!(
710 "Hyperliquid instrument cache initialized with {} instruments",
711 self.instruments.len()
712 );
713 }
714
715 pub fn cache_instrument(&self, instrument: InstrumentAny) {
719 let symbol = instrument.symbol().inner();
720 self.instruments.insert(symbol, instrument.clone());
721
722 if let Ok(cmd_tx) = self.cmd_tx.try_read() {
725 let _ = cmd_tx.send(HandlerCommand::UpdateInstrument(instrument));
726 }
727 }
728
729 pub fn get_instrument(&self, id: &InstrumentId) -> Option<InstrumentAny> {
733 self.instruments
734 .get(&id.symbol.inner())
735 .map(|e| e.value().clone())
736 }
737
738 pub fn get_instrument_by_symbol(&self, symbol: &Ustr) -> Option<InstrumentAny> {
740 self.instruments.get(symbol).map(|e| e.value().clone())
741 }
742
743 pub fn subscription_count(&self) -> usize {
745 self.subscriptions.len()
746 }
747
748 pub fn get_bar_type(&self, coin: &str, interval: &str) -> Option<BarType> {
752 let key = format!("candle:{coin}:{interval}");
754 self.bar_types.get(&key).map(|entry| *entry.value())
755 }
756
757 pub async fn next_event(&mut self) -> Option<NautilusWsMessage> {
761 if let Some(ref mut rx) = self.out_rx {
762 rx.recv().await
763 } else {
764 None
765 }
766 }
767}
768
769fn subscription_from_topic(topic: &str) -> anyhow::Result<SubscriptionRequest> {
771 let parts: Vec<&str> = topic.split(':').collect();
772
773 match parts.first() {
774 Some(&"allMids") => {
775 let dex = parts.get(1).map(|s| (*s).to_string());
776 Ok(SubscriptionRequest::AllMids { dex })
777 }
778 Some(&"notification") => Ok(SubscriptionRequest::Notification {
779 user: (*parts.get(1).context("Missing user")?).to_string(),
780 }),
781 Some(&"webData2") => Ok(SubscriptionRequest::WebData2 {
782 user: (*parts.get(1).context("Missing user")?).to_string(),
783 }),
784 Some(&"candle") => {
785 let coin = Ustr::from(parts.get(1).context("Missing coin")?);
786 let interval_str = parts.get(2).context("Missing interval")?;
787 let interval = HyperliquidBarInterval::from_str(interval_str)?;
788 Ok(SubscriptionRequest::Candle { coin, interval })
789 }
790 Some(&"l2Book") => Ok(SubscriptionRequest::L2Book {
791 coin: Ustr::from(parts.get(1).context("Missing coin")?),
792 mantissa: None,
793 n_sig_figs: None,
794 }),
795 Some(&"trades") => Ok(SubscriptionRequest::Trades {
796 coin: Ustr::from(parts.get(1).context("Missing coin")?),
797 }),
798 Some(&"orderUpdates") => Ok(SubscriptionRequest::OrderUpdates {
799 user: (*parts.get(1).context("Missing user")?).to_string(),
800 }),
801 Some(&"userEvents") => Ok(SubscriptionRequest::UserEvents {
802 user: (*parts.get(1).context("Missing user")?).to_string(),
803 }),
804 Some(&"userFills") => Ok(SubscriptionRequest::UserFills {
805 user: (*parts.get(1).context("Missing user")?).to_string(),
806 aggregate_by_time: None,
807 }),
808 Some(&"userFundings") => Ok(SubscriptionRequest::UserFundings {
809 user: (*parts.get(1).context("Missing user")?).to_string(),
810 }),
811 Some(&"userNonFundingLedgerUpdates") => {
812 Ok(SubscriptionRequest::UserNonFundingLedgerUpdates {
813 user: (*parts.get(1).context("Missing user")?).to_string(),
814 })
815 }
816 Some(&"activeAssetCtx") => Ok(SubscriptionRequest::ActiveAssetCtx {
817 coin: Ustr::from(parts.get(1).context("Missing coin")?),
818 }),
819 Some(&"activeSpotAssetCtx") => Ok(SubscriptionRequest::ActiveSpotAssetCtx {
820 coin: Ustr::from(parts.get(1).context("Missing coin")?),
821 }),
822 Some(&"activeAssetData") => Ok(SubscriptionRequest::ActiveAssetData {
823 user: (*parts.get(1).context("Missing user")?).to_string(),
824 coin: (*parts.get(2).context("Missing coin")?).to_string(),
825 }),
826 Some(&"userTwapSliceFills") => Ok(SubscriptionRequest::UserTwapSliceFills {
827 user: (*parts.get(1).context("Missing user")?).to_string(),
828 }),
829 Some(&"userTwapHistory") => Ok(SubscriptionRequest::UserTwapHistory {
830 user: (*parts.get(1).context("Missing user")?).to_string(),
831 }),
832 Some(&"bbo") => Ok(SubscriptionRequest::Bbo {
833 coin: Ustr::from(parts.get(1).context("Missing coin")?),
834 }),
835 Some(channel) => anyhow::bail!("Unknown subscription channel: {channel}"),
836 None => anyhow::bail!("Empty topic string"),
837 }
838}
839
840#[cfg(test)]
845mod tests {
846 use rstest::rstest;
847
848 use super::*;
849 use crate::common::enums::HyperliquidBarInterval;
850
851 fn subscription_topic(sub: &SubscriptionRequest) -> String {
853 match sub {
854 SubscriptionRequest::AllMids { dex } => {
855 if let Some(dex) = dex {
856 format!("allMids:{dex}")
857 } else {
858 "allMids".to_string()
859 }
860 }
861 SubscriptionRequest::Notification { user } => format!("notification:{user}"),
862 SubscriptionRequest::WebData2 { user } => format!("webData2:{user}"),
863 SubscriptionRequest::Candle { coin, interval } => {
864 format!("candle:{coin}:{}", interval.as_str())
865 }
866 SubscriptionRequest::L2Book { coin, .. } => format!("l2Book:{coin}"),
867 SubscriptionRequest::Trades { coin } => format!("trades:{coin}"),
868 SubscriptionRequest::OrderUpdates { user } => format!("orderUpdates:{user}"),
869 SubscriptionRequest::UserEvents { user } => format!("userEvents:{user}"),
870 SubscriptionRequest::UserFills { user, .. } => format!("userFills:{user}"),
871 SubscriptionRequest::UserFundings { user } => format!("userFundings:{user}"),
872 SubscriptionRequest::UserNonFundingLedgerUpdates { user } => {
873 format!("userNonFundingLedgerUpdates:{user}")
874 }
875 SubscriptionRequest::ActiveAssetCtx { coin } => format!("activeAssetCtx:{coin}"),
876 SubscriptionRequest::ActiveSpotAssetCtx { coin } => {
877 format!("activeSpotAssetCtx:{coin}")
878 }
879 SubscriptionRequest::ActiveAssetData { user, coin } => {
880 format!("activeAssetData:{user}:{coin}")
881 }
882 SubscriptionRequest::UserTwapSliceFills { user } => {
883 format!("userTwapSliceFills:{user}")
884 }
885 SubscriptionRequest::UserTwapHistory { user } => format!("userTwapHistory:{user}"),
886 SubscriptionRequest::Bbo { coin } => format!("bbo:{coin}"),
887 }
888 }
889
890 #[rstest]
891 #[case(SubscriptionRequest::Trades { coin: "BTC".into() }, "trades:BTC")]
892 #[case(SubscriptionRequest::Bbo { coin: "BTC".into() }, "bbo:BTC")]
893 #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() }, "orderUpdates:0x123")]
894 #[case(SubscriptionRequest::UserEvents { user: "0xabc".to_string() }, "userEvents:0xabc")]
895 fn test_subscription_topic_generation(
896 #[case] subscription: SubscriptionRequest,
897 #[case] expected_topic: &str,
898 ) {
899 assert_eq!(subscription_topic(&subscription), expected_topic);
900 }
901
902 #[rstest]
903 fn test_subscription_topics_unique() {
904 let sub1 = SubscriptionRequest::Trades { coin: "BTC".into() };
905 let sub2 = SubscriptionRequest::Bbo { coin: "BTC".into() };
906
907 let topic1 = subscription_topic(&sub1);
908 let topic2 = subscription_topic(&sub2);
909
910 assert_ne!(topic1, topic2);
911 }
912
913 #[rstest]
914 #[case(SubscriptionRequest::Trades { coin: "BTC".into() })]
915 #[case(SubscriptionRequest::Bbo { coin: "ETH".into() })]
916 #[case(SubscriptionRequest::Candle { coin: "SOL".into(), interval: HyperliquidBarInterval::OneHour })]
917 #[case(SubscriptionRequest::OrderUpdates { user: "0x123".to_string() })]
918 fn test_subscription_reconstruction(#[case] subscription: SubscriptionRequest) {
919 let topic = subscription_topic(&subscription);
920 let reconstructed = subscription_from_topic(&topic).expect("Failed to reconstruct");
921 assert_eq!(subscription_topic(&reconstructed), topic);
922 }
923
924 #[rstest]
925 fn test_subscription_topic_candle() {
926 let sub = SubscriptionRequest::Candle {
927 coin: "BTC".into(),
928 interval: HyperliquidBarInterval::OneHour,
929 };
930
931 let topic = subscription_topic(&sub);
932 assert_eq!(topic, "candle:BTC:1h");
933 }
934}