bitmex_ws_data/
ws_data.rs1use std::env;
17
18use futures_util::StreamExt;
19use nautilus_bitmex::{http::client::BitmexHttpClient, websocket::client::BitmexWebSocketClient};
20use nautilus_model::{data::bar::BarType, identifiers::InstrumentId};
21use tokio::time::Duration;
22use tracing::level_filters::LevelFilter;
23
24#[tokio::main]
25async fn main() -> Result<(), Box<dyn std::error::Error>> {
26 let log_level = env::var("LOG_LEVEL")
27 .unwrap_or_else(|_| "INFO".to_string())
28 .parse::<LevelFilter>()
29 .unwrap_or(LevelFilter::INFO);
30
31 tracing_subscriber::fmt().with_max_level(log_level).init();
32
33 let args: Vec<String> = env::args().collect();
34 let subscription_type = args.get(1).map_or("all", String::as_str);
35 let symbol = args.get(2).map_or("XBTUSD", String::as_str);
36 let testnet = args.get(3).is_some_and(|s| s == "testnet");
37
38 tracing::info!("Starting Bitmex WebSocket test");
39 tracing::info!("Subscription type: {subscription_type}");
40 tracing::info!("Symbol: {symbol}");
41 tracing::info!("Testnet: {testnet}");
42
43 let (http_url, ws_url) = if testnet {
45 (
46 Some("https://testnet.bitmex.com".to_string()),
47 Some("wss://ws.testnet.bitmex.com/realtime".to_string()),
48 )
49 } else {
50 (None, None) };
52
53 tracing::info!("Fetching instruments from HTTP API...");
54 let http_client = BitmexHttpClient::new(
55 http_url, None, None, testnet, Some(60), None, None, None, None, None, None, )
67 .expect("Failed to create HTTP client");
68
69 let instruments = http_client
70 .request_instruments(true) .await?;
72
73 tracing::info!("Fetched {} instruments", instruments.len());
74
75 let mut ws_client = BitmexWebSocketClient::new(
77 ws_url, None, None, None, Some(5), )
83 .unwrap();
84 ws_client.initialize_instruments_cache(instruments);
85 ws_client.connect().await?;
86
87 tokio::time::sleep(Duration::from_millis(500)).await;
89
90 let instrument_id = InstrumentId::from(format!("{symbol}.BITMEX").as_str());
91 tracing::info!("Using instrument_id: {instrument_id}");
92
93 match subscription_type {
94 "quotes" => {
95 tracing::info!("Subscribing to quotes for {instrument_id}");
96 ws_client.subscribe_quotes(instrument_id).await?;
97 }
98 "trades" => {
99 tracing::info!("Subscribing to trades for {instrument_id}");
100 ws_client.subscribe_trades(instrument_id).await?;
101 }
102 "orderbook" | "book" => {
103 tracing::info!("Subscribing to order book L2 for {instrument_id}");
104 ws_client.subscribe_book(instrument_id).await?;
105 }
106 "orderbook25" | "book25" => {
107 tracing::info!("Subscribing to order book L2_25 for {instrument_id}");
108 ws_client.subscribe_book_25(instrument_id).await?;
109 }
110 "depth10" | "book10" => {
111 tracing::info!("Subscribing to order book depth 10 for {instrument_id}");
112 ws_client.subscribe_book_depth10(instrument_id).await?;
113 }
114 "bars" => {
115 let bar_type =
116 BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
117 tracing::info!("Subscribing to bars: {bar_type}");
118 ws_client.subscribe_bars(bar_type).await?;
119 }
120 "funding" => {
121 tracing::info!("Subscribing to funding rates");
122 tracing::warn!("Funding rate subscription may not be implemented yet");
124 }
125 "liquidation" => {
126 tracing::info!("Subscribing to liquidations");
127 tracing::warn!("Liquidation subscription may not be implemented yet");
129 }
130 "all" => {
131 tracing::info!("Subscribing to all available data types for {instrument_id}",);
132
133 tracing::info!("- Subscribing to quotes");
134 if let Err(e) = ws_client.subscribe_quotes(instrument_id).await {
135 tracing::error!("Failed to subscribe to quotes: {e}");
136 } else {
137 tracing::info!(" ✓ Quotes subscription successful");
138 }
139
140 tokio::time::sleep(Duration::from_millis(100)).await;
141
142 tracing::info!("- Subscribing to trades");
143 if let Err(e) = ws_client.subscribe_trades(instrument_id).await {
144 tracing::error!("Failed to subscribe to trades: {e}");
145 } else {
146 tracing::info!(" ✓ Trades subscription successful");
147 }
148
149 tokio::time::sleep(Duration::from_millis(100)).await;
150
151 tracing::info!("- Subscribing to order book L2");
152 if let Err(e) = ws_client.subscribe_book(instrument_id).await {
153 tracing::error!("Failed to subscribe to order book: {e}");
154 } else {
155 tracing::info!(" ✓ Order book L2 subscription successful");
156 }
157
158 tokio::time::sleep(Duration::from_millis(100)).await;
159
160 tracing::info!("- Subscribing to order book L2_25");
161 if let Err(e) = ws_client.subscribe_book_25(instrument_id).await {
162 tracing::error!("Failed to subscribe to order book 25: {e}");
163 } else {
164 tracing::info!(" ✓ Order book L2_25 subscription successful");
165 }
166
167 tokio::time::sleep(Duration::from_millis(100)).await;
168
169 tracing::info!("- Subscribing to order book depth 10");
170 if let Err(e) = ws_client.subscribe_book_depth10(instrument_id).await {
171 tracing::error!("Failed to subscribe to depth 10: {e}");
172 } else {
173 tracing::info!(" ✓ Order book depth 10 subscription successful");
174 }
175
176 tokio::time::sleep(Duration::from_millis(100)).await;
177
178 let bar_type =
179 BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
180 tracing::info!("- Subscribing to bars: {bar_type}");
181 if let Err(e) = ws_client.subscribe_bars(bar_type).await {
182 tracing::error!("Failed to subscribe to bars: {e}");
183 } else {
184 tracing::info!(" ✓ Bars subscription successful");
185 }
186 }
187 _ => {
188 tracing::error!("Unknown subscription type: {subscription_type}");
189 tracing::info!(
190 "Available types: quotes, trades, orderbook, orderbook25, depth10, bars, funding, liquidation, all"
191 );
192 return Ok(());
193 }
194 }
195
196 tracing::info!("Subscriptions completed, waiting for data...");
197 tracing::info!("Press CTRL+C to stop");
198
199 let sigint = tokio::signal::ctrl_c();
201 tokio::pin!(sigint);
202
203 let stream = ws_client.stream();
204 tokio::pin!(stream); let mut should_close = false;
208 let mut message_count = 0u64;
209
210 loop {
211 tokio::select! {
212 Some(msg) = stream.next() => {
213 message_count += 1;
214 tracing::info!("[Message #{message_count}] {msg:?}");
215 }
216 _ = &mut sigint => {
217 tracing::info!("Received SIGINT, closing connection...");
218 should_close = true;
219 break;
220 }
221 else => {
222 tracing::warn!("Stream ended unexpectedly");
223 break;
224 }
225 }
226 }
227
228 if should_close {
229 tracing::info!("Total messages received: {message_count}");
230 ws_client.close().await?;
231 tracing::info!("Connection closed successfully");
232 }
233
234 Ok(())
235}