1use std::time::Duration;
38
39use nautilus_common::{
40 messages::{
41 DataEvent, DataResponse,
42 data::{RequestBars, SubscribeBars, SubscribeBookDeltas, SubscribeTrades},
43 },
44 runner::set_data_event_sender,
45};
46use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
47use nautilus_data::client::DataClient;
48use nautilus_dydx::{
49 common::consts::{DYDX_TESTNET_HTTP_URL, DYDX_TESTNET_WS_URL, DYDX_VENUE},
50 config::DydxDataClientConfig,
51 data::DydxDataClient,
52 http::client::DydxHttpClient,
53 websocket::client::DydxWebSocketClient,
54};
55use nautilus_model::{
56 data::{BarSpecification, BarType, Data},
57 enums::{BarAggregation, BookType, PriceType},
58 identifiers::{ClientId, InstrumentId},
59};
60use tokio::time::timeout;
61use tracing::level_filters::LevelFilter;
62
63#[tokio::main]
64async fn main() -> anyhow::Result<()> {
65 tracing_subscriber::fmt()
66 .with_max_level(LevelFilter::INFO)
67 .init();
68
69 tracing::info!("===== dYdX Live Integration Tests =====");
70 tracing::info!("");
71
72 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
73 set_data_event_sender(tx);
74
75 test_connect_and_subscribe(&mut rx).await?;
76 test_request_historical_bars(&mut rx).await?;
77 test_orderbook_snapshot_refresh(&mut rx).await?;
78 test_complete_data_flow(&mut rx).await?;
79 test_crossed_orderbook_detection(&mut rx).await?;
80
81 tracing::info!("");
82 tracing::info!("===== All Live Integration Tests Passed =====");
83
84 Ok(())
85}
86
87async fn test_connect_and_subscribe(
88 rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
89) -> anyhow::Result<()> {
90 let client_id = ClientId::from("DYDX-INT-DATA");
91
92 let config = DydxDataClientConfig {
93 is_testnet: true,
94 base_url_http: Some(
95 std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
96 ),
97 base_url_ws: Some(
98 std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
99 ),
100 ..Default::default()
101 };
102
103 let http_client = DydxHttpClient::new(
104 config.base_url_http.clone(),
105 config.http_timeout_secs,
106 config.http_proxy_url.clone(),
107 config.is_testnet,
108 None,
109 )?;
110
111 let ws_url = config
112 .base_url_ws
113 .clone()
114 .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
115 let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
116
117 let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
118
119 data_client.connect().await?;
120 tracing::info!("Connected to dYdX testnet");
121
122 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
123 let venue = *DYDX_VENUE;
124 let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
125 let command_id = UUID4::new();
126
127 let subscribe_trades = SubscribeTrades::new(
128 instrument_id,
129 Some(client_id),
130 Some(venue),
131 command_id,
132 ts_init,
133 None,
134 );
135 data_client.subscribe_trades(&subscribe_trades)?;
136 tracing::info!("Subscribed to trades");
137
138 let subscribe_book = SubscribeBookDeltas::new(
139 instrument_id,
140 BookType::L2_MBP,
141 Some(client_id),
142 Some(venue),
143 command_id,
144 ts_init,
145 None,
146 false,
147 None,
148 );
149 data_client.subscribe_book_deltas(&subscribe_book)?;
150 tracing::info!("Subscribed to order book deltas");
151
152 let bar_spec = BarSpecification {
153 step: std::num::NonZeroUsize::new(1).unwrap(),
154 aggregation: BarAggregation::Minute,
155 price_type: PriceType::Last,
156 };
157 let bar_type = BarType::new(
158 instrument_id,
159 bar_spec,
160 nautilus_model::enums::AggregationSource::External,
161 );
162 let subscribe_bars = SubscribeBars::new(
163 bar_type,
164 Some(client_id),
165 Some(venue),
166 command_id,
167 ts_init,
168 None,
169 );
170 data_client.subscribe_bars(&subscribe_bars)?;
171 tracing::info!("Subscribed to 1-minute bars");
172
173 match timeout(Duration::from_secs(30), rx.recv()).await {
174 Ok(Some(_)) => {
175 tracing::info!("Received data event from testnet");
176 }
177 Ok(None) => anyhow::bail!("data channel closed before receiving DataEvent"),
178 Err(_) => anyhow::bail!("timed out waiting for DataEvent"),
179 }
180
181 data_client.disconnect().await?;
182 tracing::info!("Disconnected");
183 tracing::info!("");
184
185 Ok(())
186}
187
188async fn test_request_historical_bars(
189 rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
190) -> anyhow::Result<()> {
191 let client_id = ClientId::from("DYDX-INT-BARS");
192
193 let config = DydxDataClientConfig {
194 is_testnet: true,
195 base_url_http: Some(
196 std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
197 ),
198 ..Default::default()
199 };
200
201 let http_client = DydxHttpClient::new(
202 config.base_url_http.clone(),
203 config.http_timeout_secs,
204 config.http_proxy_url.clone(),
205 config.is_testnet,
206 None,
207 )?;
208
209 let mut data_client = DydxDataClient::new(client_id, config, http_client, None)?;
210
211 data_client.connect().await?;
214
215 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
216 let bar_spec = BarSpecification {
217 step: std::num::NonZeroUsize::new(1).unwrap(),
218 aggregation: BarAggregation::Minute,
219 price_type: PriceType::Last,
220 };
221 let bar_type = BarType::new(
222 instrument_id,
223 bar_spec,
224 nautilus_model::enums::AggregationSource::External,
225 );
226
227 let now = chrono::Utc::now();
228 let small_start = Some(now - chrono::Duration::hours(1));
229 let small_end = Some(now);
230 let large_start = Some(now - chrono::Duration::hours(24));
231 let large_end = Some(now);
232
233 let ts_init = get_atomic_clock_realtime().get_time_ns();
234
235 let small_request = RequestBars::new(
236 bar_type,
237 small_start,
238 small_end,
239 Some(std::num::NonZeroUsize::new(100).unwrap()),
240 Some(client_id),
241 UUID4::new(),
242 ts_init,
243 None,
244 );
245 data_client.request_bars(&small_request)?;
246 tracing::info!("Requested 1-hour bar range");
247
248 let large_request = RequestBars::new(
249 bar_type,
250 large_start,
251 large_end,
252 Some(std::num::NonZeroUsize::new(5_000).unwrap()),
253 Some(client_id),
254 UUID4::new(),
255 ts_init,
256 None,
257 );
258 data_client.request_bars(&large_request)?;
259 tracing::info!("Requested 24-hour bar range (partitioned)");
260
261 let mut saw_bars_response = false;
262 let timeout_at = Duration::from_secs(60);
263
264 while !saw_bars_response {
265 let event = match timeout(timeout_at, rx.recv()).await {
266 Ok(Some(ev)) => ev,
267 Ok(None) => break,
268 Err(_) => break,
269 };
270
271 if let DataEvent::Response(DataResponse::Bars(_)) = event {
272 saw_bars_response = true;
273 }
274 }
275
276 if saw_bars_response {
277 tracing::info!("Received BarsResponse");
278 } else {
279 anyhow::bail!("expected at least one BarsResponse from dYdX testnet");
280 }
281
282 tracing::info!("");
283
284 Ok(())
285}
286
287async fn test_orderbook_snapshot_refresh(
288 rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
289) -> anyhow::Result<()> {
290 let client_id = ClientId::from("DYDX-INT-SNAPSHOT");
291
292 let config = DydxDataClientConfig {
293 is_testnet: true,
294 orderbook_refresh_interval_secs: Some(10),
295 base_url_http: Some(
296 std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
297 ),
298 base_url_ws: Some(
299 std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
300 ),
301 ..Default::default()
302 };
303
304 let http_client = DydxHttpClient::new(
305 config.base_url_http.clone(),
306 config.http_timeout_secs,
307 config.http_proxy_url.clone(),
308 config.is_testnet,
309 None,
310 )?;
311
312 let ws_url = config
313 .base_url_ws
314 .clone()
315 .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
316 let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
317
318 let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
319
320 data_client.connect().await?;
321
322 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
323 let venue = *DYDX_VENUE;
324 let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
325 let command_id = UUID4::new();
326
327 let subscribe_book = SubscribeBookDeltas::new(
328 instrument_id,
329 BookType::L2_MBP,
330 Some(client_id),
331 Some(venue),
332 command_id,
333 ts_init,
334 None,
335 false,
336 None,
337 );
338 data_client.subscribe_book_deltas(&subscribe_book)?;
339 tracing::info!("Subscribed to order book with 10s refresh interval");
340
341 let mut orderbook_updates = 0;
342 let timeout_at = Duration::from_secs(30);
343 let start = tokio::time::Instant::now();
344
345 while start.elapsed() < timeout_at && orderbook_updates < 5 {
346 match timeout(Duration::from_secs(5), rx.recv()).await {
347 Ok(Some(DataEvent::Data(_))) => {
348 orderbook_updates += 1;
349 }
350 Ok(Some(_)) => continue,
351 Ok(None) => break,
352 Err(_) => continue,
353 }
354 }
355
356 data_client.disconnect().await?;
357
358 if orderbook_updates >= 3 {
359 tracing::info!("Received {} orderbook updates", orderbook_updates);
360 } else {
361 anyhow::bail!(
362 "expected at least 3 orderbook updates, got {}",
363 orderbook_updates
364 );
365 }
366
367 tracing::info!("");
368
369 Ok(())
370}
371
372async fn test_complete_data_flow(
373 rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
374) -> anyhow::Result<()> {
375 let client_id = ClientId::from("DYDX-INT-FLOW");
376
377 let config = DydxDataClientConfig {
378 is_testnet: true,
379 base_url_http: Some(
380 std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
381 ),
382 base_url_ws: Some(
383 std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
384 ),
385 ..Default::default()
386 };
387
388 let http_client = DydxHttpClient::new(
389 config.base_url_http.clone(),
390 config.http_timeout_secs,
391 config.http_proxy_url.clone(),
392 config.is_testnet,
393 None,
394 )?;
395
396 let ws_url = config
397 .base_url_ws
398 .clone()
399 .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
400 let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
401
402 let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
403
404 data_client.connect().await?;
405
406 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
407 let venue = *DYDX_VENUE;
408 let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
409 let command_id = UUID4::new();
410
411 let subscribe_trades = SubscribeTrades::new(
412 instrument_id,
413 Some(client_id),
414 Some(venue),
415 command_id,
416 ts_init,
417 None,
418 );
419 data_client.subscribe_trades(&subscribe_trades)?;
420
421 let subscribe_book = SubscribeBookDeltas::new(
422 instrument_id,
423 BookType::L2_MBP,
424 Some(client_id),
425 Some(venue),
426 command_id,
427 ts_init,
428 None,
429 false,
430 None,
431 );
432 data_client.subscribe_book_deltas(&subscribe_book)?;
433
434 let bar_spec = BarSpecification {
435 step: std::num::NonZeroUsize::new(1).unwrap(),
436 aggregation: BarAggregation::Minute,
437 price_type: PriceType::Last,
438 };
439 let bar_type = BarType::new(
440 instrument_id,
441 bar_spec,
442 nautilus_model::enums::AggregationSource::External,
443 );
444 let subscribe_bars = SubscribeBars::new(
445 bar_type,
446 Some(client_id),
447 Some(venue),
448 command_id,
449 ts_init,
450 None,
451 );
452 data_client.subscribe_bars(&subscribe_bars)?;
453
454 let mut saw_trade = false;
455 let mut saw_orderbook = false;
456 let mut saw_bar = false;
457
458 let timeout_at = Duration::from_secs(60);
459 let start = tokio::time::Instant::now();
460
461 while start.elapsed() < timeout_at && (!saw_trade || !saw_orderbook || !saw_bar) {
462 match timeout(Duration::from_secs(10), rx.recv()).await {
463 Ok(Some(DataEvent::Data(data))) => match data {
464 Data::Trade(_) => {
465 if !saw_trade {
466 tracing::info!("Received trade data");
467 saw_trade = true;
468 }
469 }
470 Data::Delta(_) | Data::Deltas(_) => {
471 if !saw_orderbook {
472 tracing::info!("Received orderbook data");
473 saw_orderbook = true;
474 }
475 }
476 Data::Bar(_) => {
477 if !saw_bar {
478 tracing::info!("Received bar data");
479 saw_bar = true;
480 }
481 }
482 _ => {}
483 },
484 Ok(Some(_)) => continue,
485 Ok(None) => break,
486 Err(_) => continue,
487 }
488 }
489
490 data_client.disconnect().await?;
491
492 if !saw_trade {
493 anyhow::bail!("expected to receive at least one trade tick");
494 }
495 if !saw_orderbook {
496 anyhow::bail!("expected to receive at least one orderbook update");
497 }
498 if !saw_bar {
499 anyhow::bail!("expected to receive at least one bar");
500 }
501
502 tracing::info!("");
503
504 Ok(())
505}
506
507async fn test_crossed_orderbook_detection(
508 rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
509) -> anyhow::Result<()> {
510 let client_id = ClientId::from("DYDX-INT-CROSSED");
511
512 let config = DydxDataClientConfig {
513 is_testnet: true,
514 base_url_http: Some(
515 std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
516 ),
517 base_url_ws: Some(
518 std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
519 ),
520 ..Default::default()
521 };
522
523 let http_client = DydxHttpClient::new(
524 config.base_url_http.clone(),
525 config.http_timeout_secs,
526 config.http_proxy_url.clone(),
527 config.is_testnet,
528 None,
529 )?;
530
531 let ws_url = config
532 .base_url_ws
533 .clone()
534 .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
535 let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
536
537 let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
538
539 data_client.connect().await?;
540
541 let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
542 let venue = *DYDX_VENUE;
543 let ts_init = get_atomic_clock_realtime().get_time_ns();
544 let command_id = UUID4::new();
545
546 let subscribe_book = SubscribeBookDeltas::new(
547 instrument_id,
548 BookType::L2_MBP,
549 Some(client_id),
550 Some(venue),
551 command_id,
552 ts_init,
553 None,
554 false,
555 None,
556 );
557 data_client.subscribe_book_deltas(&subscribe_book)?;
558
559 let mut saw_orderbook_delta = false;
560 let mut quote_count = 0;
561
562 let _test_result = timeout(Duration::from_secs(30), async {
563 loop {
564 match rx.try_recv() {
565 Ok(DataEvent::Data(data)) => {
566 let data_type_str = format!("{:?}", data);
567 if data_type_str.contains("OrderBookDeltas") {
568 saw_orderbook_delta = true;
569 } else if data_type_str.contains("QuoteTick") {
570 quote_count += 1;
571 }
572 }
573 Ok(_) => continue,
574 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
575 tokio::time::sleep(Duration::from_millis(100)).await;
576 continue;
577 }
578 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
579 }
580 }
581 })
582 .await;
583
584 data_client.disconnect().await?;
585
586 if saw_orderbook_delta {
587 tracing::info!("Monitored orderbook for crossed conditions");
588 tracing::info!(" Received {} quotes from deltas", quote_count);
589 } else {
590 anyhow::bail!("Expected to receive at least one orderbook delta");
591 }
592
593 tracing::info!("");
594
595 Ok(())
596}