bitmex_ws_data/
ws_data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::env;
17
18use futures_util::StreamExt;
19use nautilus_bitmex::{http::client::BitmexHttpClient, websocket::client::BitmexWebSocketClient};
20use nautilus_model::{data::bar::BarType, identifiers::InstrumentId};
21use tokio::time::Duration;
22use tracing::level_filters::LevelFilter;
23
24#[tokio::main]
25async fn main() -> Result<(), Box<dyn std::error::Error>> {
26    let log_level = env::var("LOG_LEVEL")
27        .unwrap_or_else(|_| "INFO".to_string())
28        .parse::<LevelFilter>()
29        .unwrap_or(LevelFilter::INFO);
30
31    tracing_subscriber::fmt().with_max_level(log_level).init();
32
33    let args: Vec<String> = env::args().collect();
34    let subscription_type = args.get(1).map_or("all", String::as_str);
35    let symbol = args.get(2).map_or("XBTUSD", String::as_str);
36    let testnet = args.get(3).is_some_and(|s| s == "testnet");
37
38    tracing::info!("Starting Bitmex WebSocket test");
39    tracing::info!("Subscription type: {subscription_type}");
40    tracing::info!("Symbol: {symbol}");
41    tracing::info!("Testnet: {testnet}");
42
43    // Configure URLs
44    let (http_url, ws_url) = if testnet {
45        (
46            Some("https://testnet.bitmex.com".to_string()),
47            Some("wss://ws.testnet.bitmex.com/realtime".to_string()),
48        )
49    } else {
50        (None, None) // Use default production URLs
51    };
52
53    tracing::info!("Fetching instruments from HTTP API...");
54    let http_client = BitmexHttpClient::new(
55        http_url, // base_url
56        None,     // api_key
57        None,     // api_secret
58        testnet,  // testnet
59        Some(60), // timeout_secs
60        None,     // max_retries
61        None,     // retry_delay_ms
62        None,     // retry_delay_max_ms
63        None,     // recv_window_ms
64        None,     // max_requests_per_second
65        None,     // max_requests_per_minute
66    )
67    .expect("Failed to create HTTP client");
68
69    let instruments = http_client
70        .request_instruments(true) // active_only
71        .await?;
72
73    tracing::info!("Fetched {} instruments", instruments.len());
74
75    // Create WebSocket client
76    let mut ws_client = BitmexWebSocketClient::new(
77        ws_url,  // url: defaults to wss://ws.bitmex.com/realtime
78        None,    // No API key for public feeds
79        None,    // No API secret
80        None,    // Acount ID
81        Some(5), // 5 second heartbeat
82    )
83    .unwrap();
84    ws_client.initialize_instruments_cache(instruments);
85    ws_client.connect().await?;
86
87    // Give the connection a moment to stabilize
88    tokio::time::sleep(Duration::from_millis(500)).await;
89
90    let instrument_id = InstrumentId::from(format!("{symbol}.BITMEX").as_str());
91    tracing::info!("Using instrument_id: {instrument_id}");
92
93    match subscription_type {
94        "quotes" => {
95            tracing::info!("Subscribing to quotes for {instrument_id}");
96            ws_client.subscribe_quotes(instrument_id).await?;
97        }
98        "trades" => {
99            tracing::info!("Subscribing to trades for {instrument_id}");
100            ws_client.subscribe_trades(instrument_id).await?;
101        }
102        "orderbook" | "book" => {
103            tracing::info!("Subscribing to order book L2 for {instrument_id}");
104            ws_client.subscribe_book(instrument_id).await?;
105        }
106        "orderbook25" | "book25" => {
107            tracing::info!("Subscribing to order book L2_25 for {instrument_id}");
108            ws_client.subscribe_book_25(instrument_id).await?;
109        }
110        "depth10" | "book10" => {
111            tracing::info!("Subscribing to order book depth 10 for {instrument_id}");
112            ws_client.subscribe_book_depth10(instrument_id).await?;
113        }
114        "bars" => {
115            let bar_type =
116                BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
117            tracing::info!("Subscribing to bars: {bar_type}");
118            ws_client.subscribe_bars(bar_type).await?;
119        }
120        "funding" => {
121            tracing::info!("Subscribing to funding rates");
122            // Note: This might need implementation
123            tracing::warn!("Funding rate subscription may not be implemented yet");
124        }
125        "liquidation" => {
126            tracing::info!("Subscribing to liquidations");
127            // Note: This might need implementation
128            tracing::warn!("Liquidation subscription may not be implemented yet");
129        }
130        "all" => {
131            tracing::info!("Subscribing to all available data types for {instrument_id}",);
132
133            tracing::info!("- Subscribing to quotes");
134            if let Err(e) = ws_client.subscribe_quotes(instrument_id).await {
135                tracing::error!("Failed to subscribe to quotes: {e}");
136            } else {
137                tracing::info!("  ✓ Quotes subscription successful");
138            }
139
140            tokio::time::sleep(Duration::from_millis(100)).await;
141
142            tracing::info!("- Subscribing to trades");
143            if let Err(e) = ws_client.subscribe_trades(instrument_id).await {
144                tracing::error!("Failed to subscribe to trades: {e}");
145            } else {
146                tracing::info!("  ✓ Trades subscription successful");
147            }
148
149            tokio::time::sleep(Duration::from_millis(100)).await;
150
151            tracing::info!("- Subscribing to order book L2");
152            if let Err(e) = ws_client.subscribe_book(instrument_id).await {
153                tracing::error!("Failed to subscribe to order book: {e}");
154            } else {
155                tracing::info!("  ✓ Order book L2 subscription successful");
156            }
157
158            tokio::time::sleep(Duration::from_millis(100)).await;
159
160            tracing::info!("- Subscribing to order book L2_25");
161            if let Err(e) = ws_client.subscribe_book_25(instrument_id).await {
162                tracing::error!("Failed to subscribe to order book 25: {e}");
163            } else {
164                tracing::info!("  ✓ Order book L2_25 subscription successful");
165            }
166
167            tokio::time::sleep(Duration::from_millis(100)).await;
168
169            tracing::info!("- Subscribing to order book depth 10");
170            if let Err(e) = ws_client.subscribe_book_depth10(instrument_id).await {
171                tracing::error!("Failed to subscribe to depth 10: {e}");
172            } else {
173                tracing::info!("  ✓ Order book depth 10 subscription successful");
174            }
175
176            tokio::time::sleep(Duration::from_millis(100)).await;
177
178            let bar_type =
179                BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
180            tracing::info!("- Subscribing to bars: {bar_type}");
181            if let Err(e) = ws_client.subscribe_bars(bar_type).await {
182                tracing::error!("Failed to subscribe to bars: {e}");
183            } else {
184                tracing::info!("  ✓ Bars subscription successful");
185            }
186        }
187        _ => {
188            tracing::error!("Unknown subscription type: {subscription_type}");
189            tracing::info!(
190                "Available types: quotes, trades, orderbook, orderbook25, depth10, bars, funding, liquidation, all"
191            );
192            return Ok(());
193        }
194    }
195
196    tracing::info!("Subscriptions completed, waiting for data...");
197    tracing::info!("Press CTRL+C to stop");
198
199    // Create a future that completes on CTRL+C
200    let sigint = tokio::signal::ctrl_c();
201    tokio::pin!(sigint);
202
203    let stream = ws_client.stream();
204    tokio::pin!(stream); // Pin the stream to allow polling in the loop
205
206    // Use a flag to track if we should close
207    let mut should_close = false;
208    let mut message_count = 0u64;
209
210    loop {
211        tokio::select! {
212            Some(msg) = stream.next() => {
213                message_count += 1;
214                tracing::info!("[Message #{message_count}] {msg:?}");
215            }
216            _ = &mut sigint => {
217                tracing::info!("Received SIGINT, closing connection...");
218                should_close = true;
219                break;
220            }
221            else => {
222                tracing::warn!("Stream ended unexpectedly");
223                break;
224            }
225        }
226    }
227
228    if should_close {
229        tracing::info!("Total messages received: {message_count}");
230        ws_client.close().await?;
231        tracing::info!("Connection closed successfully");
232    }
233
234    Ok(())
235}