bitmex_ws_data/
ws_data.rs1use std::env;
17
18use futures_util::StreamExt;
19use nautilus_bitmex::{http::client::BitmexHttpClient, websocket::client::BitmexWebSocketClient};
20use nautilus_model::{data::bar::BarType, identifiers::InstrumentId};
21use tokio::time::Duration;
22use tracing::level_filters::LevelFilter;
23
24#[tokio::main]
25async fn main() -> Result<(), Box<dyn std::error::Error>> {
26 let log_level = env::var("LOG_LEVEL")
27 .unwrap_or_else(|_| "INFO".to_string())
28 .parse::<LevelFilter>()
29 .unwrap_or(LevelFilter::INFO);
30
31 tracing_subscriber::fmt().with_max_level(log_level).init();
32
33 let args: Vec<String> = env::args().collect();
34 let subscription_type = args.get(1).map_or("all", String::as_str);
35 let symbol = args.get(2).map_or("XBTUSD", String::as_str);
36 let testnet = args.get(3).is_some_and(|s| s == "testnet");
37
38 tracing::info!("Starting Bitmex WebSocket test");
39 tracing::info!("Subscription type: {subscription_type}");
40 tracing::info!("Symbol: {symbol}");
41 tracing::info!("Testnet: {testnet}");
42
43 let (http_url, ws_url) = if testnet {
45 (
46 Some("https://testnet.bitmex.com".to_string()),
47 Some("wss://ws.testnet.bitmex.com/realtime".to_string()),
48 )
49 } else {
50 (None, None) };
52
53 tracing::info!("Fetching instruments from HTTP API...");
54 let http_client = BitmexHttpClient::new(
55 http_url, None, None, testnet, Some(60), None, None, None, None, None, None, None, )
68 .expect("Failed to create HTTP client");
69
70 let instruments = http_client
71 .request_instruments(true) .await?;
73
74 tracing::info!("Fetched {} instruments", instruments.len());
75
76 let mut ws_client = BitmexWebSocketClient::new(
78 ws_url, None, None, None, Some(5), )
84 .unwrap();
85 ws_client.cache_instruments(instruments);
86 ws_client.connect().await?;
87
88 tokio::time::sleep(Duration::from_millis(500)).await;
90
91 let instrument_id = InstrumentId::from(format!("{symbol}.BITMEX").as_str());
92 tracing::info!("Using instrument_id: {instrument_id}");
93
94 match subscription_type {
95 "quotes" => {
96 tracing::info!("Subscribing to quotes for {instrument_id}");
97 ws_client.subscribe_quotes(instrument_id).await?;
98 }
99 "trades" => {
100 tracing::info!("Subscribing to trades for {instrument_id}");
101 ws_client.subscribe_trades(instrument_id).await?;
102 }
103 "orderbook" | "book" => {
104 tracing::info!("Subscribing to order book L2 for {instrument_id}");
105 ws_client.subscribe_book(instrument_id).await?;
106 }
107 "orderbook25" | "book25" => {
108 tracing::info!("Subscribing to order book L2_25 for {instrument_id}");
109 ws_client.subscribe_book_25(instrument_id).await?;
110 }
111 "depth10" | "book10" => {
112 tracing::info!("Subscribing to order book depth 10 for {instrument_id}");
113 ws_client.subscribe_book_depth10(instrument_id).await?;
114 }
115 "bars" => {
116 let bar_type =
117 BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
118 tracing::info!("Subscribing to bars: {bar_type}");
119 ws_client.subscribe_bars(bar_type).await?;
120 }
121 "funding" => {
122 tracing::info!("Subscribing to funding rates");
123 tracing::warn!("Funding rate subscription may not be implemented yet");
125 }
126 "liquidation" => {
127 tracing::info!("Subscribing to liquidations");
128 tracing::warn!("Liquidation subscription may not be implemented yet");
130 }
131 "all" => {
132 tracing::info!("Subscribing to all available data types for {instrument_id}",);
133
134 tracing::info!("- Subscribing to quotes");
135 if let Err(e) = ws_client.subscribe_quotes(instrument_id).await {
136 tracing::error!("Failed to subscribe to quotes: {e}");
137 } else {
138 tracing::info!(" ✓ Quotes subscription successful");
139 }
140
141 tokio::time::sleep(Duration::from_millis(100)).await;
142
143 tracing::info!("- Subscribing to trades");
144 if let Err(e) = ws_client.subscribe_trades(instrument_id).await {
145 tracing::error!("Failed to subscribe to trades: {e}");
146 } else {
147 tracing::info!(" ✓ Trades subscription successful");
148 }
149
150 tokio::time::sleep(Duration::from_millis(100)).await;
151
152 tracing::info!("- Subscribing to order book L2");
153 if let Err(e) = ws_client.subscribe_book(instrument_id).await {
154 tracing::error!("Failed to subscribe to order book: {e}");
155 } else {
156 tracing::info!(" ✓ Order book L2 subscription successful");
157 }
158
159 tokio::time::sleep(Duration::from_millis(100)).await;
160
161 tracing::info!("- Subscribing to order book L2_25");
162 if let Err(e) = ws_client.subscribe_book_25(instrument_id).await {
163 tracing::error!("Failed to subscribe to order book 25: {e}");
164 } else {
165 tracing::info!(" ✓ Order book L2_25 subscription successful");
166 }
167
168 tokio::time::sleep(Duration::from_millis(100)).await;
169
170 tracing::info!("- Subscribing to order book depth 10");
171 if let Err(e) = ws_client.subscribe_book_depth10(instrument_id).await {
172 tracing::error!("Failed to subscribe to depth 10: {e}");
173 } else {
174 tracing::info!(" ✓ Order book depth 10 subscription successful");
175 }
176
177 tokio::time::sleep(Duration::from_millis(100)).await;
178
179 let bar_type =
180 BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
181 tracing::info!("- Subscribing to bars: {bar_type}");
182 if let Err(e) = ws_client.subscribe_bars(bar_type).await {
183 tracing::error!("Failed to subscribe to bars: {e}");
184 } else {
185 tracing::info!(" ✓ Bars subscription successful");
186 }
187 }
188 _ => {
189 tracing::error!("Unknown subscription type: {subscription_type}");
190 tracing::info!(
191 "Available types: quotes, trades, orderbook, orderbook25, depth10, bars, funding, liquidation, all"
192 );
193 return Ok(());
194 }
195 }
196
197 tracing::info!("Subscriptions completed, waiting for data...");
198 tracing::info!("Press CTRL+C to stop");
199
200 let sigint = tokio::signal::ctrl_c();
202 tokio::pin!(sigint);
203
204 let stream = ws_client.stream();
205 tokio::pin!(stream); let mut should_close = false;
209 let mut message_count = 0u64;
210
211 loop {
212 tokio::select! {
213 Some(msg) = stream.next() => {
214 message_count += 1;
215 tracing::info!("[Message #{message_count}] {msg:?}");
216 }
217 _ = &mut sigint => {
218 tracing::info!("Received SIGINT, closing connection...");
219 should_close = true;
220 break;
221 }
222 else => {
223 tracing::warn!("Stream ended unexpectedly");
224 break;
225 }
226 }
227 }
228
229 if should_close {
230 tracing::info!("Total messages received: {message_count}");
231 ws_client.close().await?;
232 tracing::info!("Connection closed successfully");
233 }
234
235 Ok(())
236}