bitmex_ws_data/
ws-data.rs1use std::env;
17
18use futures_util::StreamExt;
19use nautilus_bitmex::{http::client::BitmexHttpClient, websocket::client::BitmexWebSocketClient};
20use nautilus_model::{data::bar::BarType, identifiers::InstrumentId};
21use tokio::time::Duration;
22use tracing::level_filters::LevelFilter;
23
24#[tokio::main]
25async fn main() -> Result<(), Box<dyn std::error::Error>> {
26 let log_level = env::var("LOG_LEVEL")
27 .unwrap_or_else(|_| "INFO".to_string())
28 .parse::<LevelFilter>()
29 .unwrap_or(LevelFilter::INFO);
30
31 tracing_subscriber::fmt().with_max_level(log_level).init();
32
33 let args: Vec<String> = env::args().collect();
34 let subscription_type = args.get(1).map_or("all", String::as_str);
35 let symbol = args.get(2).map_or("XBTUSD", String::as_str);
36 let testnet = args.get(3).is_some_and(|s| s == "testnet");
37
38 tracing::info!("Starting Bitmex WebSocket test");
39 tracing::info!("Subscription type: {subscription_type}");
40 tracing::info!("Symbol: {symbol}");
41 tracing::info!("Testnet: {testnet}");
42
43 let (http_url, ws_url) = if testnet {
45 (
46 Some("https://testnet.bitmex.com".to_string()),
47 Some("wss://ws.testnet.bitmex.com/realtime".to_string()),
48 )
49 } else {
50 (None, None) };
52
53 tracing::info!("Fetching instruments from HTTP API...");
54 let http_client = BitmexHttpClient::new(
55 http_url, None, None, testnet, Some(60), None, None, None, )
64 .expect("Failed to create HTTP client");
65
66 let instruments = http_client
67 .request_instruments(true) .await?;
69
70 tracing::info!("Fetched {} instruments", instruments.len());
71
72 let mut ws_client = BitmexWebSocketClient::new(
74 ws_url, None, None, None, Some(5), )
80 .unwrap();
81 ws_client.initialize_instruments_cache(instruments);
82 ws_client.connect().await?;
83
84 tokio::time::sleep(Duration::from_millis(500)).await;
86
87 let instrument_id = InstrumentId::from(format!("{symbol}.BITMEX").as_str());
88 tracing::info!("Using instrument_id: {instrument_id}");
89
90 match subscription_type {
91 "quotes" => {
92 tracing::info!("Subscribing to quotes for {instrument_id}");
93 ws_client.subscribe_quotes(instrument_id).await?;
94 }
95 "trades" => {
96 tracing::info!("Subscribing to trades for {instrument_id}");
97 ws_client.subscribe_trades(instrument_id).await?;
98 }
99 "orderbook" | "book" => {
100 tracing::info!("Subscribing to order book L2 for {instrument_id}");
101 ws_client.subscribe_book(instrument_id).await?;
102 }
103 "orderbook25" | "book25" => {
104 tracing::info!("Subscribing to order book L2_25 for {instrument_id}");
105 ws_client.subscribe_book_25(instrument_id).await?;
106 }
107 "depth10" | "book10" => {
108 tracing::info!("Subscribing to order book depth 10 for {instrument_id}");
109 ws_client.subscribe_book_depth10(instrument_id).await?;
110 }
111 "bars" => {
112 let bar_type =
113 BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
114 tracing::info!("Subscribing to bars: {bar_type}");
115 ws_client.subscribe_bars(bar_type).await?;
116 }
117 "funding" => {
118 tracing::info!("Subscribing to funding rates");
119 tracing::warn!("Funding rate subscription may not be implemented yet");
121 }
122 "liquidation" => {
123 tracing::info!("Subscribing to liquidations");
124 tracing::warn!("Liquidation subscription may not be implemented yet");
126 }
127 "all" => {
128 tracing::info!("Subscribing to all available data types for {instrument_id}",);
129
130 tracing::info!("- Subscribing to quotes");
131 if let Err(e) = ws_client.subscribe_quotes(instrument_id).await {
132 tracing::error!("Failed to subscribe to quotes: {e}");
133 } else {
134 tracing::info!(" ✓ Quotes subscription successful");
135 }
136
137 tokio::time::sleep(Duration::from_millis(100)).await;
138
139 tracing::info!("- Subscribing to trades");
140 if let Err(e) = ws_client.subscribe_trades(instrument_id).await {
141 tracing::error!("Failed to subscribe to trades: {e}");
142 } else {
143 tracing::info!(" ✓ Trades subscription successful");
144 }
145
146 tokio::time::sleep(Duration::from_millis(100)).await;
147
148 tracing::info!("- Subscribing to order book L2");
149 if let Err(e) = ws_client.subscribe_book(instrument_id).await {
150 tracing::error!("Failed to subscribe to order book: {e}");
151 } else {
152 tracing::info!(" ✓ Order book L2 subscription successful");
153 }
154
155 tokio::time::sleep(Duration::from_millis(100)).await;
156
157 tracing::info!("- Subscribing to order book L2_25");
158 if let Err(e) = ws_client.subscribe_book_25(instrument_id).await {
159 tracing::error!("Failed to subscribe to order book 25: {e}");
160 } else {
161 tracing::info!(" ✓ Order book L2_25 subscription successful");
162 }
163
164 tokio::time::sleep(Duration::from_millis(100)).await;
165
166 tracing::info!("- Subscribing to order book depth 10");
167 if let Err(e) = ws_client.subscribe_book_depth10(instrument_id).await {
168 tracing::error!("Failed to subscribe to depth 10: {e}");
169 } else {
170 tracing::info!(" ✓ Order book depth 10 subscription successful");
171 }
172
173 tokio::time::sleep(Duration::from_millis(100)).await;
174
175 let bar_type =
176 BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
177 tracing::info!("- Subscribing to bars: {bar_type}");
178 if let Err(e) = ws_client.subscribe_bars(bar_type).await {
179 tracing::error!("Failed to subscribe to bars: {e}");
180 } else {
181 tracing::info!(" ✓ Bars subscription successful");
182 }
183 }
184 _ => {
185 tracing::error!("Unknown subscription type: {subscription_type}");
186 tracing::info!(
187 "Available types: quotes, trades, orderbook, orderbook25, depth10, bars, funding, liquidation, all"
188 );
189 return Ok(());
190 }
191 }
192
193 tracing::info!("Subscriptions completed, waiting for data...");
194 tracing::info!("Press CTRL+C to stop");
195
196 let sigint = tokio::signal::ctrl_c();
198 tokio::pin!(sigint);
199
200 let stream = ws_client.stream();
201 tokio::pin!(stream); let mut should_close = false;
205 let mut message_count = 0u64;
206
207 loop {
208 tokio::select! {
209 Some(msg) = stream.next() => {
210 message_count += 1;
211 tracing::info!("[Message #{message_count}] {msg:?}");
212 }
213 _ = &mut sigint => {
214 tracing::info!("Received SIGINT, closing connection...");
215 should_close = true;
216 break;
217 }
218 else => {
219 tracing::warn!("Stream ended unexpectedly");
220 break;
221 }
222 }
223 }
224
225 if should_close {
226 tracing::info!("Total messages received: {message_count}");
227 ws_client.close().await?;
228 tracing::info!("Connection closed successfully");
229 }
230
231 Ok(())
232}