bitmex_ws_data/
ws-data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::env;
17
18use futures_util::StreamExt;
19use nautilus_bitmex::{http::client::BitmexHttpClient, websocket::client::BitmexWebSocketClient};
20use nautilus_model::{data::bar::BarType, identifiers::InstrumentId};
21use tokio::time::Duration;
22use tracing::level_filters::LevelFilter;
23
24#[tokio::main]
25async fn main() -> Result<(), Box<dyn std::error::Error>> {
26    let log_level = env::var("LOG_LEVEL")
27        .unwrap_or_else(|_| "INFO".to_string())
28        .parse::<LevelFilter>()
29        .unwrap_or(LevelFilter::INFO);
30
31    tracing_subscriber::fmt().with_max_level(log_level).init();
32
33    let args: Vec<String> = env::args().collect();
34    let subscription_type = args.get(1).map_or("all", String::as_str);
35    let symbol = args.get(2).map_or("XBTUSD", String::as_str);
36    let testnet = args.get(3).is_some_and(|s| s == "testnet");
37
38    tracing::info!("Starting Bitmex WebSocket test");
39    tracing::info!("Subscription type: {subscription_type}");
40    tracing::info!("Symbol: {symbol}");
41    tracing::info!("Testnet: {testnet}");
42
43    // Configure URLs
44    let (http_url, ws_url) = if testnet {
45        (
46            Some("https://testnet.bitmex.com".to_string()),
47            Some("wss://ws.testnet.bitmex.com/realtime".to_string()),
48        )
49    } else {
50        (None, None) // Use default production URLs
51    };
52
53    tracing::info!("Fetching instruments from HTTP API...");
54    let http_client = BitmexHttpClient::new(
55        http_url, // base_url
56        None,     // api_key
57        None,     // api_secret
58        testnet,  // testnet
59        Some(60), // timeout_secs
60    );
61
62    let instruments = http_client
63        .request_instruments(true) // active_only
64        .await?;
65
66    tracing::info!("Fetched {} instruments", instruments.len());
67
68    // Create WebSocket client
69    let mut ws_client = BitmexWebSocketClient::new(
70        ws_url,  // url: defaults to wss://ws.bitmex.com/realtime
71        None,    // No API key for public feeds
72        None,    // No API secret
73        None,    // Acount ID
74        Some(5), // 5 second heartbeat
75    )
76    .unwrap();
77    ws_client.initialize_instruments_cache(instruments);
78    ws_client.connect().await?;
79
80    // Give the connection a moment to stabilize
81    tokio::time::sleep(Duration::from_millis(500)).await;
82
83    let instrument_id = InstrumentId::from(format!("{symbol}.BITMEX").as_str());
84    tracing::info!("Using instrument_id: {instrument_id}");
85
86    match subscription_type {
87        "quotes" => {
88            tracing::info!("Subscribing to quotes for {instrument_id}");
89            ws_client.subscribe_quotes(instrument_id).await?;
90        }
91        "trades" => {
92            tracing::info!("Subscribing to trades for {instrument_id}");
93            ws_client.subscribe_trades(instrument_id).await?;
94        }
95        "orderbook" | "book" => {
96            tracing::info!("Subscribing to order book L2 for {instrument_id}");
97            ws_client.subscribe_book(instrument_id).await?;
98        }
99        "orderbook25" | "book25" => {
100            tracing::info!("Subscribing to order book L2_25 for {instrument_id}");
101            ws_client.subscribe_book_25(instrument_id).await?;
102        }
103        "depth10" | "book10" => {
104            tracing::info!("Subscribing to order book depth 10 for {instrument_id}");
105            ws_client.subscribe_book_depth10(instrument_id).await?;
106        }
107        "bars" => {
108            let bar_type =
109                BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
110            tracing::info!("Subscribing to bars: {bar_type}");
111            ws_client.subscribe_bars(bar_type).await?;
112        }
113        "funding" => {
114            tracing::info!("Subscribing to funding rates");
115            // Note: This might need implementation
116            tracing::warn!("Funding rate subscription may not be implemented yet");
117        }
118        "liquidation" => {
119            tracing::info!("Subscribing to liquidations");
120            // Note: This might need implementation
121            tracing::warn!("Liquidation subscription may not be implemented yet");
122        }
123        "all" => {
124            tracing::info!("Subscribing to all available data types for {instrument_id}",);
125
126            tracing::info!("- Subscribing to quotes");
127            if let Err(e) = ws_client.subscribe_quotes(instrument_id).await {
128                tracing::error!("Failed to subscribe to quotes: {e}");
129            } else {
130                tracing::info!("  ✓ Quotes subscription successful");
131            }
132
133            tokio::time::sleep(Duration::from_millis(100)).await;
134
135            tracing::info!("- Subscribing to trades");
136            if let Err(e) = ws_client.subscribe_trades(instrument_id).await {
137                tracing::error!("Failed to subscribe to trades: {e}");
138            } else {
139                tracing::info!("  ✓ Trades subscription successful");
140            }
141
142            tokio::time::sleep(Duration::from_millis(100)).await;
143
144            tracing::info!("- Subscribing to order book L2");
145            if let Err(e) = ws_client.subscribe_book(instrument_id).await {
146                tracing::error!("Failed to subscribe to order book: {e}");
147            } else {
148                tracing::info!("  ✓ Order book L2 subscription successful");
149            }
150
151            tokio::time::sleep(Duration::from_millis(100)).await;
152
153            tracing::info!("- Subscribing to order book L2_25");
154            if let Err(e) = ws_client.subscribe_book_25(instrument_id).await {
155                tracing::error!("Failed to subscribe to order book 25: {e}");
156            } else {
157                tracing::info!("  ✓ Order book L2_25 subscription successful");
158            }
159
160            tokio::time::sleep(Duration::from_millis(100)).await;
161
162            tracing::info!("- Subscribing to order book depth 10");
163            if let Err(e) = ws_client.subscribe_book_depth10(instrument_id).await {
164                tracing::error!("Failed to subscribe to depth 10: {e}");
165            } else {
166                tracing::info!("  ✓ Order book depth 10 subscription successful");
167            }
168
169            tokio::time::sleep(Duration::from_millis(100)).await;
170
171            let bar_type =
172                BarType::from(format!("{symbol}.BITMEX-1-MINUTE-LAST-EXTERNAL").as_str());
173            tracing::info!("- Subscribing to bars: {bar_type}");
174            if let Err(e) = ws_client.subscribe_bars(bar_type).await {
175                tracing::error!("Failed to subscribe to bars: {e}");
176            } else {
177                tracing::info!("  ✓ Bars subscription successful");
178            }
179        }
180        _ => {
181            tracing::error!("Unknown subscription type: {subscription_type}");
182            tracing::info!(
183                "Available types: quotes, trades, orderbook, orderbook25, depth10, bars, funding, liquidation, all"
184            );
185            return Ok(());
186        }
187    }
188
189    tracing::info!("Subscriptions completed, waiting for data...");
190    tracing::info!("Press CTRL+C to stop");
191
192    // Create a future that completes on CTRL+C
193    let sigint = tokio::signal::ctrl_c();
194    tokio::pin!(sigint);
195
196    let stream = ws_client.stream();
197    tokio::pin!(stream); // Pin the stream to allow polling in the loop
198
199    // Use a flag to track if we should close
200    let mut should_close = false;
201    let mut message_count = 0u64;
202
203    loop {
204        tokio::select! {
205            Some(msg) = stream.next() => {
206                message_count += 1;
207                tracing::info!("[Message #{message_count}] {msg:?}");
208            }
209            _ = &mut sigint => {
210                tracing::info!("Received SIGINT, closing connection...");
211                should_close = true;
212                break;
213            }
214            else => {
215                tracing::warn!("Stream ended unexpectedly");
216                break;
217            }
218        }
219    }
220
221    if should_close {
222        tracing::info!("Total messages received: {message_count}");
223        ws_client.close().await?;
224        tracing::info!("Connection closed successfully");
225    }
226
227    Ok(())
228}