1use std::time::Duration;
34
35use nautilus_dydx::{
36 common::consts::{DYDX_TESTNET_HTTP_URL, DYDX_TESTNET_WS_URL},
37 http::client::DydxHttpClient,
38 websocket::{client::DydxWebSocketClient, handler::HandlerCommand},
39};
40use nautilus_model::{
41 data::{BarSpecification, BarType},
42 enums::{AggregationSource, BarAggregation, PriceType},
43 identifiers::InstrumentId,
44};
45use tokio::{pin, signal};
46use tracing::level_filters::LevelFilter;
47
48#[tokio::main]
49async fn main() -> Result<(), Box<dyn std::error::Error>> {
50 tracing_subscriber::fmt()
51 .with_max_level(LevelFilter::INFO)
52 .init();
53
54 let http_url =
56 std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string());
57 let ws_url = std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string());
58
59 let is_testnet = http_url.contains("testnet") || ws_url.contains("testnet");
61
62 tracing::info!("Connecting to dYdX HTTP API: {http_url}");
63 tracing::info!("Connecting to dYdX WebSocket: {ws_url}");
64 tracing::info!(
65 "Environment: {}",
66 if is_testnet { "TESTNET" } else { "MAINNET" }
67 );
68
69 let http_client = DydxHttpClient::new(Some(http_url), Some(30), None, is_testnet, None)?;
72 let instruments = http_client.request_instruments(None, None, None).await?;
73
74 tracing::info!("Fetched {} instruments from HTTP", instruments.len());
75
76 let instrument_id = std::env::var("DYDX_INSTRUMENT_ID").map_or_else(
78 |_| InstrumentId::from("BTC-USD-PERP.DYDX"),
79 InstrumentId::from,
80 );
81
82 tracing::info!("Using instrument: {instrument_id}");
83
84 let mut ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
86 ws_client.cache_instruments(instruments);
87
88 ws_client.connect().await?;
89 tracing::info!("WebSocket connected");
90
91 tokio::time::sleep(Duration::from_millis(500)).await;
93
94 tracing::info!("Subscribing to trades for {instrument_id}");
96 ws_client.subscribe_trades(instrument_id).await?;
97
98 tracing::info!("Subscribing to orderbook for {instrument_id}");
99 ws_client.subscribe_orderbook(instrument_id).await?;
100
101 let bar_spec = BarSpecification {
103 step: std::num::NonZeroUsize::new(1).unwrap(),
104 aggregation: BarAggregation::Minute,
105 price_type: PriceType::Last,
106 };
107 let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
108 let ticker = instrument_id.symbol.as_str().trim_end_matches("-PERP");
109 let topic = format!("{ticker}/1MIN");
110
111 ws_client.send_command(HandlerCommand::RegisterBarType { topic, bar_type })?;
112
113 tracing::info!("Subscribing to 1-minute candles for {instrument_id}");
114 ws_client.subscribe_candles(instrument_id, "1MIN").await?;
115
116 let Some(mut rx) = ws_client.take_receiver() else {
118 tracing::warn!("No inbound WebSocket receiver available; exiting");
119 return Ok(());
120 };
121
122 let sigint = signal::ctrl_c();
124 pin!(sigint);
125
126 let mut message_count: u64 = 0;
127
128 tracing::info!("Streaming messages (CTRL+C to exit)...");
129
130 loop {
131 tokio::select! {
132 _ = &mut sigint => {
133 tracing::info!("Received SIGINT, closing connection...");
134 ws_client.disconnect().await?;
135 break;
136 }
137 maybe_msg = rx.recv() => {
138 match maybe_msg {
139 Some(msg) => {
140 message_count += 1;
141 tracing::info!("Message #{message_count}: {msg:?}");
142 }
143 None => {
144 tracing::info!("WebSocket message stream closed");
145 break;
146 }
147 }
148 }
149 }
150 }
151
152 tracing::info!("Received {message_count} total messages");
153
154 Ok(())
155}