dydx_live_integration/
live_integration.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live integration tests for dYdX data client against public testnet.
17//!
18//! These tests verify end-to-end data flow from dYdX testnet to NautilusTrader's
19//! data engine, including subscription management, orderbook handling, and data
20//! type conversions.
21//!
22//! Usage:
23//! ```bash
24//! # Run all live integration tests against testnet
25//! cargo run --bin dydx-live-integration -p nautilus-dydx
26//!
27//! # Override endpoints
28//! DYDX_HTTP_URL=https://indexer.v4testnet.dydx.exchange \
29//! DYDX_WS_URL=wss://indexer.v4testnet.dydx.exchange/v4/ws \
30//! cargo run --bin dydx-live-integration -p nautilus-dydx
31//! ```
32//!
33//! **Requirements**:
34//! - Outbound network access to dYdX v4 testnet indexer
35//! - No credentials required (public endpoints only)
36
37use std::time::Duration;
38
39use nautilus_common::{
40    messages::{
41        DataEvent, DataResponse,
42        data::{RequestBars, SubscribeBars, SubscribeBookDeltas, SubscribeTrades},
43    },
44    runner::set_data_event_sender,
45};
46use nautilus_core::{UUID4, UnixNanos, time::get_atomic_clock_realtime};
47use nautilus_data::client::DataClient;
48use nautilus_dydx::{
49    common::consts::{DYDX_TESTNET_HTTP_URL, DYDX_TESTNET_WS_URL, DYDX_VENUE},
50    config::DydxDataClientConfig,
51    data::DydxDataClient,
52    http::client::DydxHttpClient,
53    websocket::client::DydxWebSocketClient,
54};
55use nautilus_model::{
56    data::{BarSpecification, BarType, Data},
57    enums::{BarAggregation, BookType, PriceType},
58    identifiers::{ClientId, InstrumentId},
59};
60use tokio::time::timeout;
61use tracing::level_filters::LevelFilter;
62
63#[tokio::main]
64async fn main() -> anyhow::Result<()> {
65    tracing_subscriber::fmt()
66        .with_max_level(LevelFilter::INFO)
67        .init();
68
69    tracing::info!("===== dYdX Live Integration Tests =====");
70    tracing::info!("");
71
72    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
73    set_data_event_sender(tx);
74
75    test_connect_and_subscribe(&mut rx).await?;
76    test_request_historical_bars(&mut rx).await?;
77    test_orderbook_snapshot_refresh(&mut rx).await?;
78    test_complete_data_flow(&mut rx).await?;
79    test_crossed_orderbook_detection(&mut rx).await?;
80
81    tracing::info!("");
82    tracing::info!("===== All Live Integration Tests Passed =====");
83
84    Ok(())
85}
86
87async fn test_connect_and_subscribe(
88    rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
89) -> anyhow::Result<()> {
90    let client_id = ClientId::from("DYDX-INT-DATA");
91
92    let config = DydxDataClientConfig {
93        is_testnet: true,
94        base_url_http: Some(
95            std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
96        ),
97        base_url_ws: Some(
98            std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
99        ),
100        ..Default::default()
101    };
102
103    let http_client = DydxHttpClient::new(
104        config.base_url_http.clone(),
105        config.http_timeout_secs,
106        config.http_proxy_url.clone(),
107        config.is_testnet,
108        None,
109    )?;
110
111    let ws_url = config
112        .base_url_ws
113        .clone()
114        .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
115    let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
116
117    let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
118
119    data_client.connect().await?;
120    tracing::info!("Connected to dYdX testnet");
121
122    let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
123    let venue = *DYDX_VENUE;
124    let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
125    let command_id = UUID4::new();
126
127    let subscribe_trades = SubscribeTrades::new(
128        instrument_id,
129        Some(client_id),
130        Some(venue),
131        command_id,
132        ts_init,
133        None,
134    );
135    data_client.subscribe_trades(&subscribe_trades)?;
136    tracing::info!("Subscribed to trades");
137
138    let subscribe_book = SubscribeBookDeltas::new(
139        instrument_id,
140        BookType::L2_MBP,
141        Some(client_id),
142        Some(venue),
143        command_id,
144        ts_init,
145        None,
146        false,
147        None,
148    );
149    data_client.subscribe_book_deltas(&subscribe_book)?;
150    tracing::info!("Subscribed to order book deltas");
151
152    let bar_spec = BarSpecification {
153        step: std::num::NonZeroUsize::new(1).unwrap(),
154        aggregation: BarAggregation::Minute,
155        price_type: PriceType::Last,
156    };
157    let bar_type = BarType::new(
158        instrument_id,
159        bar_spec,
160        nautilus_model::enums::AggregationSource::External,
161    );
162    let subscribe_bars = SubscribeBars::new(
163        bar_type,
164        Some(client_id),
165        Some(venue),
166        command_id,
167        ts_init,
168        None,
169    );
170    data_client.subscribe_bars(&subscribe_bars)?;
171    tracing::info!("Subscribed to 1-minute bars");
172
173    match timeout(Duration::from_secs(30), rx.recv()).await {
174        Ok(Some(_)) => {
175            tracing::info!("Received data event from testnet");
176        }
177        Ok(None) => anyhow::bail!("data channel closed before receiving DataEvent"),
178        Err(_) => anyhow::bail!("timed out waiting for DataEvent"),
179    }
180
181    data_client.disconnect().await?;
182    tracing::info!("Disconnected");
183    tracing::info!("");
184
185    Ok(())
186}
187
188async fn test_request_historical_bars(
189    rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
190) -> anyhow::Result<()> {
191    let client_id = ClientId::from("DYDX-INT-BARS");
192
193    let config = DydxDataClientConfig {
194        is_testnet: true,
195        base_url_http: Some(
196            std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
197        ),
198        ..Default::default()
199    };
200
201    let http_client = DydxHttpClient::new(
202        config.base_url_http.clone(),
203        config.http_timeout_secs,
204        config.http_proxy_url.clone(),
205        config.is_testnet,
206        None,
207    )?;
208
209    let mut data_client = DydxDataClient::new(client_id, config, http_client, None)?;
210
211    // Connect to bootstrap instruments (required for bar conversion)
212    // Note: No WebSocket client provided, so only HTTP initialization occurs
213    data_client.connect().await?;
214
215    let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
216    let bar_spec = BarSpecification {
217        step: std::num::NonZeroUsize::new(1).unwrap(),
218        aggregation: BarAggregation::Minute,
219        price_type: PriceType::Last,
220    };
221    let bar_type = BarType::new(
222        instrument_id,
223        bar_spec,
224        nautilus_model::enums::AggregationSource::External,
225    );
226
227    let now = chrono::Utc::now();
228    let small_start = Some(now - chrono::Duration::hours(1));
229    let small_end = Some(now);
230    let large_start = Some(now - chrono::Duration::hours(24));
231    let large_end = Some(now);
232
233    let ts_init = get_atomic_clock_realtime().get_time_ns();
234
235    let small_request = RequestBars::new(
236        bar_type,
237        small_start,
238        small_end,
239        Some(std::num::NonZeroUsize::new(100).unwrap()),
240        Some(client_id),
241        UUID4::new(),
242        ts_init,
243        None,
244    );
245    data_client.request_bars(&small_request)?;
246    tracing::info!("Requested 1-hour bar range");
247
248    let large_request = RequestBars::new(
249        bar_type,
250        large_start,
251        large_end,
252        Some(std::num::NonZeroUsize::new(5_000).unwrap()),
253        Some(client_id),
254        UUID4::new(),
255        ts_init,
256        None,
257    );
258    data_client.request_bars(&large_request)?;
259    tracing::info!("Requested 24-hour bar range (partitioned)");
260
261    let mut saw_bars_response = false;
262    let timeout_at = Duration::from_secs(60);
263
264    while !saw_bars_response {
265        let event = match timeout(timeout_at, rx.recv()).await {
266            Ok(Some(ev)) => ev,
267            Ok(None) => break,
268            Err(_) => break,
269        };
270
271        if let DataEvent::Response(DataResponse::Bars(_)) = event {
272            saw_bars_response = true;
273        }
274    }
275
276    if saw_bars_response {
277        tracing::info!("Received BarsResponse");
278    } else {
279        anyhow::bail!("expected at least one BarsResponse from dYdX testnet");
280    }
281
282    tracing::info!("");
283
284    Ok(())
285}
286
287async fn test_orderbook_snapshot_refresh(
288    rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
289) -> anyhow::Result<()> {
290    let client_id = ClientId::from("DYDX-INT-SNAPSHOT");
291
292    let config = DydxDataClientConfig {
293        is_testnet: true,
294        orderbook_refresh_interval_secs: Some(10),
295        base_url_http: Some(
296            std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
297        ),
298        base_url_ws: Some(
299            std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
300        ),
301        ..Default::default()
302    };
303
304    let http_client = DydxHttpClient::new(
305        config.base_url_http.clone(),
306        config.http_timeout_secs,
307        config.http_proxy_url.clone(),
308        config.is_testnet,
309        None,
310    )?;
311
312    let ws_url = config
313        .base_url_ws
314        .clone()
315        .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
316    let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
317
318    let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
319
320    data_client.connect().await?;
321
322    let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
323    let venue = *DYDX_VENUE;
324    let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
325    let command_id = UUID4::new();
326
327    let subscribe_book = SubscribeBookDeltas::new(
328        instrument_id,
329        BookType::L2_MBP,
330        Some(client_id),
331        Some(venue),
332        command_id,
333        ts_init,
334        None,
335        false,
336        None,
337    );
338    data_client.subscribe_book_deltas(&subscribe_book)?;
339    tracing::info!("Subscribed to order book with 10s refresh interval");
340
341    let mut orderbook_updates = 0;
342    let timeout_at = Duration::from_secs(30);
343    let start = tokio::time::Instant::now();
344
345    while start.elapsed() < timeout_at && orderbook_updates < 5 {
346        match timeout(Duration::from_secs(5), rx.recv()).await {
347            Ok(Some(DataEvent::Data(_))) => {
348                orderbook_updates += 1;
349            }
350            Ok(Some(_)) => continue,
351            Ok(None) => break,
352            Err(_) => continue,
353        }
354    }
355
356    data_client.disconnect().await?;
357
358    if orderbook_updates >= 3 {
359        tracing::info!("Received {} orderbook updates", orderbook_updates);
360    } else {
361        anyhow::bail!(
362            "expected at least 3 orderbook updates, got {}",
363            orderbook_updates
364        );
365    }
366
367    tracing::info!("");
368
369    Ok(())
370}
371
372async fn test_complete_data_flow(
373    rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
374) -> anyhow::Result<()> {
375    let client_id = ClientId::from("DYDX-INT-FLOW");
376
377    let config = DydxDataClientConfig {
378        is_testnet: true,
379        base_url_http: Some(
380            std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
381        ),
382        base_url_ws: Some(
383            std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
384        ),
385        ..Default::default()
386    };
387
388    let http_client = DydxHttpClient::new(
389        config.base_url_http.clone(),
390        config.http_timeout_secs,
391        config.http_proxy_url.clone(),
392        config.is_testnet,
393        None,
394    )?;
395
396    let ws_url = config
397        .base_url_ws
398        .clone()
399        .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
400    let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
401
402    let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
403
404    data_client.connect().await?;
405
406    let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
407    let venue = *DYDX_VENUE;
408    let ts_init: UnixNanos = get_atomic_clock_realtime().get_time_ns();
409    let command_id = UUID4::new();
410
411    let subscribe_trades = SubscribeTrades::new(
412        instrument_id,
413        Some(client_id),
414        Some(venue),
415        command_id,
416        ts_init,
417        None,
418    );
419    data_client.subscribe_trades(&subscribe_trades)?;
420
421    let subscribe_book = SubscribeBookDeltas::new(
422        instrument_id,
423        BookType::L2_MBP,
424        Some(client_id),
425        Some(venue),
426        command_id,
427        ts_init,
428        None,
429        false,
430        None,
431    );
432    data_client.subscribe_book_deltas(&subscribe_book)?;
433
434    let bar_spec = BarSpecification {
435        step: std::num::NonZeroUsize::new(1).unwrap(),
436        aggregation: BarAggregation::Minute,
437        price_type: PriceType::Last,
438    };
439    let bar_type = BarType::new(
440        instrument_id,
441        bar_spec,
442        nautilus_model::enums::AggregationSource::External,
443    );
444    let subscribe_bars = SubscribeBars::new(
445        bar_type,
446        Some(client_id),
447        Some(venue),
448        command_id,
449        ts_init,
450        None,
451    );
452    data_client.subscribe_bars(&subscribe_bars)?;
453
454    let mut saw_trade = false;
455    let mut saw_orderbook = false;
456    let mut saw_bar = false;
457
458    let timeout_at = Duration::from_secs(60);
459    let start = tokio::time::Instant::now();
460
461    while start.elapsed() < timeout_at && (!saw_trade || !saw_orderbook || !saw_bar) {
462        match timeout(Duration::from_secs(10), rx.recv()).await {
463            Ok(Some(DataEvent::Data(data))) => match data {
464                Data::Trade(_) => {
465                    if !saw_trade {
466                        tracing::info!("Received trade data");
467                        saw_trade = true;
468                    }
469                }
470                Data::Delta(_) | Data::Deltas(_) => {
471                    if !saw_orderbook {
472                        tracing::info!("Received orderbook data");
473                        saw_orderbook = true;
474                    }
475                }
476                Data::Bar(_) => {
477                    if !saw_bar {
478                        tracing::info!("Received bar data");
479                        saw_bar = true;
480                    }
481                }
482                _ => {}
483            },
484            Ok(Some(_)) => continue,
485            Ok(None) => break,
486            Err(_) => continue,
487        }
488    }
489
490    data_client.disconnect().await?;
491
492    if !saw_trade {
493        anyhow::bail!("expected to receive at least one trade tick");
494    }
495    if !saw_orderbook {
496        anyhow::bail!("expected to receive at least one orderbook update");
497    }
498    if !saw_bar {
499        anyhow::bail!("expected to receive at least one bar");
500    }
501
502    tracing::info!("");
503
504    Ok(())
505}
506
507async fn test_crossed_orderbook_detection(
508    rx: &mut tokio::sync::mpsc::UnboundedReceiver<DataEvent>,
509) -> anyhow::Result<()> {
510    let client_id = ClientId::from("DYDX-INT-CROSSED");
511
512    let config = DydxDataClientConfig {
513        is_testnet: true,
514        base_url_http: Some(
515            std::env::var("DYDX_HTTP_URL").unwrap_or_else(|_| DYDX_TESTNET_HTTP_URL.to_string()),
516        ),
517        base_url_ws: Some(
518            std::env::var("DYDX_WS_URL").unwrap_or_else(|_| DYDX_TESTNET_WS_URL.to_string()),
519        ),
520        ..Default::default()
521    };
522
523    let http_client = DydxHttpClient::new(
524        config.base_url_http.clone(),
525        config.http_timeout_secs,
526        config.http_proxy_url.clone(),
527        config.is_testnet,
528        None,
529    )?;
530
531    let ws_url = config
532        .base_url_ws
533        .clone()
534        .unwrap_or_else(|| DYDX_TESTNET_WS_URL.to_string());
535    let ws_client = DydxWebSocketClient::new_public(ws_url, Some(30));
536
537    let mut data_client = DydxDataClient::new(client_id, config, http_client, Some(ws_client))?;
538
539    data_client.connect().await?;
540
541    let instrument_id = InstrumentId::from("BTC-USD-PERP.DYDX");
542    let venue = *DYDX_VENUE;
543    let ts_init = get_atomic_clock_realtime().get_time_ns();
544    let command_id = UUID4::new();
545
546    let subscribe_book = SubscribeBookDeltas::new(
547        instrument_id,
548        BookType::L2_MBP,
549        Some(client_id),
550        Some(venue),
551        command_id,
552        ts_init,
553        None,
554        false,
555        None,
556    );
557    data_client.subscribe_book_deltas(&subscribe_book)?;
558
559    let mut saw_orderbook_delta = false;
560    let mut quote_count = 0;
561
562    let _test_result = timeout(Duration::from_secs(30), async {
563        loop {
564            match rx.try_recv() {
565                Ok(DataEvent::Data(data)) => {
566                    let data_type_str = format!("{:?}", data);
567                    if data_type_str.contains("OrderBookDeltas") {
568                        saw_orderbook_delta = true;
569                    } else if data_type_str.contains("QuoteTick") {
570                        quote_count += 1;
571                    }
572                }
573                Ok(_) => continue,
574                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
575                    tokio::time::sleep(Duration::from_millis(100)).await;
576                    continue;
577                }
578                Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
579            }
580        }
581    })
582    .await;
583
584    data_client.disconnect().await?;
585
586    if saw_orderbook_delta {
587        tracing::info!("Monitored orderbook for crossed conditions");
588        tracing::info!("  Received {} quotes from deltas", quote_count);
589    } else {
590        anyhow::bail!("Expected to receive at least one orderbook delta");
591    }
592
593    tracing::info!("");
594
595    Ok(())
596}