dydx_ws_data/
ws_data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Manual verification script for dYdX WebSocket public data streams.
17//!
18//! Exercises live subscriptions for trades, order book updates, and candles
19//! for a single instrument to validate the end-to-end streaming pipeline.
20//!
21//! Usage:
22//! ```bash
23//! # Test against testnet (default)
24//! cargo run --bin dydx-ws-data -p nautilus-dydx
25//!
26//! # Override endpoints or instrument
27//! DYDX_HTTP_URL=https://indexer.v4testnet.dydx.exchange \
28//! DYDX_WS_URL=wss://indexer.v4testnet.dydx.exchange/v4/ws \
29//! DYDX_INSTRUMENT_ID=BTC-USD-PERP.DYDX \
30//! cargo run --bin dydx-ws-data -p nautilus-dydx
31//! ```
32
33use std::time::Duration;
34
35use nautilus_dydx::{
36    common::consts::{DYDX_TESTNET_HTTP_URL, DYDX_TESTNET_WS_URL},
37    http::client::DydxHttpClient,
38    websocket::{client::DydxWebSocketClient, handler::HandlerCommand},
39};
40use nautilus_model::{
41    data::{BarSpecification, BarType},
42    enums::{AggregationSource, BarAggregation, PriceType},
43    identifiers::InstrumentId,
44};
45use tokio::{pin, signal};
46use tracing::level_filters::LevelFilter;
47
48#[tokio::main]
49async fn main() -> Result<(), Box<dyn std::error::Error>> {
50    tracing_subscriber::fmt()
51        .with_max_level(LevelFilter::INFO)
52        .init();
53
54    // Resolve endpoints from environment, falling back to testnet defaults.
55    let http_url =
56        std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string());
57    let ws_url = std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string());
58
59    // Derive environment label from URLs.
60    let is_testnet = http_url.contains("testnet") || ws_url.contains("testnet");
61
62    tracing::info!("Connecting to dYdX HTTP API: {http_url}");
63    tracing::info!("Connecting to dYdX WebSocket: {ws_url}");
64    tracing::info!(
65        "Environment: {}",
66        if is_testnet { "TESTNET" } else { "MAINNET" }
67    );
68
69    // Create HTTP client and fetch instruments so the WebSocket client can
70    // decode incoming market data using the shared instrument definitions.
71    let http_client = DydxHttpClient::new(Some(http_url), Some(30), None, is_testnet, None)?;
72    let instruments = http_client.request_instruments(None, None, None).await?;
73
74    tracing::info!("Fetched {} instruments from HTTP", instruments.len());
75
76    // Resolve instrument ID from env or use BTC-USD perpetual by default.
77    let instrument_id = std::env::var("DYDX_INSTRUMENT_ID").map_or_else(
78        |_| InstrumentId::from("BTC-USD-PERP.DYDX"),
79        InstrumentId::from,
80    );
81
82    tracing::info!("Using instrument: {instrument_id}");
83
84    // Initialize WebSocket client and cache instruments before connecting.
85    let mut ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
86    ws_client.cache_instruments(instruments);
87
88    ws_client.connect().await?;
89    tracing::info!("WebSocket connected");
90
91    // Give the connection a brief moment to fully establish.
92    tokio::time::sleep(Duration::from_millis(500)).await;
93
94    // Subscribe to core public channels for the chosen instrument.
95    tracing::info!("Subscribing to trades for {instrument_id}");
96    ws_client.subscribe_trades(instrument_id).await?;
97
98    tracing::info!("Subscribing to orderbook for {instrument_id}");
99    ws_client.subscribe_orderbook(instrument_id).await?;
100
101    // Register bar type before subscribing to candles
102    let bar_spec = BarSpecification {
103        step: std::num::NonZeroUsize::new(1).unwrap(),
104        aggregation: BarAggregation::Minute,
105        price_type: PriceType::Last,
106    };
107    let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
108    let ticker = instrument_id.symbol.as_str().trim_end_matches("-PERP");
109    let topic = format!("{ticker}/1MIN");
110
111    ws_client.send_command(HandlerCommand::RegisterBarType { topic, bar_type })?;
112
113    tracing::info!("Subscribing to 1-minute candles for {instrument_id}");
114    ws_client.subscribe_candles(instrument_id, "1MIN").await?;
115
116    // Take ownership of the typed message stream.
117    let Some(mut rx) = ws_client.take_receiver() else {
118        tracing::warn!("No inbound WebSocket receiver available; exiting");
119        return Ok(());
120    };
121
122    // Create a future that completes on CTRL+C.
123    let sigint = signal::ctrl_c();
124    pin!(sigint);
125
126    let mut message_count: u64 = 0;
127
128    tracing::info!("Streaming messages (CTRL+C to exit)...");
129
130    loop {
131        tokio::select! {
132            _ = &mut sigint => {
133                tracing::info!("Received SIGINT, closing connection...");
134                ws_client.disconnect().await?;
135                break;
136            }
137            maybe_msg = rx.recv() => {
138                match maybe_msg {
139                    Some(msg) => {
140                        message_count += 1;
141                        tracing::info!("Message #{message_count}: {msg:?}");
142                    }
143                    None => {
144                        tracing::info!("WebSocket message stream closed");
145                        break;
146                    }
147                }
148            }
149        }
150    }
151
152    tracing::info!("Received {message_count} total messages");
153
154    Ok(())
155}