dydx_live_integration/
live_integration.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live integration tests for dYdX data client against public testnet.
17//!
18//! These tests verify end-to-end data flow from dYdX testnet to NautilusTrader's
19//! data engine, including subscription management, orderbook handling, and data
20//! type conversions.
21//!
22//! Usage:
23//! ```bash
24//! # Run all live integration tests against testnet
25//! cargo run --bin dydx-live-integration -p nautilus-dydx
26//!
27//! # Override endpoints
28//! DYDX_HTTP_URL=https://indexer.v4testnet.dydx.exchange \
29//! DYDX_WS_URL=wss://indexer.v4testnet.dydx.exchange/v4/ws \
30//! cargo run --bin dydx-live-integration -p nautilus-dydx
31//! ```
32//!
33//! **Requirements**:
34//! - Outbound network access to dYdX v4 testnet indexer
35//! - No credentials required (public endpoints only)
36
37use std::time::Duration;
38
39use nautilus_common::{
40    live::runner::set_data_event_sender,
41    messages::{
42        DataEvent, DataResponse,
43        data::{RequestBars, SubscribeBars, SubscribeBookDeltas, SubscribeTrades},
44    },
45};
46use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
47use nautilus_data::client::DataClient;
48use nautilus_dydx::{
49    common::consts::{DYDX_TESTNET_HTTP_URL, DYDX_TESTNET_WS_URL, DYDX_VENUE},
50    config::DydxDataClientConfig,
51    data::DydxDataClient,
52    http::client::DydxHttpClient,
53    websocket::client::DydxWebSocketClient,
54};
55use nautilus_model::{
56    data::{BarSpecification, BarType, Data},
57    enums::{AggregationSource, BarAggregation, BookType, PriceType},
58    identifiers::{ClientId, InstrumentId},
59};
60use tracing::level_filters::LevelFilter;
61
62#[tokio::main]
63async fn main() -> anyhow::Result<()> {
64    tracing_subscriber::fmt()
65        .with_max_level(LevelFilter::INFO)
66        .init();
67
68    tracing::info!("===== dYdX Live Integration Tests =====");
69    tracing::info!("");
70
71    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
72    set_data_event_sender(tx);
73
74    test_connect_and_subscribe(&mut rx).await?;
75    test_request_historical_bars(&mut rx).await?;
76    test_orderbook_snapshot_refresh(&mut rx).await?;
77    test_complete_data_flow(&mut rx).await?;
78    test_crossed_orderbook_detection(&mut rx).await?;
79
80    tracing::info!("");
81    tracing::info!("===== All Live Integration Tests Passed =====");
82
83    Ok(())
84}
85
86async fn test_connect_and_subscribe(
87    rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
88) -> anyhow::Result<()> {
89    let client_id = ClientId::from("DYDX-INT-DATA");
90
91    let config = DydxDataClientConfig {
92        is_testnet: true,
93        base_url_http: Some(
94            std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
95        ),
96        base_url_ws: Some(
97            std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
98        ),
99        ..Default::default()
100    };
101
102    let http_client = DydxHttpClient::new(
103        config.base_url_http.clone(),
104        config.http_timeout_secs,
105        config.http_proxy_url.clone(),
106        config.is_testnet,
107        None,
108    )?;
109
110    let ws_url = config
111        .base_url_ws
112        .clone()
113        .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
114    let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
115
116    let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
117
118    data_client.connect().await?;
119    tracing::info!("Connected to dYdX testnet");
120
121    let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
122    let venue = *DYDX_VENUE;
123    let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
124    let command_id = UUID4::new();
125
126    let subscribe_trades = SubscribeTrades::new(
127        instrument_id,
128        Some(client_id),
129        Some(venue),
130        command_id,
131        ts_init,
132        None,
133    );
134    data_client.subscribe_trades(&subscribe_trades)?;
135    tracing::info!("Subscribed to trades");
136
137    let subscribe_book = SubscribeBookDeltas::new(
138        instrument_id,
139        BookType::L2_MBP,
140        Some(client_id),
141        Some(venue),
142        command_id,
143        ts_init,
144        None,
145        false,
146        None,
147    );
148    data_client.subscribe_book_deltas(&subscribe_book)?;
149    tracing::info!("Subscribed to order book deltas");
150
151    let bar_spec = BarSpecification {
152        step: std::num::NonZeroUsize::new(1).unwrap(),
153        aggregation: BarAggregation::Minute,
154        price_type: PriceType::Last,
155    };
156    let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
157    let subscribe_bars = SubscribeBars::new(
158        bar_type,
159        Some(client_id),
160        Some(venue),
161        command_id,
162        ts_init,
163        None,
164    );
165    data_client.subscribe_bars(&subscribe_bars)?;
166    tracing::info!("Subscribed to 1-minute bars");
167
168    match tokio::time::timeout(Duration::from_secs(30), rx.recv()).await {
169        Ok(Some(_)) => {
170            tracing::info!("Received data event from testnet");
171        }
172        Ok(None) => anyhow::bail!("data channel closed before receiving DataEvent"),
173        Err(_) => anyhow::bail!("timed out waiting for DataEvent"),
174    }
175
176    data_client.disconnect().await?;
177    tracing::info!("Disconnected");
178    tracing::info!("");
179
180    Ok(())
181}
182
183async fn test_request_historical_bars(
184    rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
185) -> anyhow::Result<()> {
186    let client_id = ClientId::from("DYDX-INT-BARS");
187
188    let config = DydxDataClientConfig {
189        is_testnet: true,
190        base_url_http: Some(
191            std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
192        ),
193        ..Default::default()
194    };
195
196    let http_client = DydxHttpClient::new(
197        config.base_url_http.clone(),
198        config.http_timeout_secs,
199        config.http_proxy_url.clone(),
200        config.is_testnet,
201        None,
202    )?;
203
204    let mut data_client = DydxDataClient::new(client_id, config, http_client, None)?;
205
206    // Connect to bootstrap instruments (required for bar conversion)
207    // Note: No WebSocket client provided, so only HTTP initialization occurs
208    data_client.connect().await?;
209
210    let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
211    let bar_spec = BarSpecification {
212        step: std::num::NonZeroUsize::new(1).unwrap(),
213        aggregation: BarAggregation::Minute,
214        price_type: PriceType::Last,
215    };
216    let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
217
218    let now = chrono::Utc::now();
219    let small_start = Some(now - chrono::Duration::hours(1));
220    let small_end = Some(now);
221    let large_start = Some(now - chrono::Duration::hours(24));
222    let large_end = Some(now);
223
224    let ts_init = get_atomic_clock_realtime().get_time_ns();
225
226    let small_request = RequestBars::new(
227        bar_type,
228        small_start,
229        small_end,
230        Some(std::num::NonZeroUsize::new(100).unwrap()),
231        Some(client_id),
232        UUID4::new(),
233        ts_init,
234        None,
235    );
236    data_client.request_bars(&small_request)?;
237    tracing::info!("Requested 1-hour bar range");
238
239    let large_request = RequestBars::new(
240        bar_type,
241        large_start,
242        large_end,
243        Some(std::num::NonZeroUsize::new(5_000).unwrap()),
244        Some(client_id),
245        UUID4::new(),
246        ts_init,
247        None,
248    );
249    data_client.request_bars(&large_request)?;
250    tracing::info!("Requested 24-hour bar range (partitioned)");
251
252    let mut saw_bars_response = false;
253    let timeout_at = Duration::from_secs(60);
254
255    while !saw_bars_response {
256        let event = match tokio::time::timeout(timeout_at, rx.recv()).await {
257            Ok(Some(ev)) => ev,
258            Ok(None) => break,
259            Err(_) => break,
260        };
261
262        if let DataEvent::Response(DataResponse::Bars(_)) = event {
263            saw_bars_response = true;
264        }
265    }
266
267    if saw_bars_response {
268        tracing::info!("Received BarsResponse");
269    } else {
270        anyhow::bail!("expected at least one BarsResponse from dYdX testnet");
271    }
272
273    tracing::info!("");
274
275    Ok(())
276}
277
278async fn test_orderbook_snapshot_refresh(
279    rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
280) -> anyhow::Result<()> {
281    let client_id = ClientId::from("DYDX-INT-SNAPSHOT");
282
283    let config = DydxDataClientConfig {
284        is_testnet: true,
285        orderbook_refresh_interval_secs: Some(10),
286        base_url_http: Some(
287            std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
288        ),
289        base_url_ws: Some(
290            std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
291        ),
292        ..Default::default()
293    };
294
295    let http_client = DydxHttpClient::new(
296        config.base_url_http.clone(),
297        config.http_timeout_secs,
298        config.http_proxy_url.clone(),
299        config.is_testnet,
300        None,
301    )?;
302
303    let ws_url = config
304        .base_url_ws
305        .clone()
306        .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
307    let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
308
309    let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
310
311    data_client.connect().await?;
312
313    let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
314    let venue = *DYDX_VENUE;
315    let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
316    let command_id = UUID4::new();
317
318    let subscribe_book = SubscribeBookDeltas::new(
319        instrument_id,
320        BookType::L2_MBP,
321        Some(client_id),
322        Some(venue),
323        command_id,
324        ts_init,
325        None,
326        false,
327        None,
328    );
329    data_client.subscribe_book_deltas(&subscribe_book)?;
330    tracing::info!("Subscribed to order book with 10s refresh interval");
331
332    let mut orderbook_updates = 0;
333    let timeout_at = Duration::from_secs(30);
334    let start = tokio::time::Instant::now();
335
336    while start.elapsed() < timeout_at && orderbook_updates < 5 {
337        match tokio::time::timeout(Duration::from_secs(5), rx.recv()).await {
338            Ok(Some(DataEvent::Data(_))) => {
339                orderbook_updates += 1;
340            }
341            Ok(Some(_)) => continue,
342            Ok(None) => break,
343            Err(_) => continue,
344        }
345    }
346
347    data_client.disconnect().await?;
348
349    if orderbook_updates >= 3 {
350        tracing::info!("Received {} orderbook updates", orderbook_updates);
351    } else {
352        anyhow::bail!("expected at least 3 orderbook updates, got {orderbook_updates}");
353    }
354
355    tracing::info!("");
356
357    Ok(())
358}
359
360async fn test_complete_data_flow(
361    rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
362) -> anyhow::Result<()> {
363    let client_id = ClientId::from("DYDX-INT-FLOW");
364
365    let config = DydxDataClientConfig {
366        is_testnet: true,
367        base_url_http: Some(
368            std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
369        ),
370        base_url_ws: Some(
371            std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
372        ),
373        ..Default::default()
374    };
375
376    let http_client = DydxHttpClient::new(
377        config.base_url_http.clone(),
378        config.http_timeout_secs,
379        config.http_proxy_url.clone(),
380        config.is_testnet,
381        None,
382    )?;
383
384    let ws_url = config
385        .base_url_ws
386        .clone()
387        .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
388    let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
389
390    let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
391
392    data_client.connect().await?;
393
394    let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
395    let venue = *DYDX_VENUE;
396    let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
397    let command_id = UUID4::new();
398
399    let subscribe_trades = SubscribeTrades::new(
400        instrument_id,
401        Some(client_id),
402        Some(venue),
403        command_id,
404        ts_init,
405        None,
406    );
407    data_client.subscribe_trades(&subscribe_trades)?;
408
409    let subscribe_book = SubscribeBookDeltas::new(
410        instrument_id,
411        BookType::L2_MBP,
412        Some(client_id),
413        Some(venue),
414        command_id,
415        ts_init,
416        None,
417        false,
418        None,
419    );
420    data_client.subscribe_book_deltas(&subscribe_book)?;
421
422    let bar_spec = BarSpecification {
423        step: std::num::NonZeroUsize::new(1).unwrap(),
424        aggregation: BarAggregation::Minute,
425        price_type: PriceType::Last,
426    };
427    let bar_type = BarType::new(instrument_id, bar_spec, AggregationSource::External);
428    let subscribe_bars = SubscribeBars::new(
429        bar_type,
430        Some(client_id),
431        Some(venue),
432        command_id,
433        ts_init,
434        None,
435    );
436    data_client.subscribe_bars(&subscribe_bars)?;
437
438    let mut saw_trade = false;
439    let mut saw_orderbook = false;
440    let mut saw_bar = false;
441
442    let timeout_at = Duration::from_secs(60);
443    let start = tokio::time::Instant::now();
444
445    while start.elapsed() < timeout_at && (!saw_trade || !saw_orderbook || !saw_bar) {
446        match tokio::time::timeout(Duration::from_secs(10), rx.recv()).await {
447            Ok(Some(DataEvent::Data(data))) => match data {
448                Data::Trade(_) => {
449                    if !saw_trade {
450                        tracing::info!("Received trade data");
451                        saw_trade = true;
452                    }
453                }
454                Data::Delta(_) | Data::Deltas(_) => {
455                    if !saw_orderbook {
456                        tracing::info!("Received orderbook data");
457                        saw_orderbook = true;
458                    }
459                }
460                Data::Bar(_) => {
461                    if !saw_bar {
462                        tracing::info!("Received bar data");
463                        saw_bar = true;
464                    }
465                }
466                _ => {}
467            },
468            Ok(Some(_)) => continue,
469            Ok(None) => break,
470            Err(_) => continue,
471        }
472    }
473
474    data_client.disconnect().await?;
475
476    if !saw_trade {
477        anyhow::bail!("expected to receive at least one trade tick");
478    }
479    if !saw_orderbook {
480        anyhow::bail!("expected to receive at least one orderbook update");
481    }
482    if !saw_bar {
483        anyhow::bail!("expected to receive at least one bar");
484    }
485
486    tracing::info!("");
487
488    Ok(())
489}
490
491async fn test_crossed_orderbook_detection(
492    rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
493) -> anyhow::Result<()> {
494    let client_id = ClientId::from("DYDX-INT-CROSSED");
495
496    let config = DydxDataClientConfig {
497        is_testnet: true,
498        base_url_http: Some(
499            std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
500        ),
501        base_url_ws: Some(
502            std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
503        ),
504        ..Default::default()
505    };
506
507    let http_client = DydxHttpClient::new(
508        config.base_url_http.clone(),
509        config.http_timeout_secs,
510        config.http_proxy_url.clone(),
511        config.is_testnet,
512        None,
513    )?;
514
515    let ws_url = config
516        .base_url_ws
517        .clone()
518        .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
519    let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
520
521    let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
522
523    data_client.connect().await?;
524
525    let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
526    let venue = *DYDX_VENUE;
527    let ts_init = get_atomic_clock_realtime().get_time_ns();
528    let command_id = UUID4::new();
529
530    let subscribe_book = SubscribeBookDeltas::new(
531        instrument_id,
532        BookType::L2_MBP,
533        Some(client_id),
534        Some(venue),
535        command_id,
536        ts_init,
537        None,
538        false,
539        None,
540    );
541    data_client.subscribe_book_deltas(&subscribe_book)?;
542
543    let mut saw_orderbook_delta = false;
544    let mut quote_count = 0;
545
546    let _test_result = tokio::time::timeout(Duration::from_secs(30), async {
547        loop {
548            match rx.try_recv() {
549                Ok(DataEvent::Data(data)) => {
550                    let data_type_str = format!("{data:?}");
551                    if data_type_str.contains("OrderBookDeltas") {
552                        saw_orderbook_delta = true;
553                    } else if data_type_str.contains("QuoteTick") {
554                        quote_count += 1;
555                    }
556                }
557                Ok(_) => continue,
558                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
559                    tokio::time::sleep(Duration::from_millis(100)).await;
560                    continue;
561                }
562                Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
563            }
564        }
565    })
566    .await;
567
568    data_client.disconnect().await?;
569
570    if saw_orderbook_delta {
571        tracing::info!("Monitored orderbook for crossed conditions");
572        tracing::info!("  Received {} quotes from deltas", quote_count);
573    } else {
574        anyhow::bail!("Expected to receive at least one orderbook delta");
575    }
576
577    tracing::info!("");
578
579    Ok(())
580}