hyperliquid_ws_data/
ws_data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{env, error::Error, time::Duration};
17
18use nautilus_hyperliquid::{
19    common::consts::ws_url,
20    websocket::{
21        client::HyperliquidWebSocketInnerClient,
22        messages::{HyperliquidWsMessage, SubscriptionRequest},
23    },
24};
25use tokio::{pin, signal, time::sleep};
26use tracing::level_filters::LevelFilter;
27use ustr::Ustr;
28
29#[tokio::main]
30async fn main() -> Result<(), Box<dyn Error>> {
31    let log_level = env::var("LOG_LEVEL")
32        .unwrap_or_else(|_| "INFO".to_string())
33        .parse::<LevelFilter>()
34        .unwrap_or(LevelFilter::INFO);
35
36    tracing_subscriber::fmt()
37        .with_max_level(log_level)
38        .with_target(false)
39        .compact()
40        .init();
41
42    let args: Vec<String> = env::args().collect();
43    let subscription_type = args.get(1).map_or("all", String::as_str);
44    let symbol = args.get(2).map_or("BTC", String::as_str);
45    let testnet = args.get(3).is_some_and(|s| s == "testnet");
46
47    tracing::info!("Starting Hyperliquid WebSocket test");
48    tracing::info!("Subscription type: {subscription_type}");
49    tracing::info!("Symbol: {symbol}");
50    tracing::info!("Network: {}", if testnet { "testnet" } else { "mainnet" });
51
52    let url = ws_url(testnet);
53    let mut client = HyperliquidWebSocketInnerClient::connect(url).await?;
54
55    // Give the connection a moment to stabilize
56    sleep(Duration::from_millis(500)).await;
57
58    let coin = Ustr::from(symbol);
59    tracing::info!("Using symbol: {symbol}");
60
61    match subscription_type {
62        "trades" => {
63            tracing::info!("Subscribing to trades for {symbol}");
64            let subscription = SubscriptionRequest::Trades { coin };
65            client.ws_subscribe(subscription).await?;
66        }
67        "book" | "l2book" | "orderbook" => {
68            tracing::info!("Subscribing to L2 book for {symbol}");
69            let subscription = SubscriptionRequest::L2Book {
70                coin,
71                n_sig_figs: None,
72                mantissa: None,
73            };
74            client.ws_subscribe(subscription).await?;
75        }
76        "candles" | "klines" => {
77            tracing::info!("Subscribing to candles for {symbol}");
78            let subscription = SubscriptionRequest::Candle {
79                coin,
80                interval: "1m".to_string(),
81            };
82            client.ws_subscribe(subscription).await?;
83        }
84        "allmids" => {
85            tracing::info!("Subscribing to all mids");
86            let subscription = SubscriptionRequest::AllMids { dex: None };
87            client.ws_subscribe(subscription).await?;
88        }
89        "bbo" => {
90            tracing::info!("Subscribing to best bid/offer for {symbol}");
91            let subscription = SubscriptionRequest::Bbo { coin };
92            client.ws_subscribe(subscription).await?;
93        }
94        "all" => {
95            tracing::info!("Subscribing to all available data types for {symbol}");
96
97            tracing::info!("- Subscribing to trades");
98            let subscription = SubscriptionRequest::Trades { coin };
99            if let Err(e) = client.ws_subscribe(subscription).await {
100                tracing::error!("Failed to subscribe to trades: {e}");
101            } else {
102                tracing::info!("  ✓ Trades subscription successful");
103            }
104
105            sleep(Duration::from_millis(100)).await;
106
107            tracing::info!("- Subscribing to L2 book");
108            let subscription = SubscriptionRequest::L2Book {
109                coin,
110                n_sig_figs: None,
111                mantissa: None,
112            };
113            if let Err(e) = client.ws_subscribe(subscription).await {
114                tracing::error!("Failed to subscribe to L2 book: {e}");
115            } else {
116                tracing::info!("  ✓ L2 book subscription successful");
117            }
118
119            sleep(Duration::from_millis(100)).await;
120
121            tracing::info!("- Subscribing to best bid/offer");
122            let subscription = SubscriptionRequest::Bbo { coin };
123            if let Err(e) = client.ws_subscribe(subscription).await {
124                tracing::error!("Failed to subscribe to BBO: {e}");
125            } else {
126                tracing::info!("  ✓ BBO subscription successful");
127            }
128
129            sleep(Duration::from_millis(100)).await;
130
131            tracing::info!("- Subscribing to candles");
132            let subscription = SubscriptionRequest::Candle {
133                coin,
134                interval: "1m".to_string(),
135            };
136            if let Err(e) = client.ws_subscribe(subscription).await {
137                tracing::error!("Failed to subscribe to candles: {e}");
138            } else {
139                tracing::info!("  ✓ Candles subscription successful");
140            }
141        }
142        _ => {
143            tracing::error!("Unknown subscription type: {subscription_type}");
144            tracing::info!("Available types: trades, book, candles, allmids, bbo, all");
145            tracing::info!("Usage: {} <subscription_type> <symbol> [testnet]", args[0]);
146            tracing::info!("Example: {} trades BTC testnet", args[0]);
147            return Ok(());
148        }
149    }
150
151    tracing::info!("Subscriptions completed, waiting for data...");
152    tracing::info!("Press CTRL+C to stop");
153
154    // Create a future that completes on CTRL+C
155    let sigint = signal::ctrl_c();
156    pin!(sigint);
157
158    // Use a flag to track if we should close
159    let mut should_close = false;
160    let mut message_count = 0u64;
161
162    loop {
163        tokio::select! {
164            event = client.ws_next_event() => {
165                match event {
166                    Some(HyperliquidWsMessage::Trades { data }) => {
167                        message_count += 1;
168                        tracing::info!(
169                            "[Message #{message_count}] Trade update: {} trades",
170                            data.len()
171                        );
172                        for trade in &data {
173                            tracing::debug!(
174                                coin = %trade.coin,
175                                side = %trade.side,
176                                px = %trade.px,
177                                sz = %trade.sz,
178                                time = trade.time,
179                                tid = trade.tid,
180                                "trade"
181                            );
182                        }
183                    }
184                    Some(HyperliquidWsMessage::L2Book { data }) => {
185                        message_count += 1;
186                        tracing::info!(
187                            "[Message #{message_count}] L2 book update: coin={}, levels={}",
188                            data.coin,
189                            data.levels.len()
190                        );
191                        tracing::debug!(
192                            coin = %data.coin,
193                            time = data.time,
194                            bids = data.levels[0].len(),
195                            asks = data.levels[1].len(),
196                            "L2 book"
197                        );
198                    }
199                    Some(HyperliquidWsMessage::Bbo { data }) => {
200                        message_count += 1;
201                        tracing::info!(
202                            "[Message #{message_count}] BBO update: coin={}",
203                            data.coin
204                        );
205                        let bid = data.bbo[0].as_ref().map(|l| l.px.as_str()).unwrap_or("None");
206                        let ask = data.bbo[1].as_ref().map(|l| l.px.as_str()).unwrap_or("None");
207                        tracing::debug!(
208                            coin = %data.coin,
209                            bid = bid,
210                            ask = ask,
211                            time = data.time,
212                            "BBO"
213                        );
214                    }
215                    Some(HyperliquidWsMessage::AllMids { data }) => {
216                        message_count += 1;
217                        tracing::info!(
218                            "[Message #{message_count}] All mids update: {} coins",
219                            data.mids.len()
220                        );
221                        for (coin, mid) in &data.mids {
222                            tracing::debug!(coin = %coin, mid = %mid, "mid price");
223                        }
224                    }
225                    Some(HyperliquidWsMessage::Candle { data }) => {
226                        message_count += 1;
227                        tracing::info!("[Message #{message_count}] Candle update");
228                        tracing::debug!(
229                            symbol = %data.s,
230                            interval = %data.i,
231                            time = data.t,
232                            open = %data.o,
233                            high = %data.h,
234                            low = %data.l,
235                            close = %data.c,
236                            volume = %data.v,
237                            trades = data.n,
238                            "candle data"
239                        );
240                    }
241                    Some(HyperliquidWsMessage::SubscriptionResponse { data: sub_data }) => {
242                        tracing::info!(
243                            "Subscription response received: method={}, type={:?}",
244                            sub_data.method,
245                            sub_data.subscription
246                        );
247                    }
248                    Some(HyperliquidWsMessage::Pong) => {
249                        tracing::trace!("Received pong");
250                    }
251                    Some(event) => {
252                        message_count += 1;
253                        tracing::debug!("[Message #{message_count}] Other message: {event:?}");
254                    }
255                    None => {
256                        tracing::warn!("WebSocket stream ended unexpectedly");
257                        break;
258                    }
259                }
260            }
261            _ = &mut sigint => {
262                tracing::info!("Received CTRL+C, closing connection...");
263                should_close = true;
264                break;
265            }
266        }
267    }
268
269    if should_close {
270        tracing::info!("Total messages received: {message_count}");
271        client.ws_disconnect().await?;
272        tracing::info!("Connection closed successfully");
273    }
274
275    Ok(())
276}