hyperliquid_ws_data/
ws_data.rs1use std::{env, error::Error, time::Duration};
17
18use nautilus_hyperliquid::{
19 common::consts::ws_url,
20 websocket::{
21 client::HyperliquidWebSocketInnerClient,
22 messages::{HyperliquidWsMessage, SubscriptionRequest},
23 },
24};
25use tokio::{pin, signal, time::sleep};
26use tracing::level_filters::LevelFilter;
27use ustr::Ustr;
28
29#[tokio::main]
30async fn main() -> Result<(), Box<dyn Error>> {
31 let log_level = env::var("LOG_LEVEL")
32 .unwrap_or_else(|_| "INFO".to_string())
33 .parse::<LevelFilter>()
34 .unwrap_or(LevelFilter::INFO);
35
36 tracing_subscriber::fmt()
37 .with_max_level(log_level)
38 .with_target(false)
39 .compact()
40 .init();
41
42 let args: Vec<String> = env::args().collect();
43 let subscription_type = args.get(1).map_or("all", String::as_str);
44 let symbol = args.get(2).map_or("BTC", String::as_str);
45 let testnet = args.get(3).is_some_and(|s| s == "testnet");
46
47 tracing::info!("Starting Hyperliquid WebSocket test");
48 tracing::info!("Subscription type: {subscription_type}");
49 tracing::info!("Symbol: {symbol}");
50 tracing::info!("Network: {}", if testnet { "testnet" } else { "mainnet" });
51
52 let url = ws_url(testnet);
53 let mut client = HyperliquidWebSocketInnerClient::connect(url).await?;
54
55 sleep(Duration::from_millis(500)).await;
57
58 let coin = Ustr::from(symbol);
59 tracing::info!("Using symbol: {symbol}");
60
61 match subscription_type {
62 "trades" => {
63 tracing::info!("Subscribing to trades for {symbol}");
64 let subscription = SubscriptionRequest::Trades { coin };
65 client.ws_subscribe(subscription).await?;
66 }
67 "book" | "l2book" | "orderbook" => {
68 tracing::info!("Subscribing to L2 book for {symbol}");
69 let subscription = SubscriptionRequest::L2Book {
70 coin,
71 n_sig_figs: None,
72 mantissa: None,
73 };
74 client.ws_subscribe(subscription).await?;
75 }
76 "candles" | "klines" => {
77 tracing::info!("Subscribing to candles for {symbol}");
78 let subscription = SubscriptionRequest::Candle {
79 coin,
80 interval: "1m".to_string(),
81 };
82 client.ws_subscribe(subscription).await?;
83 }
84 "allmids" => {
85 tracing::info!("Subscribing to all mids");
86 let subscription = SubscriptionRequest::AllMids { dex: None };
87 client.ws_subscribe(subscription).await?;
88 }
89 "bbo" => {
90 tracing::info!("Subscribing to best bid/offer for {symbol}");
91 let subscription = SubscriptionRequest::Bbo { coin };
92 client.ws_subscribe(subscription).await?;
93 }
94 "all" => {
95 tracing::info!("Subscribing to all available data types for {symbol}");
96
97 tracing::info!("- Subscribing to trades");
98 let subscription = SubscriptionRequest::Trades { coin };
99 if let Err(e) = client.ws_subscribe(subscription).await {
100 tracing::error!("Failed to subscribe to trades: {e}");
101 } else {
102 tracing::info!(" ✓ Trades subscription successful");
103 }
104
105 sleep(Duration::from_millis(100)).await;
106
107 tracing::info!("- Subscribing to L2 book");
108 let subscription = SubscriptionRequest::L2Book {
109 coin,
110 n_sig_figs: None,
111 mantissa: None,
112 };
113 if let Err(e) = client.ws_subscribe(subscription).await {
114 tracing::error!("Failed to subscribe to L2 book: {e}");
115 } else {
116 tracing::info!(" ✓ L2 book subscription successful");
117 }
118
119 sleep(Duration::from_millis(100)).await;
120
121 tracing::info!("- Subscribing to best bid/offer");
122 let subscription = SubscriptionRequest::Bbo { coin };
123 if let Err(e) = client.ws_subscribe(subscription).await {
124 tracing::error!("Failed to subscribe to BBO: {e}");
125 } else {
126 tracing::info!(" ✓ BBO subscription successful");
127 }
128
129 sleep(Duration::from_millis(100)).await;
130
131 tracing::info!("- Subscribing to candles");
132 let subscription = SubscriptionRequest::Candle {
133 coin,
134 interval: "1m".to_string(),
135 };
136 if let Err(e) = client.ws_subscribe(subscription).await {
137 tracing::error!("Failed to subscribe to candles: {e}");
138 } else {
139 tracing::info!(" ✓ Candles subscription successful");
140 }
141 }
142 _ => {
143 tracing::error!("Unknown subscription type: {subscription_type}");
144 tracing::info!("Available types: trades, book, candles, allmids, bbo, all");
145 tracing::info!("Usage: {} <subscription_type> <symbol> [testnet]", args[0]);
146 tracing::info!("Example: {} trades BTC testnet", args[0]);
147 return Ok(());
148 }
149 }
150
151 tracing::info!("Subscriptions completed, waiting for data...");
152 tracing::info!("Press CTRL+C to stop");
153
154 let sigint = signal::ctrl_c();
156 pin!(sigint);
157
158 let mut should_close = false;
160 let mut message_count = 0u64;
161
162 loop {
163 tokio::select! {
164 event = client.ws_next_event() => {
165 match event {
166 Some(HyperliquidWsMessage::Trades { data }) => {
167 message_count += 1;
168 tracing::info!(
169 "[Message #{message_count}] Trade update: {} trades",
170 data.len()
171 );
172 for trade in &data {
173 tracing::debug!(
174 coin = %trade.coin,
175 side = %trade.side,
176 px = %trade.px,
177 sz = %trade.sz,
178 time = trade.time,
179 tid = trade.tid,
180 "trade"
181 );
182 }
183 }
184 Some(HyperliquidWsMessage::L2Book { data }) => {
185 message_count += 1;
186 tracing::info!(
187 "[Message #{message_count}] L2 book update: coin={}, levels={}",
188 data.coin,
189 data.levels.len()
190 );
191 tracing::debug!(
192 coin = %data.coin,
193 time = data.time,
194 bids = data.levels[0].len(),
195 asks = data.levels[1].len(),
196 "L2 book"
197 );
198 }
199 Some(HyperliquidWsMessage::Bbo { data }) => {
200 message_count += 1;
201 tracing::info!(
202 "[Message #{message_count}] BBO update: coin={}",
203 data.coin
204 );
205 let bid = data.bbo[0].as_ref().map(|l| l.px.as_str()).unwrap_or("None");
206 let ask = data.bbo[1].as_ref().map(|l| l.px.as_str()).unwrap_or("None");
207 tracing::debug!(
208 coin = %data.coin,
209 bid = bid,
210 ask = ask,
211 time = data.time,
212 "BBO"
213 );
214 }
215 Some(HyperliquidWsMessage::AllMids { data }) => {
216 message_count += 1;
217 tracing::info!(
218 "[Message #{message_count}] All mids update: {} coins",
219 data.mids.len()
220 );
221 for (coin, mid) in &data.mids {
222 tracing::debug!(coin = %coin, mid = %mid, "mid price");
223 }
224 }
225 Some(HyperliquidWsMessage::Candle { data }) => {
226 message_count += 1;
227 tracing::info!("[Message #{message_count}] Candle update");
228 tracing::debug!(
229 symbol = %data.s,
230 interval = %data.i,
231 time = data.t,
232 open = %data.o,
233 high = %data.h,
234 low = %data.l,
235 close = %data.c,
236 volume = %data.v,
237 trades = data.n,
238 "candle data"
239 );
240 }
241 Some(HyperliquidWsMessage::SubscriptionResponse { data: sub_data }) => {
242 tracing::info!(
243 "Subscription response received: method={}, type={:?}",
244 sub_data.method,
245 sub_data.subscription
246 );
247 }
248 Some(HyperliquidWsMessage::Pong) => {
249 tracing::trace!("Received pong");
250 }
251 Some(event) => {
252 message_count += 1;
253 tracing::debug!("[Message #{message_count}] Other message: {event:?}");
254 }
255 None => {
256 tracing::warn!("WebSocket stream ended unexpectedly");
257 break;
258 }
259 }
260 }
261 _ = &mut sigint => {
262 tracing::info!("Received CTRL+C, closing connection...");
263 should_close = true;
264 break;
265 }
266 }
267 }
268
269 if should_close {
270 tracing::info!("Total messages received: {message_count}");
271 client.ws_disconnect().await?;
272 tracing::info!("Connection closed successfully");
273 }
274
275 Ok(())
276}