bitmex_ws_data/
ws-data.rs1use std::env;
17
18use futures_util::StreamExt;
19use nautilus_bitmex::{http::client::BitmexHttpClient, websocket::client::BitmexWebSocketClient};
20use nautilus_model::{data::bar::BarType, identifiers::InstrumentId};
21use tokio::time::Duration;
22use tracing::level_filters::LevelFilter;
23
24#[tokio::main]
25async fn main() -> Result<(), Box<dyn std::error::Error>> {
26 let log_level = env::var("LOG_LEVEL")
27 .unwrap_or_else(|_| "INFO".to_string())
28 .parse::<LevelFilter>()
29 .unwrap_or(LevelFilter::INFO);
30
31 tracing_subscriber::fmt().with_max_level(log_level).init();
32
33 let args: Vec<String> = env::args().collect();
34 let subscription_type = args.get(1).map_or("all", String::as_str);
35 let symbol = args.get(2).map_or("XBTUSD", String::as_str);
36 let testnet = args.get(3).is_some_and(|s| s == "testnet");
37
38 tracing::info!("Starting Bitmex WebSocket test");
39 tracing::info!("Subscription type: {subscription_type}");
40 tracing::info!("Symbol: {symbol}");
41 tracing::info!("Testnet: {testnet}");
42
43 let (http_url, ws_url) = if testnet {
45 (
46 Some("https://testnet.bitmex.com".to_string()),
47 Some("wss://ws.testnet.bitmex.com/realtime".to_string()),
48 )
49 } else {
50 (None, None) };
52
53 tracing::info!("Fetching instruments from HTTP API...");
54 let http_client = BitmexHttpClient::new(
55 http_url, None, None, testnet, Some(60), );
61
62 let instruments = http_client
63 .request_instruments(true) .await?;
65
66 tracing::info!("Fetched {} instruments", instruments.len());
67
68 let mut ws_client = BitmexWebSocketClient::new(
70 ws_url, None, None, None, Some(5), )
76 .unwrap();
77 ws_client.initialize_instruments_cache(instruments);
78 ws_client.connect().await?;
79
80 tokio::time::sleep(Duration::from_millis(500)).await;
82
83 let instrument_id = InstrumentId::from(format!("{symbol}.BITMEX").as_str());
84 tracing::info!("Using instrument_id: {instrument_id}");
85
86 match subscription_type {
87 "quotes" => {
88 tracing::info!("Subscribing to quotes for {instrument_id}");
89 ws_client.subscribe_quotes(instrument_id).await?;
90 }
91 "trades" => {
92 tracing::info!("Subscribing to trades for {instrument_id}");
93 ws_client.subscribe_trades(instrument_id).await?;
94 }
95 "orderbook" | "book" => {
96 tracing::info!("Subscribing to order book L2 for {instrument_id}");
97 ws_client.subscribe_book(instrument_id).await?;
98 }
99 "orderbook25" | "book25" => {
100 tracing::info!("Subscribing to order book L2_25 for {instrument_id}");
101 ws_client.subscribe_book_25(instrument_id).await?;
102 }
103 "depth10" | "book10" => {
104 tracing::info!("Subscribing to order book depth 10 for {instrument_id}");
105 ws_client.subscribe_book_depth10(instrument_id).await?;
106 }
107 "bars" => {
108 let bar_type =
109 BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
110 tracing::info!("Subscribing to bars: {bar_type}");
111 ws_client.subscribe_bars(bar_type).await?;
112 }
113 "funding" => {
114 tracing::info!("Subscribing to funding rates");
115 tracing::warn!("Funding rate subscription may not be implemented yet");
117 }
118 "liquidation" => {
119 tracing::info!("Subscribing to liquidations");
120 tracing::warn!("Liquidation subscription may not be implemented yet");
122 }
123 "all" => {
124 tracing::info!("Subscribing to all available data types for {instrument_id}",);
125
126 tracing::info!("- Subscribing to quotes");
127 if let Err(e) = ws_client.subscribe_quotes(instrument_id).await {
128 tracing::error!("Failed to subscribe to quotes: {e}");
129 } else {
130 tracing::info!(" ✓ Quotes subscription successful");
131 }
132
133 tokio::time::sleep(Duration::from_millis(100)).await;
134
135 tracing::info!("- Subscribing to trades");
136 if let Err(e) = ws_client.subscribe_trades(instrument_id).await {
137 tracing::error!("Failed to subscribe to trades: {e}");
138 } else {
139 tracing::info!(" ✓ Trades subscription successful");
140 }
141
142 tokio::time::sleep(Duration::from_millis(100)).await;
143
144 tracing::info!("- Subscribing to order book L2");
145 if let Err(e) = ws_client.subscribe_book(instrument_id).await {
146 tracing::error!("Failed to subscribe to order book: {e}");
147 } else {
148 tracing::info!(" ✓ Order book L2 subscription successful");
149 }
150
151 tokio::time::sleep(Duration::from_millis(100)).await;
152
153 tracing::info!("- Subscribing to order book L2_25");
154 if let Err(e) = ws_client.subscribe_book_25(instrument_id).await {
155 tracing::error!("Failed to subscribe to order book 25: {e}");
156 } else {
157 tracing::info!(" ✓ Order book L2_25 subscription successful");
158 }
159
160 tokio::time::sleep(Duration::from_millis(100)).await;
161
162 tracing::info!("- Subscribing to order book depth 10");
163 if let Err(e) = ws_client.subscribe_book_depth10(instrument_id).await {
164 tracing::error!("Failed to subscribe to depth 10: {e}");
165 } else {
166 tracing::info!(" ✓ Order book depth 10 subscription successful");
167 }
168
169 tokio::time::sleep(Duration::from_millis(100)).await;
170
171 let bar_type =
172 BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
173 tracing::info!("- Subscribing to bars: {bar_type}");
174 if let Err(e) = ws_client.subscribe_bars(bar_type).await {
175 tracing::error!("Failed to subscribe to bars: {e}");
176 } else {
177 tracing::info!(" ✓ Bars subscription successful");
178 }
179 }
180 _ => {
181 tracing::error!("Unknown subscription type: {subscription_type}");
182 tracing::info!(
183 "Available types: quotes, trades, orderbook, orderbook25, depth10, bars, funding, liquidation, all"
184 );
185 return Ok(());
186 }
187 }
188
189 tracing::info!("Subscriptions completed, waiting for data...");
190 tracing::info!("Press CTRL+C to stop");
191
192 let sigint = tokio::signal::ctrl_c();
194 tokio::pin!(sigint);
195
196 let stream = ws_client.stream();
197 tokio::pin!(stream); let mut should_close = false;
201 let mut message_count = 0u64;
202
203 loop {
204 tokio::select! {
205 Some(msg) = stream.next() => {
206 message_count += 1;
207 tracing::info!("[Message #{message_count}] {msg:?}");
208 }
209 _ = &mut sigint => {
210 tracing::info!("Received SIGINT, closing connection...");
211 should_close = true;
212 break;
213 }
214 else => {
215 tracing::warn!("Stream ended unexpectedly");
216 break;
217 }
218 }
219 }
220
221 if should_close {
222 tracing::info!("Total messages received: {message_count}");
223 ws_client.close().await?;
224 tracing::info!("Connection closed successfully");
225 }
226
227 Ok(())
228}