dydx_ws_data/
ws_data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Manual verification script for dYdX WebSocket public data streams.
17//!
18//! Exercises live subscriptions for trades, order book updates, and candles
19//! for a single instrument to validate the end-to-end streaming pipeline.
20//!
21//! Usage:
22//! ```bash
23//! # Test against testnet (default) - subscribe to all data types
24//! cargo run --bin dydx-ws-data -p nautilus-dydx
25//!
26//! # Subscribe to specific channel
27//! cargo run --bin dydx-ws-data -p nautilus-dydx -- trades BTC-USD
28//! cargo run --bin dydx-ws-data -p nautilus-dydx -- orderbook ETH-USD
29//! cargo run --bin dydx-ws-data -p nautilus-dydx -- candles BTC-USD
30//!
31//! # Use testnet explicitly
32//! cargo run --bin dydx-ws-data -p nautilus-dydx -- all BTC-USD testnet
33//!
34//! # Override endpoints via environment
35//! DYDX_HTTP_URL=https://indexer.v4testnet.dydx.exchange \
36//! DYDX_WS_URL=wss://indexer.v4testnet.dydx.exchange/v4/ws \
37//! cargo run --bin dydx-ws-data -p nautilus-dydx
38//! ```
39
40use std::{env, time::Duration};
41
42use nautilus_dydx::{
43    common::consts::{DYDX_TESTNET_HTTP_URL, DYDX_TESTNET_WS_URL},
44    http::client::DydxHttpClient,
45    websocket::{client::DydxWebSocketClient, handler::HandlerCommand},
46};
47use nautilus_model::{
48    data::{BarSpecification, BarType},
49    enums::{AggregationSource, BarAggregation, PriceType},
50    identifiers::InstrumentId,
51};
52use tokio::{pin, signal};
53use tracing::level_filters::LevelFilter;
54
55#[tokio::main]
56async fn main() -> Result<(), Box<dyn std::error::Error>> {
57    let log_level = env::var("LOG_LEVEL")
58        .unwrap_or_else(|_| "INFO".to_string())
59        .parse::<LevelFilter>()
60        .unwrap_or(LevelFilter::INFO);
61
62    tracing_subscriber::fmt().with_max_level(log_level).init();
63
64    let args: Vec<String> = env::args().collect();
65    let subscription_type = args.get(1).map_or("all", String::as_str);
66    let symbol = args.get(2).map_or("BTC-USD", String::as_str);
67    let testnet = args.get(3).is_none_or(|s| s == "testnet");
68
69    tracing::info!("Starting dYdX WebSocket test");
70    tracing::info!("Subscription type: {subscription_type}");
71    tracing::info!("Symbol: {symbol}");
72    tracing::info!("Testnet: {testnet}");
73    tracing::info!("");
74
75    let (http_url, ws_url) = if testnet {
76        (
77            env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
78            env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
79        )
80    } else {
81        (
82            env::var("DYDX_HTTP_URL").expect("DYDX_HTTP_URL required for mainnet"),
83            env::var("DYDX_WS_URL").expect("DYDX_WS_URL required for mainnet"),
84        )
85    };
86
87    let is_testnet = http_url.contains("testnet") || ws_url.contains("testnet");
88
89    tracing::info!("Connecting to dYdX HTTP API: {http_url}");
90    tracing::info!("Connecting to dYdX WebSocket: {ws_url}");
91    tracing::info!(
92        "Environment: {}",
93        if is_testnet { "TESTNET" } else { "MAINNET" }
94    );
95    tracing::info!("");
96
97    let http_client = DydxHttpClient::new(Some(http_url), Some(30), None, is_testnet, None)?;
98    let instruments = http_client.request_instruments(None, None, None).await?;
99
100    tracing::info!("Fetched {} instruments from HTTP", instruments.len());
101
102    let instrument_id = InstrumentId::from(format!("{symbol}-PERP.DYDX").as_str());
103
104    tracing::info!("Using instrument: {instrument_id}");
105    tracing::info!("");
106
107    let mut ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
108    ws_client.cache_instruments(instruments);
109
110    ws_client.connect().await?;
111    tracing::info!("WebSocket connected");
112    tracing::info!("");
113
114    tokio::time::sleep(Duration::from_millis(500)).await;
115
116    match subscription_type {
117        "trades" => {
118            tracing::info!("Subscribing to trades for {instrument_id}");
119            ws_client.subscribe_trades(instrument_id).await?;
120        }
121        "orderbook" | "book" => {
122            tracing::info!("Subscribing to orderbook for {instrument_id}");
123            ws_client.subscribe_orderbook(instrument_id).await?;
124        }
125        "candles" | "bars" => {
126            let bar_spec = BarSpecification {
127                step: std::num::NonZeroUsize::new(1).unwrap(),
128                aggregation: BarAggregation::Minute,
129                price_type: PriceType::Last,
130            };
131            let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
132            let ticker = instrument_id.symbol.as_str().trim_end_matches("-PERP");
133            let topic = format!("{ticker}/1MIN");
134
135            ws_client.send_command(HandlerCommand::RegisterBarType { topic, bar_type })?;
136
137            tracing::info!("Subscribing to 1-minute candles for {instrument_id}");
138            ws_client.subscribe_candles(instrument_id, "1MIN").await?;
139        }
140        "all" => {
141            tracing::info!("Subscribing to all available data types for {instrument_id}");
142            tracing::info!("");
143
144            tracing::info!("- Subscribing to trades");
145            if let Err(e) = ws_client.subscribe_trades(instrument_id).await {
146                tracing::error!("Failed to subscribe to trades: {e}");
147            } else {
148                tracing::info!("  Trades subscription successful");
149            }
150
151            tokio::time::sleep(Duration::from_millis(100)).await;
152
153            tracing::info!("- Subscribing to orderbook");
154            if let Err(e) = ws_client.subscribe_orderbook(instrument_id).await {
155                tracing::error!("Failed to subscribe to orderbook: {e}");
156            } else {
157                tracing::info!("  Orderbook subscription successful");
158            }
159
160            tokio::time::sleep(Duration::from_millis(100)).await;
161
162            let bar_spec = BarSpecification {
163                step: std::num::NonZeroUsize::new(1).unwrap(),
164                aggregation: BarAggregation::Minute,
165                price_type: PriceType::Last,
166            };
167            let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
168            let ticker = instrument_id.symbol.as_str().trim_end_matches("-PERP");
169            let topic = format!("{ticker}/1MIN");
170
171            ws_client.send_command(HandlerCommand::RegisterBarType { topic, bar_type })?;
172
173            tracing::info!("- Subscribing to 1-minute candles");
174            if let Err(e) = ws_client.subscribe_candles(instrument_id, "1MIN").await {
175                tracing::error!("Failed to subscribe to candles: {e}");
176            } else {
177                tracing::info!("  Candles subscription successful");
178            }
179        }
180        _ => {
181            tracing::error!("Unknown subscription type: {subscription_type}");
182            tracing::info!("Available types: trades, orderbook, candles, all");
183            return Ok(());
184        }
185    }
186
187    tracing::info!("");
188    tracing::info!("Subscriptions completed, waiting for data...");
189    tracing::info!("Press CTRL+C to stop");
190    tracing::info!("");
191
192    let Some(mut rx) = ws_client.take_receiver() else {
193        tracing::warn!("No inbound WebSocket receiver available; exiting");
194        return Ok(());
195    };
196
197    let sigint = signal::ctrl_c();
198    pin!(sigint);
199
200    let mut message_count: u64 = 0;
201    let mut should_close = false;
202
203    loop {
204        tokio::select! {
205            _ = &mut sigint => {
206                tracing::info!("Received SIGINT, closing connection...");
207                should_close = true;
208                break;
209            }
210            maybe_msg = rx.recv() => {
211                match maybe_msg {
212                    Some(msg) => {
213                        message_count += 1;
214                        tracing::info!("[Message #{message_count}] {msg:?}");
215                    }
216                    None => {
217                        tracing::info!("WebSocket message stream closed");
218                        break;
219                    }
220                }
221            }
222        }
223    }
224
225    if should_close {
226        tracing::info!("");
227        tracing::info!("Total messages received: {message_count}");
228        ws_client.disconnect().await?;
229        tracing::info!("Connection closed successfully");
230    }
231
232    Ok(())
233}