1use std::{env, time::Duration};
41
42use nautilus_dydx::{
43 common::consts::{DYDX_TESTNET_HTTP_URL, DYDX_TESTNET_WS_URL},
44 http::client::DydxHttpClient,
45 websocket::{client::DydxWebSocketClient, handler::HandlerCommand},
46};
47use nautilus_model::{
48 data::{BarSpecification, BarType},
49 enums::{AggregationSource, BarAggregation, PriceType},
50 identifiers::InstrumentId,
51};
52use tokio::{pin, signal};
53use tracing::level_filters::LevelFilter;
54
55#[tokio::main]
56async fn main() -> Result<(), Box<dyn std::error::Error>> {
57 let log_level = env::var("LOG_LEVEL")
58 .unwrap_or_else(|_| "INFO".to_string())
59 .parse::<LevelFilter>()
60 .unwrap_or(LevelFilter::INFO);
61
62 tracing_subscriber::fmt().with_max_level(log_level).init();
63
64 let args: Vec<String> = env::args().collect();
65 let subscription_type = args.get(1).map_or("all", String::as_str);
66 let symbol = args.get(2).map_or("BTC-USD", String::as_str);
67 let testnet = args.get(3).is_none_or(|s| s == "testnet");
68
69 tracing::info!("Starting dYdX WebSocket test");
70 tracing::info!("Subscription type: {subscription_type}");
71 tracing::info!("Symbol: {symbol}");
72 tracing::info!("Testnet: {testnet}");
73 tracing::info!("");
74
75 let (http_url, ws_url) = if testnet {
76 (
77 env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
78 env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
79 )
80 } else {
81 (
82 env::var("DYDX_HTTP_URL").expect("DYDX_HTTP_URL required for mainnet"),
83 env::var("DYDX_WS_URL").expect("DYDX_WS_URL required for mainnet"),
84 )
85 };
86
87 let is_testnet = http_url.contains("testnet") || ws_url.contains("testnet");
88
89 tracing::info!("Connecting to dYdX HTTP API: {http_url}");
90 tracing::info!("Connecting to dYdX WebSocket: {ws_url}");
91 tracing::info!(
92 "Environment: {}",
93 if is_testnet { "TESTNET" } else { "MAINNET" }
94 );
95 tracing::info!("");
96
97 let http_client = DydxHttpClient::new(Some(http_url), Some(30), None, is_testnet, None)?;
98 let instruments = http_client.request_instruments(None, None, None).await?;
99
100 tracing::info!("Fetched {} instruments from HTTP", instruments.len());
101
102 let instrument_id = InstrumentId::from(format!("{symbol}-PERP.DYDX").as_str());
103
104 tracing::info!("Using instrument: {instrument_id}");
105 tracing::info!("");
106
107 let mut ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
108 ws_client.cache_instruments(instruments);
109
110 ws_client.connect().await?;
111 tracing::info!("WebSocket connected");
112 tracing::info!("");
113
114 tokio::time::sleep(Duration::from_millis(500)).await;
115
116 match subscription_type {
117 "trades" => {
118 tracing::info!("Subscribing to trades for {instrument_id}");
119 ws_client.subscribe_trades(instrument_id).await?;
120 }
121 "orderbook" | "book" => {
122 tracing::info!("Subscribing to orderbook for {instrument_id}");
123 ws_client.subscribe_orderbook(instrument_id).await?;
124 }
125 "candles" | "bars" => {
126 let bar_spec = BarSpecification {
127 step: std::num::NonZeroUsize::new(1).unwrap(),
128 aggregation: BarAggregation::Minute,
129 price_type: PriceType::Last,
130 };
131 let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
132 let ticker = instrument_id.symbol.as_str().trim_end_matches("-PERP");
133 let topic = format!("{ticker}/1MIN");
134
135 ws_client.send_command(HandlerCommand::RegisterBarType { topic, bar_type })?;
136
137 tracing::info!("Subscribing to 1-minute candles for {instrument_id}");
138 ws_client.subscribe_candles(instrument_id, "1MIN").await?;
139 }
140 "all" => {
141 tracing::info!("Subscribing to all available data types for {instrument_id}");
142 tracing::info!("");
143
144 tracing::info!("- Subscribing to trades");
145 if let Err(e) = ws_client.subscribe_trades(instrument_id).await {
146 tracing::error!("Failed to subscribe to trades: {e}");
147 } else {
148 tracing::info!(" Trades subscription successful");
149 }
150
151 tokio::time::sleep(Duration::from_millis(100)).await;
152
153 tracing::info!("- Subscribing to orderbook");
154 if let Err(e) = ws_client.subscribe_orderbook(instrument_id).await {
155 tracing::error!("Failed to subscribe to orderbook: {e}");
156 } else {
157 tracing::info!(" Orderbook subscription successful");
158 }
159
160 tokio::time::sleep(Duration::from_millis(100)).await;
161
162 let bar_spec = BarSpecification {
163 step: std::num::NonZeroUsize::new(1).unwrap(),
164 aggregation: BarAggregation::Minute,
165 price_type: PriceType::Last,
166 };
167 let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
168 let ticker = instrument_id.symbol.as_str().trim_end_matches("-PERP");
169 let topic = format!("{ticker}/1MIN");
170
171 ws_client.send_command(HandlerCommand::RegisterBarType { topic, bar_type })?;
172
173 tracing::info!("- Subscribing to 1-minute candles");
174 if let Err(e) = ws_client.subscribe_candles(instrument_id, "1MIN").await {
175 tracing::error!("Failed to subscribe to candles: {e}");
176 } else {
177 tracing::info!(" Candles subscription successful");
178 }
179 }
180 _ => {
181 tracing::error!("Unknown subscription type: {subscription_type}");
182 tracing::info!("Available types: trades, orderbook, candles, all");
183 return Ok(());
184 }
185 }
186
187 tracing::info!("");
188 tracing::info!("Subscriptions completed, waiting for data...");
189 tracing::info!("Press CTRL+C to stop");
190 tracing::info!("");
191
192 let Some(mut rx) = ws_client.take_receiver() else {
193 tracing::warn!("No inbound WebSocket receiver available; exiting");
194 return Ok(());
195 };
196
197 let sigint = signal::ctrl_c();
198 pin!(sigint);
199
200 let mut message_count: u64 = 0;
201 let mut should_close = false;
202
203 loop {
204 tokio::select! {
205 _ = &mut sigint => {
206 tracing::info!("Received SIGINT, closing connection...");
207 should_close = true;
208 break;
209 }
210 maybe_msg = rx.recv() => {
211 match maybe_msg {
212 Some(msg) => {
213 message_count += 1;
214 tracing::info!("[Message #{message_count}] {msg:?}");
215 }
216 None => {
217 tracing::info!("WebSocket message stream closed");
218 break;
219 }
220 }
221 }
222 }
223 }
224
225 if should_close {
226 tracing::info!("");
227 tracing::info!("Total messages received: {message_count}");
228 ws_client.disconnect().await?;
229 tracing::info!("Connection closed successfully");
230 }
231
232 Ok(())
233}