1use std::{env, time::Duration};
37
38use nautilus_dydx::{
39 common::consts::{DYDX_TESTNET_HTTP_URL, DYDX_TESTNET_WS_URL},
40 grpc::wallet::Wallet,
41 http::client::DydxHttpClient,
42 websocket::{NautilusWsMessage, client::DydxWebSocketClient},
43};
44use tracing::level_filters::LevelFilter;
45
46const DEFAULT_SUBACCOUNT: u32 = 0;
47
48#[tokio::main]
49async fn main() -> Result<(), Box<dyn std::error::Error>> {
50 tracing_subscriber::fmt()
51 .with_max_level(LevelFilter::DEBUG)
52 .init();
53
54 let args: Vec<String> = env::args().collect();
55 let is_mainnet = args.iter().any(|a| a == "--mainnet");
56 let subaccount_number = args
57 .iter()
58 .position(|a| a == "--subaccount")
59 .and_then(|i| args.get(i + 1))
60 .and_then(|s| s.parse::<u32>().ok())
61 .unwrap_or(DEFAULT_SUBACCOUNT);
62
63 let mnemonic = env::var("DYDX_MNEMONIC").expect("DYDX_MNEMONIC environment variable not set");
64
65 let ws_url = if is_mainnet {
66 env::var("DYDX_WS_URL").unwrap_or_else(|_| "wss://indexer.dydx.trade/v4/ws".to_string())
67 } else {
68 env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string())
69 };
70
71 let http_url = if is_mainnet {
72 env::var("DYDX_HTTP_URL").unwrap_or_else(|_| "https://indexer.dydx.trade".to_string())
73 } else {
74 env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string())
75 };
76
77 tracing::info!("Connecting to dYdX WebSocket API: {}", ws_url);
78 tracing::info!(
79 "Environment: {}",
80 if is_mainnet { "MAINNET" } else { "TESTNET" }
81 );
82 tracing::info!("Subaccount: {}", subaccount_number);
83 tracing::info!("");
84
85 let wallet = Wallet::from_mnemonic(&mnemonic)?;
86 let account = wallet.account_offline(subaccount_number)?;
87 let wallet_address = account.address.clone();
88 tracing::info!("Wallet address: {}", wallet_address);
89 tracing::info!("");
90
91 let http_client =
92 DydxHttpClient::new(Some(http_url.clone()), Some(30), None, !is_mainnet, None)?;
93
94 tracing::info!("Fetching instruments from HTTP API...");
95 let instruments = http_client.request_instruments(None, None, None).await?;
96 tracing::info!("Fetched {} instruments", instruments.len());
97
98 let mut ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
99 ws_client.cache_instruments(instruments);
100 ws_client.connect().await?;
101
102 tokio::time::sleep(Duration::from_millis(500)).await;
103
104 tracing::info!(
105 "Subscribing to subaccount: {}/{}",
106 wallet_address,
107 subaccount_number
108 );
109 ws_client
110 .subscribe_subaccount(&wallet_address, subaccount_number)
111 .await?;
112
113 let Some(mut rx) = ws_client.take_receiver() else {
114 tracing::warn!("No inbound WebSocket receiver available; exiting");
115 return Ok(());
116 };
117
118 let sigint = tokio::signal::ctrl_c();
119 tokio::pin!(sigint);
120
121 let mut event_count = 0;
122 tracing::info!("Listening for subaccount updates (press Ctrl+C to stop)...");
123 tracing::info!("");
124
125 loop {
126 tokio::select! {
127 maybe_event = rx.recv() => {
128 match maybe_event {
129 Some(event) => {
130 event_count += 1;
131 tracing::debug!("[Event #{}] {:?}", event_count, event);
132
133 match event {
134 NautilusWsMessage::Order(_) => {
135 tracing::info!("[Event #{}] Order status update received", event_count);
136 }
137 NautilusWsMessage::Fill(_) => {
138 tracing::info!("[Event #{}] Fill update received", event_count);
139 }
140 NautilusWsMessage::Position(_) => {
141 tracing::info!("[Event #{}] Position update received", event_count);
142 }
143 _ => {}
144 }
145 }
146 None => {
147 tracing::info!("WebSocket message stream closed");
148 break;
149 }
150 }
151 }
152 _ = &mut sigint => {
153 tracing::info!("Received SIGINT, closing connection...");
154 ws_client.disconnect().await?;
155 break;
156 }
157 else => break,
158 }
159 }
160
161 tracing::info!("");
162 tracing::info!("WebSocket execution test completed");
163 tracing::info!("Total events received: {}", event_count);
164
165 Ok(())
166}