bitmex_ws_data/
ws-data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::env;
17
18use futures_util::StreamExt;
19use nautilus_bitmex::{http::client::BitmexHttpClient, websocket::client::BitmexWebSocketClient};
20use nautilus_model::{data::bar::BarType, identifiers::InstrumentId};
21use tokio::time::Duration;
22use tracing::level_filters::LevelFilter;
23
24#[tokio::main]
25async fn main() -> Result<(), Box<dyn std::error::Error>> {
26    let log_level = env::var("LOG_LEVEL")
27        .unwrap_or_else(|_| "INFO".to_string())
28        .parse::<LevelFilter>()
29        .unwrap_or(LevelFilter::INFO);
30
31    tracing_subscriber::fmt().with_max_level(log_level).init();
32
33    let args: Vec<String> = env::args().collect();
34    let subscription_type = args.get(1).map_or("all", String::as_str);
35    let symbol = args.get(2).map_or("XBTUSD", String::as_str);
36    let testnet = args.get(3).is_some_and(|s| s == "testnet");
37
38    tracing::info!("Starting Bitmex WebSocket test");
39    tracing::info!("Subscription type: {subscription_type}");
40    tracing::info!("Symbol: {symbol}");
41    tracing::info!("Testnet: {testnet}");
42
43    // Configure URLs
44    let (http_url, ws_url) = if testnet {
45        (
46            Some("https://testnet.bitmex.com".to_string()),
47            Some("wss://ws.testnet.bitmex.com/realtime".to_string()),
48        )
49    } else {
50        (None, None) // Use default production URLs
51    };
52
53    tracing::info!("Fetching instruments from HTTP API...");
54    let http_client = BitmexHttpClient::new(
55        http_url, // base_url
56        None,     // api_key
57        None,     // api_secret
58        testnet,  // testnet
59        Some(60), // timeout_secs
60        None,     // max_retries
61        None,     // retry_delay_ms
62        None,     // retry_delay_max_ms
63    )
64    .expect("Failed to create HTTP client");
65
66    let instruments = http_client
67        .request_instruments(true) // active_only
68        .await?;
69
70    tracing::info!("Fetched {} instruments", instruments.len());
71
72    // Create WebSocket client
73    let mut ws_client = BitmexWebSocketClient::new(
74        ws_url,  // url: defaults to wss://ws.bitmex.com/realtime
75        None,    // No API key for public feeds
76        None,    // No API secret
77        None,    // Acount ID
78        Some(5), // 5 second heartbeat
79    )
80    .unwrap();
81    ws_client.initialize_instruments_cache(instruments);
82    ws_client.connect().await?;
83
84    // Give the connection a moment to stabilize
85    tokio::time::sleep(Duration::from_millis(500)).await;
86
87    let instrument_id = InstrumentId::from(format!("{symbol}.BITMEX").as_str());
88    tracing::info!("Using instrument_id: {instrument_id}");
89
90    match subscription_type {
91        "quotes" => {
92            tracing::info!("Subscribing to quotes for {instrument_id}");
93            ws_client.subscribe_quotes(instrument_id).await?;
94        }
95        "trades" => {
96            tracing::info!("Subscribing to trades for {instrument_id}");
97            ws_client.subscribe_trades(instrument_id).await?;
98        }
99        "orderbook" | "book" => {
100            tracing::info!("Subscribing to order book L2 for {instrument_id}");
101            ws_client.subscribe_book(instrument_id).await?;
102        }
103        "orderbook25" | "book25" => {
104            tracing::info!("Subscribing to order book L2_25 for {instrument_id}");
105            ws_client.subscribe_book_25(instrument_id).await?;
106        }
107        "depth10" | "book10" => {
108            tracing::info!("Subscribing to order book depth 10 for {instrument_id}");
109            ws_client.subscribe_book_depth10(instrument_id).await?;
110        }
111        "bars" => {
112            let bar_type =
113                BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
114            tracing::info!("Subscribing to bars: {bar_type}");
115            ws_client.subscribe_bars(bar_type).await?;
116        }
117        "funding" => {
118            tracing::info!("Subscribing to funding rates");
119            // Note: This might need implementation
120            tracing::warn!("Funding rate subscription may not be implemented yet");
121        }
122        "liquidation" => {
123            tracing::info!("Subscribing to liquidations");
124            // Note: This might need implementation
125            tracing::warn!("Liquidation subscription may not be implemented yet");
126        }
127        "all" => {
128            tracing::info!("Subscribing to all available data types for {instrument_id}",);
129
130            tracing::info!("- Subscribing to quotes");
131            if let Err(e) = ws_client.subscribe_quotes(instrument_id).await {
132                tracing::error!("Failed to subscribe to quotes: {e}");
133            } else {
134                tracing::info!("  ✓ Quotes subscription successful");
135            }
136
137            tokio::time::sleep(Duration::from_millis(100)).await;
138
139            tracing::info!("- Subscribing to trades");
140            if let Err(e) = ws_client.subscribe_trades(instrument_id).await {
141                tracing::error!("Failed to subscribe to trades: {e}");
142            } else {
143                tracing::info!("  ✓ Trades subscription successful");
144            }
145
146            tokio::time::sleep(Duration::from_millis(100)).await;
147
148            tracing::info!("- Subscribing to order book L2");
149            if let Err(e) = ws_client.subscribe_book(instrument_id).await {
150                tracing::error!("Failed to subscribe to order book: {e}");
151            } else {
152                tracing::info!("  ✓ Order book L2 subscription successful");
153            }
154
155            tokio::time::sleep(Duration::from_millis(100)).await;
156
157            tracing::info!("- Subscribing to order book L2_25");
158            if let Err(e) = ws_client.subscribe_book_25(instrument_id).await {
159                tracing::error!("Failed to subscribe to order book 25: {e}");
160            } else {
161                tracing::info!("  ✓ Order book L2_25 subscription successful");
162            }
163
164            tokio::time::sleep(Duration::from_millis(100)).await;
165
166            tracing::info!("- Subscribing to order book depth 10");
167            if let Err(e) = ws_client.subscribe_book_depth10(instrument_id).await {
168                tracing::error!("Failed to subscribe to depth 10: {e}");
169            } else {
170                tracing::info!("  ✓ Order book depth 10 subscription successful");
171            }
172
173            tokio::time::sleep(Duration::from_millis(100)).await;
174
175            let bar_type =
176                BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
177            tracing::info!("- Subscribing to bars: {bar_type}");
178            if let Err(e) = ws_client.subscribe_bars(bar_type).await {
179                tracing::error!("Failed to subscribe to bars: {e}");
180            } else {
181                tracing::info!("  ✓ Bars subscription successful");
182            }
183        }
184        _ => {
185            tracing::error!("Unknown subscription type: {subscription_type}");
186            tracing::info!(
187                "Available types: quotes, trades, orderbook, orderbook25, depth10, bars, funding, liquidation, all"
188            );
189            return Ok(());
190        }
191    }
192
193    tracing::info!("Subscriptions completed, waiting for data...");
194    tracing::info!("Press CTRL+C to stop");
195
196    // Create a future that completes on CTRL+C
197    let sigint = tokio::signal::ctrl_c();
198    tokio::pin!(sigint);
199
200    let stream = ws_client.stream();
201    tokio::pin!(stream); // Pin the stream to allow polling in the loop
202
203    // Use a flag to track if we should close
204    let mut should_close = false;
205    let mut message_count = 0u64;
206
207    loop {
208        tokio::select! {
209            Some(msg) = stream.next() => {
210                message_count += 1;
211                tracing::info!("[Message #{message_count}] {msg:?}");
212            }
213            _ = &mut sigint => {
214                tracing::info!("Received SIGINT, closing connection...");
215                should_close = true;
216                break;
217            }
218            else => {
219                tracing::warn!("Stream ended unexpectedly");
220                break;
221            }
222        }
223    }
224
225    if should_close {
226        tracing::info!("Total messages received: {message_count}");
227        ws_client.close().await?;
228        tracing::info!("Connection closed successfully");
229    }
230
231    Ok(())
232}