bitmex_ws_data/
ws_data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::env;
17
18use futures_util::StreamExt;
19use nautilus_bitmex::{http::client::BitmexHttpClient, websocket::client::BitmexWebSocketClient};
20use nautilus_model::{data::bar::BarType, identifiers::InstrumentId};
21use tokio::time::Duration;
22use tracing::level_filters::LevelFilter;
23
24#[tokio::main]
25async fn main() -> Result<(), Box<dyn std::error::Error>> {
26    let log_level = env::var("LOG_LEVEL")
27        .unwrap_or_else(|_| "INFO".to_string())
28        .parse::<LevelFilter>()
29        .unwrap_or(LevelFilter::INFO);
30
31    tracing_subscriber::fmt().with_max_level(log_level).init();
32
33    let args: Vec<String> = env::args().collect();
34    let subscription_type = args.get(1).map_or("all", String::as_str);
35    let symbol = args.get(2).map_or("XBTUSD", String::as_str);
36    let testnet = args.get(3).is_some_and(|s| s == "testnet");
37
38    tracing::info!("Starting Bitmex WebSocket test");
39    tracing::info!("Subscription type: {subscription_type}");
40    tracing::info!("Symbol: {symbol}");
41    tracing::info!("Testnet: {testnet}");
42
43    // Configure URLs
44    let (http_url, ws_url) = if testnet {
45        (
46            Some("https://testnet.bitmex.com".to_string()),
47            Some("wss://ws.testnet.bitmex.com/realtime".to_string()),
48        )
49    } else {
50        (None, None) // Use default production URLs
51    };
52
53    tracing::info!("Fetching instruments from HTTP API...");
54    let http_client = BitmexHttpClient::new(
55        http_url, // base_url
56        None,     // api_key
57        None,     // api_secret
58        testnet,  // testnet
59        Some(60), // timeout_secs
60        None,     // max_retries
61        None,     // retry_delay_ms
62        None,     // retry_delay_max_ms
63        None,     // recv_window_ms
64        None,     // max_requests_per_second
65        None,     // max_requests_per_minute
66        None,     // proxy_url
67    )
68    .expect("Failed to create HTTP client");
69
70    let instruments = http_client
71        .request_instruments(true) // active_only
72        .await?;
73
74    tracing::info!("Fetched {} instruments", instruments.len());
75
76    // Create WebSocket client
77    let mut ws_client = BitmexWebSocketClient::new(
78        ws_url,  // url: defaults to wss://ws.bitmex.com/realtime
79        None,    // No API key for public feeds
80        None,    // No API secret
81        None,    // Account ID
82        Some(5), // 5 second heartbeat
83    )
84    .unwrap();
85    ws_client.cache_instruments(instruments);
86    ws_client.connect().await?;
87
88    // Give the connection a moment to stabilize
89    tokio::time::sleep(Duration::from_millis(500)).await;
90
91    let instrument_id = InstrumentId::from(format!("{symbol}.BITMEX").as_str());
92    tracing::info!("Using instrument_id: {instrument_id}");
93
94    match subscription_type {
95        "quotes" => {
96            tracing::info!("Subscribing to quotes for {instrument_id}");
97            ws_client.subscribe_quotes(instrument_id).await?;
98        }
99        "trades" => {
100            tracing::info!("Subscribing to trades for {instrument_id}");
101            ws_client.subscribe_trades(instrument_id).await?;
102        }
103        "orderbook" | "book" => {
104            tracing::info!("Subscribing to order book L2 for {instrument_id}");
105            ws_client.subscribe_book(instrument_id).await?;
106        }
107        "orderbook25" | "book25" => {
108            tracing::info!("Subscribing to order book L2_25 for {instrument_id}");
109            ws_client.subscribe_book_25(instrument_id).await?;
110        }
111        "depth10" | "book10" => {
112            tracing::info!("Subscribing to order book depth 10 for {instrument_id}");
113            ws_client.subscribe_book_depth10(instrument_id).await?;
114        }
115        "bars" => {
116            let bar_type =
117                BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
118            tracing::info!("Subscribing to bars: {bar_type}");
119            ws_client.subscribe_bars(bar_type).await?;
120        }
121        "funding" => {
122            tracing::info!("Subscribing to funding rates");
123            // Note: This might need implementation
124            tracing::warn!("Funding rate subscription may not be implemented yet");
125        }
126        "liquidation" => {
127            tracing::info!("Subscribing to liquidations");
128            // Note: This might need implementation
129            tracing::warn!("Liquidation subscription may not be implemented yet");
130        }
131        "all" => {
132            tracing::info!("Subscribing to all available data types for {instrument_id}",);
133
134            tracing::info!("- Subscribing to quotes");
135            if let Err(e) = ws_client.subscribe_quotes(instrument_id).await {
136                tracing::error!("Failed to subscribe to quotes: {e}");
137            } else {
138                tracing::info!("  ✓ Quotes subscription successful");
139            }
140
141            tokio::time::sleep(Duration::from_millis(100)).await;
142
143            tracing::info!("- Subscribing to trades");
144            if let Err(e) = ws_client.subscribe_trades(instrument_id).await {
145                tracing::error!("Failed to subscribe to trades: {e}");
146            } else {
147                tracing::info!("  ✓ Trades subscription successful");
148            }
149
150            tokio::time::sleep(Duration::from_millis(100)).await;
151
152            tracing::info!("- Subscribing to order book L2");
153            if let Err(e) = ws_client.subscribe_book(instrument_id).await {
154                tracing::error!("Failed to subscribe to order book: {e}");
155            } else {
156                tracing::info!("  ✓ Order book L2 subscription successful");
157            }
158
159            tokio::time::sleep(Duration::from_millis(100)).await;
160
161            tracing::info!("- Subscribing to order book L2_25");
162            if let Err(e) = ws_client.subscribe_book_25(instrument_id).await {
163                tracing::error!("Failed to subscribe to order book 25: {e}");
164            } else {
165                tracing::info!("  ✓ Order book L2_25 subscription successful");
166            }
167
168            tokio::time::sleep(Duration::from_millis(100)).await;
169
170            tracing::info!("- Subscribing to order book depth 10");
171            if let Err(e) = ws_client.subscribe_book_depth10(instrument_id).await {
172                tracing::error!("Failed to subscribe to depth 10: {e}");
173            } else {
174                tracing::info!("  ✓ Order book depth 10 subscription successful");
175            }
176
177            tokio::time::sleep(Duration::from_millis(100)).await;
178
179            let bar_type =
180                BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
181            tracing::info!("- Subscribing to bars: {bar_type}");
182            if let Err(e) = ws_client.subscribe_bars(bar_type).await {
183                tracing::error!("Failed to subscribe to bars: {e}");
184            } else {
185                tracing::info!("  ✓ Bars subscription successful");
186            }
187        }
188        _ => {
189            tracing::error!("Unknown subscription type: {subscription_type}");
190            tracing::info!(
191                "Available types: quotes, trades, orderbook, orderbook25, depth10, bars, funding, liquidation, all"
192            );
193            return Ok(());
194        }
195    }
196
197    tracing::info!("Subscriptions completed, waiting for data...");
198    tracing::info!("Press CTRL+C to stop");
199
200    // Create a future that completes on CTRL+C
201    let sigint = tokio::signal::ctrl_c();
202    tokio::pin!(sigint);
203
204    let stream = ws_client.stream();
205    tokio::pin!(stream); // Pin the stream to allow polling in the loop
206
207    // Use a flag to track if we should close
208    let mut should_close = false;
209    let mut message_count = 0u64;
210
211    loop {
212        tokio::select! {
213            Some(msg) = stream.next() => {
214                message_count += 1;
215                tracing::info!("[Message #{message_count}] {msg:?}");
216            }
217            _ = &mut sigint => {
218                tracing::info!("Received SIGINT, closing connection...");
219                should_close = true;
220                break;
221            }
222            else => {
223                tracing::warn!("Stream ended unexpectedly");
224                break;
225            }
226        }
227    }
228
229    if should_close {
230        tracing::info!("Total messages received: {message_count}");
231        ws_client.close().await?;
232        tracing::info!("Connection closed successfully");
233    }
234
235    Ok(())
236}