1use std::time::Duration;
38
39use nautilus_common::{
40 live::runner::set_data_event_sender,
41 messages::{
42 DataEvent, DataResponse,
43 data::{RequestBars, SubscribeBars, SubscribeBookDeltas, SubscribeTrades},
44 },
45};
46use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
47use nautilus_data::client::DataClient;
48use nautilus_dydx::{
49 common::consts::{DYDX_TESTNET_HTTP_URL, DYDX_TESTNET_WS_URL, DYDX_VENUE},
50 config::DydxDataClientConfig,
51 data::DydxDataClient,
52 http::client::DydxHttpClient,
53 websocket::client::DydxWebSocketClient,
54};
55use nautilus_model::{
56 data::{BarSpecification, BarType, Data},
57 enums::{AggregationSource, BarAggregation, BookType, PriceType},
58 identifiers::{ClientId, InstrumentId},
59};
60use tracing::level_filters::LevelFilter;
61
62#[tokio::main]
63async fn main() -> anyhow::Result<()> {
64 tracing_subscriber::fmt()
65 .with_max_level(LevelFilter::INFO)
66 .init();
67
68 tracing::info!("===== dYdX Live Integration Tests =====");
69 tracing::info!("");
70
71 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
72 set_data_event_sender(tx);
73
74 test_connect_and_subscribe(&mut rx).await?;
75 test_request_historical_bars(&mut rx).await?;
76 test_orderbook_snapshot_refresh(&mut rx).await?;
77 test_complete_data_flow(&mut rx).await?;
78 test_crossed_orderbook_detection(&mut rx).await?;
79
80 tracing::info!("");
81 tracing::info!("===== All Live Integration Tests Passed =====");
82
83 Ok(())
84}
85
86async fn test_connect_and_subscribe(
87 rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
88) -> anyhow::Result<()> {
89 let client_id = ClientId::from("DYDX-INT-DATA");
90
91 let config = DydxDataClientConfig {
92 is_testnet: true,
93 base_url_http: Some(
94 std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
95 ),
96 base_url_ws: Some(
97 std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
98 ),
99 ..Default::default()
100 };
101
102 let http_client = DydxHttpClient::new(
103 config.base_url_http.clone(),
104 config.http_timeout_secs,
105 config.http_proxy_url.clone(),
106 config.is_testnet,
107 None,
108 )?;
109
110 let ws_url = config
111 .base_url_ws
112 .clone()
113 .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
114 let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
115
116 let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
117
118 data_client.connect().await?;
119 tracing::info!("Connected to dYdX testnet");
120
121 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
122 let venue = *DYDX_VENUE;
123 let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
124 let command_id = UUID4::new();
125
126 let subscribe_trades = SubscribeTrades::new(
127 instrument_id,
128 Some(client_id),
129 Some(venue),
130 command_id,
131 ts_init,
132 None,
133 );
134 data_client.subscribe_trades(&subscribe_trades)?;
135 tracing::info!("Subscribed to trades");
136
137 let subscribe_book = SubscribeBookDeltas::new(
138 instrument_id,
139 BookType::L2_MBP,
140 Some(client_id),
141 Some(venue),
142 command_id,
143 ts_init,
144 None,
145 false,
146 None,
147 );
148 data_client.subscribe_book_deltas(&subscribe_book)?;
149 tracing::info!("Subscribed to order book deltas");
150
151 let bar_spec = BarSpecification {
152 step: std::num::NonZeroUsize::new(1).unwrap(),
153 aggregation: BarAggregation::Minute,
154 price_type: PriceType::Last,
155 };
156 let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
157 let subscribe_bars = SubscribeBars::new(
158 bar_type,
159 Some(client_id),
160 Some(venue),
161 command_id,
162 ts_init,
163 None,
164 );
165 data_client.subscribe_bars(&subscribe_bars)?;
166 tracing::info!("Subscribed to 1-minute bars");
167
168 match tokio::time::timeout(Duration::from_secs(30), rx.recv()).await {
169 Ok(Some(_)) => {
170 tracing::info!("Received data event from testnet");
171 }
172 Ok(None) => anyhow::bail!("data channel closed before receiving DataEvent"),
173 Err(_) => anyhow::bail!("timed out waiting for DataEvent"),
174 }
175
176 data_client.disconnect().await?;
177 tracing::info!("Disconnected");
178 tracing::info!("");
179
180 Ok(())
181}
182
183async fn test_request_historical_bars(
184 rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
185) -> anyhow::Result<()> {
186 let client_id = ClientId::from("DYDX-INT-BARS");
187
188 let config = DydxDataClientConfig {
189 is_testnet: true,
190 base_url_http: Some(
191 std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
192 ),
193 ..Default::default()
194 };
195
196 let http_client = DydxHttpClient::new(
197 config.base_url_http.clone(),
198 config.http_timeout_secs,
199 config.http_proxy_url.clone(),
200 config.is_testnet,
201 None,
202 )?;
203
204 let mut data_client = DydxDataClient::new(client_id, config, http_client, None)?;
205
206 data_client.connect().await?;
209
210 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
211 let bar_spec = BarSpecification {
212 step: std::num::NonZeroUsize::new(1).unwrap(),
213 aggregation: BarAggregation::Minute,
214 price_type: PriceType::Last,
215 };
216 let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
217
218 let now = chrono::Utc::now();
219 let small_start = Some(now - chrono::Duration::hours(1));
220 let small_end = Some(now);
221 let large_start = Some(now - chrono::Duration::hours(24));
222 let large_end = Some(now);
223
224 let ts_init = get_atomic_clock_realtime().get_time_ns();
225
226 let small_request = RequestBars::new(
227 bar_type,
228 small_start,
229 small_end,
230 Some(std::num::NonZeroUsize::new(100).unwrap()),
231 Some(client_id),
232 UUID4::new(),
233 ts_init,
234 None,
235 );
236 data_client.request_bars(&small_request)?;
237 tracing::info!("Requested 1-hour bar range");
238
239 let large_request = RequestBars::new(
240 bar_type,
241 large_start,
242 large_end,
243 Some(std::num::NonZeroUsize::new(5_000).unwrap()),
244 Some(client_id),
245 UUID4::new(),
246 ts_init,
247 None,
248 );
249 data_client.request_bars(&large_request)?;
250 tracing::info!("Requested 24-hour bar range (partitioned)");
251
252 let mut saw_bars_response = false;
253 let timeout_at = Duration::from_secs(60);
254
255 while !saw_bars_response {
256 let event = match tokio::time::timeout(timeout_at, rx.recv()).await {
257 Ok(Some(ev)) => ev,
258 Ok(None) => break,
259 Err(_) => break,
260 };
261
262 if let DataEvent::Response(DataResponse::Bars(_)) = event {
263 saw_bars_response = true;
264 }
265 }
266
267 if saw_bars_response {
268 tracing::info!("Received BarsResponse");
269 } else {
270 anyhow::bail!("expected at least one BarsResponse from dYdX testnet");
271 }
272
273 tracing::info!("");
274
275 Ok(())
276}
277
278async fn test_orderbook_snapshot_refresh(
279 rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
280) -> anyhow::Result<()> {
281 let client_id = ClientId::from("DYDX-INT-SNAPSHOT");
282
283 let config = DydxDataClientConfig {
284 is_testnet: true,
285 orderbook_refresh_interval_secs: Some(10),
286 base_url_http: Some(
287 std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
288 ),
289 base_url_ws: Some(
290 std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
291 ),
292 ..Default::default()
293 };
294
295 let http_client = DydxHttpClient::new(
296 config.base_url_http.clone(),
297 config.http_timeout_secs,
298 config.http_proxy_url.clone(),
299 config.is_testnet,
300 None,
301 )?;
302
303 let ws_url = config
304 .base_url_ws
305 .clone()
306 .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
307 let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
308
309 let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
310
311 data_client.connect().await?;
312
313 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
314 let venue = *DYDX_VENUE;
315 let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
316 let command_id = UUID4::new();
317
318 let subscribe_book = SubscribeBookDeltas::new(
319 instrument_id,
320 BookType::L2_MBP,
321 Some(client_id),
322 Some(venue),
323 command_id,
324 ts_init,
325 None,
326 false,
327 None,
328 );
329 data_client.subscribe_book_deltas(&subscribe_book)?;
330 tracing::info!("Subscribed to order book with 10s refresh interval");
331
332 let mut orderbook_updates = 0;
333 let timeout_at = Duration::from_secs(30);
334 let start = tokio::time::Instant::now();
335
336 while start.elapsed() < timeout_at && orderbook_updates < 5 {
337 match tokio::time::timeout(Duration::from_secs(5), rx.recv()).await {
338 Ok(Some(DataEvent::Data(_))) => {
339 orderbook_updates += 1;
340 }
341 Ok(Some(_)) => continue,
342 Ok(None) => break,
343 Err(_) => continue,
344 }
345 }
346
347 data_client.disconnect().await?;
348
349 if orderbook_updates >= 3 {
350 tracing::info!("Received {} orderbook updates", orderbook_updates);
351 } else {
352 anyhow::bail!("expected at least 3 orderbook updates, got {orderbook_updates}");
353 }
354
355 tracing::info!("");
356
357 Ok(())
358}
359
360async fn test_complete_data_flow(
361 rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
362) -> anyhow::Result<()> {
363 let client_id = ClientId::from("DYDX-INT-FLOW");
364
365 let config = DydxDataClientConfig {
366 is_testnet: true,
367 base_url_http: Some(
368 std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
369 ),
370 base_url_ws: Some(
371 std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
372 ),
373 ..Default::default()
374 };
375
376 let http_client = DydxHttpClient::new(
377 config.base_url_http.clone(),
378 config.http_timeout_secs,
379 config.http_proxy_url.clone(),
380 config.is_testnet,
381 None,
382 )?;
383
384 let ws_url = config
385 .base_url_ws
386 .clone()
387 .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
388 let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
389
390 let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
391
392 data_client.connect().await?;
393
394 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
395 let venue = *DYDX_VENUE;
396 let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
397 let command_id = UUID4::new();
398
399 let subscribe_trades = SubscribeTrades::new(
400 instrument_id,
401 Some(client_id),
402 Some(venue),
403 command_id,
404 ts_init,
405 None,
406 );
407 data_client.subscribe_trades(&subscribe_trades)?;
408
409 let subscribe_book = SubscribeBookDeltas::new(
410 instrument_id,
411 BookType::L2_MBP,
412 Some(client_id),
413 Some(venue),
414 command_id,
415 ts_init,
416 None,
417 false,
418 None,
419 );
420 data_client.subscribe_book_deltas(&subscribe_book)?;
421
422 let bar_spec = BarSpecification {
423 step: std::num::NonZeroUsize::new(1).unwrap(),
424 aggregation: BarAggregation::Minute,
425 price_type: PriceType::Last,
426 };
427 let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
428 let subscribe_bars = SubscribeBars::new(
429 bar_type,
430 Some(client_id),
431 Some(venue),
432 command_id,
433 ts_init,
434 None,
435 );
436 data_client.subscribe_bars(&subscribe_bars)?;
437
438 let mut saw_trade = false;
439 let mut saw_orderbook = false;
440 let mut saw_bar = false;
441
442 let timeout_at = Duration::from_secs(60);
443 let start = tokio::time::Instant::now();
444
445 while start.elapsed() < timeout_at && (!saw_trade || !saw_orderbook || !saw_bar) {
446 match tokio::time::timeout(Duration::from_secs(10), rx.recv()).await {
447 Ok(Some(DataEvent::Data(data))) => match data {
448 Data::Trade(_) => {
449 if !saw_trade {
450 tracing::info!("Received trade data");
451 saw_trade = true;
452 }
453 }
454 Data::Delta(_) | Data::Deltas(_) => {
455 if !saw_orderbook {
456 tracing::info!("Received orderbook data");
457 saw_orderbook = true;
458 }
459 }
460 Data::Bar(_) => {
461 if !saw_bar {
462 tracing::info!("Received bar data");
463 saw_bar = true;
464 }
465 }
466 _ => {}
467 },
468 Ok(Some(_)) => continue,
469 Ok(None) => break,
470 Err(_) => continue,
471 }
472 }
473
474 data_client.disconnect().await?;
475
476 if !saw_trade {
477 anyhow::bail!("expected to receive at least one trade tick");
478 }
479 if !saw_orderbook {
480 anyhow::bail!("expected to receive at least one orderbook update");
481 }
482 if !saw_bar {
483 anyhow::bail!("expected to receive at least one bar");
484 }
485
486 tracing::info!("");
487
488 Ok(())
489}
490
491async fn test_crossed_orderbook_detection(
492 rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
493) -> anyhow::Result<()> {
494 let client_id = ClientId::from("DYDX-INT-CROSSED");
495
496 let config = DydxDataClientConfig {
497 is_testnet: true,
498 base_url_http: Some(
499 std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
500 ),
501 base_url_ws: Some(
502 std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
503 ),
504 ..Default::default()
505 };
506
507 let http_client = DydxHttpClient::new(
508 config.base_url_http.clone(),
509 config.http_timeout_secs,
510 config.http_proxy_url.clone(),
511 config.is_testnet,
512 None,
513 )?;
514
515 let ws_url = config
516 .base_url_ws
517 .clone()
518 .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
519 let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
520
521 let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
522
523 data_client.connect().await?;
524
525 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
526 let venue = *DYDX_VENUE;
527 let ts_init = get_atomic_clock_realtime().get_time_ns();
528 let command_id = UUID4::new();
529
530 let subscribe_book = SubscribeBookDeltas::new(
531 instrument_id,
532 BookType::L2_MBP,
533 Some(client_id),
534 Some(venue),
535 command_id,
536 ts_init,
537 None,
538 false,
539 None,
540 );
541 data_client.subscribe_book_deltas(&subscribe_book)?;
542
543 let mut saw_orderbook_delta = false;
544 let mut quote_count = 0;
545
546 let _test_result = tokio::time::timeout(Duration::from_secs(30), async {
547 loop {
548 match rx.try_recv() {
549 Ok(DataEvent::Data(data)) => {
550 let data_type_str = format!("{data:?}");
551 if data_type_str.contains("OrderBookDeltas") {
552 saw_orderbook_delta = true;
553 } else if data_type_str.contains("QuoteTick") {
554 quote_count += 1;
555 }
556 }
557 Ok(_) => continue,
558 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
559 tokio::time::sleep(Duration::from_millis(100)).await;
560 continue;
561 }
562 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
563 }
564 }
565 })
566 .await;
567
568 data_client.disconnect().await?;
569
570 if saw_orderbook_delta {
571 tracing::info!("Monitored orderbook for crossed conditions");
572 tracing::info!(" Received {} quotes from deltas", quote_count);
573 } else {
574 anyhow::bail!("Expected to receive at least one orderbook delta");
575 }
576
577 tracing::info!("");
578
579 Ok(())
580}