dydx_ws_exec/
ws_exec.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Manual verification script for dYdX WebSocket private channels (subaccount updates).
17//!
18//! Subscribes to subaccount order, fill, and position updates via WebSocket.
19//! dYdX v4 uses wallet-based subscriptions (no API key signing required for WS).
20//!
21//! Usage:
22//! ```bash
23//! # Test against testnet (default)
24//! DYDX_MNEMONIC="your mnemonic" cargo run --bin dydx-ws-exec -p nautilus-dydx
25//!
26//! # Test against mainnet
27//! DYDX_MNEMONIC="your mnemonic" \
28//! DYDX_WS_URL=wss://indexer.dydx.trade/v4/ws \
29//! DYDX_HTTP_URL=https://indexer.dydx.trade \
30//! cargo run --bin dydx-ws-exec -p nautilus-dydx -- --mainnet
31//!
32//! # With custom subaccount
33//! DYDX_MNEMONIC="your mnemonic" cargo run --bin dydx-ws-exec -p nautilus-dydx -- --subaccount 1
34//! ```
35
36use std::{env, time::Duration};
37
38use nautilus_dydx::{
39    common::consts::{DYDX_TESTNET_HTTP_URL, DYDX_TESTNET_WS_URL},
40    grpc::wallet::Wallet,
41    http::client::DydxHttpClient,
42    websocket::{NautilusWsMessage, client::DydxWebSocketClient},
43};
44use tracing::level_filters::LevelFilter;
45
46const DEFAULT_SUBACCOUNT: u32 = 0;
47
48#[tokio::main]
49async fn main() -> Result<(), Box<dyn std::error::Error>> {
50    tracing_subscriber::fmt()
51        .with_max_level(LevelFilter::DEBUG)
52        .init();
53
54    let args: Vec<String> = env::args().collect();
55    let is_mainnet = args.iter().any(|a| a == "--mainnet");
56    let subaccount_number = args
57        .iter()
58        .position(|a| a == "--subaccount")
59        .and_then(|i| args.get(i + 1))
60        .and_then(|s| s.parse::<u32>().ok())
61        .unwrap_or(DEFAULT_SUBACCOUNT);
62
63    let mnemonic = env::var("DYDX_MNEMONIC").expect("DYDX_MNEMONIC environment variable not set");
64
65    let ws_url = if is_mainnet {
66        env::var("DYDX_WS_URL").unwrap_or_else(|_| "wss://indexer.dydx.trade/v4/ws".to_string())
67    } else {
68        env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string())
69    };
70
71    let http_url = if is_mainnet {
72        env::var("DYDX_HTTP_URL").unwrap_or_else(|_| "https://indexer.dydx.trade".to_string())
73    } else {
74        env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string())
75    };
76
77    tracing::info!("Connecting to dYdX WebSocket API: {}", ws_url);
78    tracing::info!(
79        "Environment: {}",
80        if is_mainnet { "MAINNET" } else { "TESTNET" }
81    );
82    tracing::info!("Subaccount: {}", subaccount_number);
83    tracing::info!("");
84
85    let wallet = Wallet::from_mnemonic(&mnemonic)?;
86    let account = wallet.account_offline(subaccount_number)?;
87    let wallet_address = account.address.clone();
88    tracing::info!("Wallet address: {}", wallet_address);
89    tracing::info!("");
90
91    let http_client =
92        DydxHttpClient::new(Some(http_url.clone()), Some(30), None, !is_mainnet, None)?;
93
94    tracing::info!("Fetching instruments from HTTP API...");
95    let instruments = http_client.request_instruments(None, None, None).await?;
96    tracing::info!("Fetched {} instruments", instruments.len());
97
98    let mut ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
99    ws_client.cache_instruments(instruments);
100    ws_client.connect().await?;
101
102    tokio::time::sleep(Duration::from_millis(500)).await;
103
104    tracing::info!(
105        "Subscribing to subaccount: {}/{}",
106        wallet_address,
107        subaccount_number
108    );
109    ws_client
110        .subscribe_subaccount(&wallet_address, subaccount_number)
111        .await?;
112
113    let Some(mut rx) = ws_client.take_receiver() else {
114        tracing::warn!("No inbound WebSocket receiver available; exiting");
115        return Ok(());
116    };
117
118    let sigint = tokio::signal::ctrl_c();
119    tokio::pin!(sigint);
120
121    let mut event_count = 0;
122    tracing::info!("Listening for subaccount updates (press Ctrl+C to stop)...");
123    tracing::info!("");
124
125    loop {
126        tokio::select! {
127            maybe_event = rx.recv() => {
128                match maybe_event {
129                    Some(event) => {
130                        event_count += 1;
131                        tracing::debug!("[Event #{}] {:?}", event_count, event);
132
133                        match event {
134                            NautilusWsMessage::Order(_) => {
135                                tracing::info!("[Event #{}] Order status update received", event_count);
136                            }
137                            NautilusWsMessage::Fill(_) => {
138                                tracing::info!("[Event #{}] Fill update received", event_count);
139                            }
140                            NautilusWsMessage::Position(_) => {
141                                tracing::info!("[Event #{}] Position update received", event_count);
142                            }
143                            _ => {}
144                        }
145                    }
146                    None => {
147                        tracing::info!("WebSocket message stream closed");
148                        break;
149                    }
150                }
151            }
152            _ = &mut sigint => {
153                tracing::info!("Received SIGINT, closing connection...");
154                ws_client.disconnect().await?;
155                break;
156            }
157            else => break,
158        }
159    }
160
161    tracing::info!("");
162    tracing::info!("WebSocket execution test completed");
163    tracing::info!("Total events received: {}", event_count);
164
165    Ok(())
166}