1use std::{
54 collections::HashMap,
55 fmt::Debug,
56 num::NonZeroU32,
57 sync::{Arc, LazyLock},
58};
59
60use chrono::{DateTime, Utc};
61use nautilus_core::{consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
62use nautilus_model::{
63 data::{Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, TradeTick},
64 enums::{
65 AggregationSource, BarAggregation, BookAction, OrderSide as NautilusOrderSide, PriceType,
66 RecordFlag,
67 },
68 identifiers::{AccountId, InstrumentId},
69 instruments::{Instrument, InstrumentAny},
70 reports::{FillReport, OrderStatusReport, PositionStatusReport},
71 types::{Price, Quantity},
72};
73use nautilus_network::{
74 http::{HttpClient, Method, USER_AGENT},
75 ratelimiter::quota::Quota,
76 retry::{RetryConfig, RetryManager},
77};
78use rust_decimal::Decimal;
79use serde::{Deserialize, Serialize, de::DeserializeOwned};
80use tokio_util::sync::CancellationToken;
81
82use super::error::DydxHttpError;
83use crate::{
84 common::{
85 consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
86 enums::DydxCandleResolution,
87 instrument_cache::InstrumentCache,
88 parse::extract_raw_symbol,
89 },
90 http::parse::parse_instrument_any,
91};
92
93const DYDX_MAX_BARS_PER_REQUEST: u32 = 1_000;
95
96fn bar_type_to_resolution(bar_type: &BarType) -> anyhow::Result<DydxCandleResolution> {
97 if bar_type.aggregation_source() != AggregationSource::External {
98 anyhow::bail!(
99 "dYdX only supports EXTERNAL aggregation, was {:?}",
100 bar_type.aggregation_source()
101 );
102 }
103
104 let spec = bar_type.spec();
105 if spec.price_type != PriceType::Last {
106 anyhow::bail!(
107 "dYdX only supports LAST price type, was {:?}",
108 spec.price_type
109 );
110 }
111
112 DydxCandleResolution::from_bar_spec(&spec)
113}
114
115pub static DYDX_REST_QUOTA: LazyLock<Quota> =
121 LazyLock::new(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
122
123#[derive(Debug, Serialize, Deserialize)]
128pub struct DydxResponse<T> {
129 pub data: T,
131}
132
133pub struct DydxRawHttpClient {
143 base_url: String,
144 client: HttpClient,
145 retry_manager: RetryManager<DydxHttpError>,
146 cancellation_token: CancellationToken,
147 is_testnet: bool,
148}
149
150impl Default for DydxRawHttpClient {
151 fn default() -> Self {
152 Self::new(None, Some(60), None, false, None)
153 .expect("Failed to create default DydxRawHttpClient")
154 }
155}
156
157impl Debug for DydxRawHttpClient {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 f.debug_struct(stringify!(DydxRawHttpClient))
160 .field("base_url", &self.base_url)
161 .field("is_testnet", &self.is_testnet)
162 .finish_non_exhaustive()
163 }
164}
165
166impl DydxRawHttpClient {
167 pub fn cancel_all_requests(&self) {
169 self.cancellation_token.cancel();
170 }
171
172 pub fn cancellation_token(&self) -> &CancellationToken {
174 &self.cancellation_token
175 }
176
177 pub fn new(
186 base_url: Option<String>,
187 timeout_secs: Option<u64>,
188 proxy_url: Option<String>,
189 is_testnet: bool,
190 retry_config: Option<RetryConfig>,
191 ) -> anyhow::Result<Self> {
192 let base_url = if is_testnet {
193 base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
194 } else {
195 base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
196 };
197
198 let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
199
200 let mut headers = HashMap::new();
201 headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
202
203 let client = HttpClient::new(
204 headers,
205 vec![], vec![], Some(*DYDX_REST_QUOTA),
208 timeout_secs,
209 proxy_url,
210 )
211 .map_err(|e| {
212 DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
213 })?;
214
215 Ok(Self {
216 base_url,
217 client,
218 retry_manager,
219 cancellation_token: CancellationToken::new(),
220 is_testnet,
221 })
222 }
223
224 #[must_use]
226 pub const fn is_testnet(&self) -> bool {
227 self.is_testnet
228 }
229
230 #[must_use]
232 pub fn base_url(&self) -> &str {
233 &self.base_url
234 }
235
236 pub async fn send_request<T>(
248 &self,
249 method: Method,
250 endpoint: &str,
251 query_params: Option<&str>,
252 ) -> Result<T, DydxHttpError>
253 where
254 T: DeserializeOwned,
255 {
256 let url = if let Some(params) = query_params {
257 format!("{}{endpoint}?{params}", self.base_url)
258 } else {
259 format!("{}{endpoint}", self.base_url)
260 };
261
262 let operation = || async {
263 let request = self
264 .client
265 .request_with_ustr_keys(
266 method.clone(),
267 url.clone(),
268 None, None, None, None, None, )
274 .await
275 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
276
277 if !request.status.is_success() {
278 return Err(DydxHttpError::HttpStatus {
279 status: request.status.as_u16(),
280 message: String::from_utf8_lossy(&request.body).to_string(),
281 });
282 }
283
284 Ok(request)
285 };
286
287 let should_retry = |error: &DydxHttpError| -> bool {
292 match error {
293 DydxHttpError::HttpClientError(_) => true,
294 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
295 _ => false,
296 }
297 };
298
299 let create_error = |msg: String| -> DydxHttpError {
300 if msg == "canceled" {
301 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
302 } else if msg.contains("Timed out") {
303 DydxHttpError::HttpClientError(msg)
305 } else {
306 DydxHttpError::ValidationError(msg)
307 }
308 };
309
310 let response = self
311 .retry_manager
312 .execute_with_retry_with_cancel(
313 endpoint,
314 operation,
315 should_retry,
316 create_error,
317 &self.cancellation_token,
318 )
319 .await?;
320
321 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
322 error: e.to_string(),
323 body: String::from_utf8_lossy(&response.body).to_string(),
324 })
325 }
326
327 pub async fn send_post_request<T, B>(
340 &self,
341 endpoint: &str,
342 body: &B,
343 ) -> Result<T, DydxHttpError>
344 where
345 T: DeserializeOwned,
346 B: Serialize,
347 {
348 let url = format!("{}{endpoint}", self.base_url);
349
350 let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
351 error: e.to_string(),
352 })?;
353
354 let operation = || async {
355 let request = self
356 .client
357 .request_with_ustr_keys(
358 Method::POST,
359 url.clone(),
360 None, None, Some(body_bytes.clone()),
363 None, None, )
366 .await
367 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
368
369 if !request.status.is_success() {
370 return Err(DydxHttpError::HttpStatus {
371 status: request.status.as_u16(),
372 message: String::from_utf8_lossy(&request.body).to_string(),
373 });
374 }
375
376 Ok(request)
377 };
378
379 let should_retry = |error: &DydxHttpError| -> bool {
381 match error {
382 DydxHttpError::HttpClientError(_) => true,
383 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
384 _ => false,
385 }
386 };
387
388 let create_error = |msg: String| -> DydxHttpError {
389 if msg == "canceled" {
390 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
391 } else if msg.contains("Timed out") {
392 DydxHttpError::HttpClientError(msg)
394 } else {
395 DydxHttpError::ValidationError(msg)
396 }
397 };
398
399 let response = self
400 .retry_manager
401 .execute_with_retry_with_cancel(
402 endpoint,
403 operation,
404 should_retry,
405 create_error,
406 &self.cancellation_token,
407 )
408 .await?;
409
410 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
411 error: e.to_string(),
412 body: String::from_utf8_lossy(&response.body).to_string(),
413 })
414 }
415
416 pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
422 self.send_request(Method::GET, "/v4/perpetualMarkets", None)
423 .await
424 }
425
426 pub async fn get_market(
434 &self,
435 ticker: &str,
436 ) -> Result<super::models::MarketsResponse, DydxHttpError> {
437 let query = format!("ticker={ticker}");
438 self.send_request(Method::GET, "/v4/perpetualMarkets", Some(&query))
439 .await
440 }
441
442 pub async fn fetch_instruments(
455 &self,
456 maker_fee: Option<Decimal>,
457 taker_fee: Option<Decimal>,
458 ) -> Result<Vec<InstrumentAny>, DydxHttpError> {
459 let markets_response = self.get_markets().await?;
460 let ts_init = get_atomic_clock_realtime().get_time_ns();
461
462 let mut instruments = Vec::new();
463 let mut skipped_inactive = 0;
464
465 for (ticker, market) in markets_response.markets {
466 if !super::parse::is_market_active(&market.status) {
467 log::debug!(
468 "Skipping inactive market {ticker} (status: {:?})",
469 market.status
470 );
471 skipped_inactive += 1;
472 continue;
473 }
474
475 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
476 Ok(instrument) => {
477 instruments.push(instrument);
478 }
479 Err(e) => {
480 log::error!("Failed to parse instrument {ticker}: {e}");
481 }
482 }
483 }
484
485 if skipped_inactive > 0 {
486 log::info!(
487 "Parsed {} instruments, skipped {} inactive",
488 instruments.len(),
489 skipped_inactive
490 );
491 } else {
492 log::info!("Parsed {} instruments", instruments.len());
493 }
494
495 Ok(instruments)
496 }
497
498 pub async fn get_orderbook(
504 &self,
505 ticker: &str,
506 ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
507 let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
508 self.send_request(Method::GET, &endpoint, None).await
509 }
510
511 pub async fn get_trades(
517 &self,
518 ticker: &str,
519 limit: Option<u32>,
520 starting_before_or_at_height: Option<u64>,
521 ) -> Result<super::models::TradesResponse, DydxHttpError> {
522 let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
523 let mut query_parts = Vec::new();
524 if let Some(l) = limit {
525 query_parts.push(format!("limit={l}"));
526 }
527 if let Some(height) = starting_before_or_at_height {
528 query_parts.push(format!("createdBeforeOrAtHeight={height}"));
529 }
530 let query = if query_parts.is_empty() {
531 None
532 } else {
533 Some(query_parts.join("&"))
534 };
535 self.send_request(Method::GET, &endpoint, query.as_deref())
536 .await
537 }
538
539 pub async fn get_candles(
545 &self,
546 ticker: &str,
547 resolution: DydxCandleResolution,
548 limit: Option<u32>,
549 from_iso: Option<DateTime<Utc>>,
550 to_iso: Option<DateTime<Utc>>,
551 ) -> Result<super::models::CandlesResponse, DydxHttpError> {
552 let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
553 let mut query_parts = vec![format!("resolution={resolution}")];
554 if let Some(l) = limit {
555 query_parts.push(format!("limit={l}"));
556 }
557 if let Some(from) = from_iso {
558 let from_str = from.to_rfc3339();
559 query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
560 }
561 if let Some(to) = to_iso {
562 let to_str = to.to_rfc3339();
563 query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
564 }
565 let query = query_parts.join("&");
566 self.send_request(Method::GET, &endpoint, Some(&query))
567 .await
568 }
569
570 pub async fn get_subaccount(
576 &self,
577 address: &str,
578 subaccount_number: u32,
579 ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
580 let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
581 self.send_request(Method::GET, &endpoint, None).await
582 }
583
584 pub async fn get_fills(
590 &self,
591 address: &str,
592 subaccount_number: u32,
593 market: Option<&str>,
594 limit: Option<u32>,
595 ) -> Result<super::models::FillsResponse, DydxHttpError> {
596 let endpoint = "/v4/fills";
597 let mut query_parts = vec![
598 format!("address={address}"),
599 format!("subaccountNumber={subaccount_number}"),
600 ];
601 if let Some(m) = market {
602 query_parts.push(format!("market={m}"));
603 }
604 if let Some(l) = limit {
605 query_parts.push(format!("limit={l}"));
606 }
607 let query = query_parts.join("&");
608 self.send_request(Method::GET, endpoint, Some(&query)).await
609 }
610
611 pub async fn get_orders(
617 &self,
618 address: &str,
619 subaccount_number: u32,
620 market: Option<&str>,
621 limit: Option<u32>,
622 ) -> Result<super::models::OrdersResponse, DydxHttpError> {
623 let endpoint = "/v4/orders";
624 let mut query_parts = vec![
625 format!("address={address}"),
626 format!("subaccountNumber={subaccount_number}"),
627 ];
628 if let Some(m) = market {
629 query_parts.push(format!("market={m}"));
630 }
631 if let Some(l) = limit {
632 query_parts.push(format!("limit={l}"));
633 }
634 let query = query_parts.join("&");
635 self.send_request(Method::GET, endpoint, Some(&query)).await
636 }
637
638 pub async fn get_transfers(
644 &self,
645 address: &str,
646 subaccount_number: u32,
647 limit: Option<u32>,
648 ) -> Result<super::models::TransfersResponse, DydxHttpError> {
649 let endpoint = "/v4/transfers";
650 let mut query_parts = vec![
651 format!("address={address}"),
652 format!("subaccountNumber={subaccount_number}"),
653 ];
654 if let Some(l) = limit {
655 query_parts.push(format!("limit={l}"));
656 }
657 let query = query_parts.join("&");
658 self.send_request(Method::GET, endpoint, Some(&query)).await
659 }
660
661 pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
667 self.send_request(Method::GET, "/v4/time", None).await
668 }
669
670 pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
676 self.send_request(Method::GET, "/v4/height", None).await
677 }
678}
679
680#[derive(Debug)]
696#[cfg_attr(
697 feature = "python",
698 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
699)]
700pub struct DydxHttpClient {
701 pub(crate) inner: Arc<DydxRawHttpClient>,
703 pub(crate) instrument_cache: Arc<InstrumentCache>,
708}
709
710impl Clone for DydxHttpClient {
711 fn clone(&self) -> Self {
712 Self {
713 inner: self.inner.clone(),
714 instrument_cache: Arc::clone(&self.instrument_cache),
715 }
716 }
717}
718
719impl Default for DydxHttpClient {
720 fn default() -> Self {
721 Self::new(None, Some(60), None, false, None)
722 .expect("Failed to create default DydxHttpClient")
723 }
724}
725
726impl DydxHttpClient {
727 pub fn new(
740 base_url: Option<String>,
741 timeout_secs: Option<u64>,
742 proxy_url: Option<String>,
743 is_testnet: bool,
744 retry_config: Option<RetryConfig>,
745 ) -> anyhow::Result<Self> {
746 Self::new_with_cache(
747 base_url,
748 timeout_secs,
749 proxy_url,
750 is_testnet,
751 retry_config,
752 Arc::new(InstrumentCache::new()),
753 )
754 }
755
756 pub fn new_with_cache(
769 base_url: Option<String>,
770 timeout_secs: Option<u64>,
771 proxy_url: Option<String>,
772 is_testnet: bool,
773 retry_config: Option<RetryConfig>,
774 instrument_cache: Arc<InstrumentCache>,
775 ) -> anyhow::Result<Self> {
776 Ok(Self {
777 inner: Arc::new(DydxRawHttpClient::new(
778 base_url,
779 timeout_secs,
780 proxy_url,
781 is_testnet,
782 retry_config,
783 )?),
784 instrument_cache,
785 })
786 }
787
788 pub async fn request_instruments(
798 &self,
799 symbol: Option<String>,
800 maker_fee: Option<Decimal>,
801 taker_fee: Option<Decimal>,
802 ) -> anyhow::Result<Vec<InstrumentAny>> {
803 let markets_response = self.inner.get_markets().await?;
804 let ts_init = get_atomic_clock_realtime().get_time_ns();
805
806 let mut instruments = Vec::new();
807 let mut skipped_inactive = 0;
808
809 for (ticker, market) in markets_response.markets {
810 if let Some(ref sym) = symbol
812 && ticker != *sym
813 {
814 continue;
815 }
816
817 if !super::parse::is_market_active(&market.status) {
818 log::debug!(
819 "Skipping inactive market {ticker} (status: {:?})",
820 market.status
821 );
822 skipped_inactive += 1;
823 continue;
824 }
825
826 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
827 Ok(instrument) => {
828 instruments.push(instrument);
829 }
830 Err(e) => {
831 log::error!("Failed to parse instrument {ticker}: {e}");
832 }
833 }
834 }
835
836 if skipped_inactive > 0 {
837 log::info!(
838 "Parsed {} instruments, skipped {} inactive",
839 instruments.len(),
840 skipped_inactive
841 );
842 } else {
843 log::debug!("Parsed {} instruments", instruments.len());
844 }
845
846 Ok(instruments)
847 }
848
849 pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
861 let markets_response = self.inner.get_markets().await?;
863 let ts_init = get_atomic_clock_realtime().get_time_ns();
864
865 let mut parsed_instruments = Vec::new();
866 let mut parsed_markets = Vec::new();
867 let mut skipped_inactive = 0;
868
869 for (ticker, market) in markets_response.markets {
870 if !super::parse::is_market_active(&market.status) {
871 log::debug!(
872 "Skipping inactive market {ticker} (status: {:?})",
873 market.status
874 );
875 skipped_inactive += 1;
876 continue;
877 }
878
879 match super::parse::parse_instrument_any(&market, None, None, ts_init) {
880 Ok(instrument) => {
881 parsed_instruments.push(instrument);
882 parsed_markets.push(market);
883 }
884 Err(e) => {
885 log::error!("Failed to parse instrument {ticker}: {e}");
886 }
887 }
888 }
889
890 self.instrument_cache.clear();
892
893 let items: Vec<_> = parsed_instruments.into_iter().zip(parsed_markets).collect();
895
896 if !items.is_empty() {
897 self.instrument_cache.insert_many(items.clone());
898 }
899
900 let count = items.len();
901
902 if skipped_inactive > 0 {
903 log::info!("Cached {count} instruments, skipped {skipped_inactive} inactive");
904 } else {
905 log::info!("Cached {count} instruments");
906 }
907
908 Ok(())
909 }
910
911 pub async fn fetch_and_cache_single_instrument(
917 &self,
918 ticker: &str,
919 ) -> anyhow::Result<Option<InstrumentAny>> {
920 let markets_response = self.inner.get_market(ticker).await?;
921 let ts_init = get_atomic_clock_realtime().get_time_ns();
922
923 if let Some(market) = markets_response.markets.get(ticker) {
925 if !super::parse::is_market_active(&market.status) {
926 log::debug!(
927 "Skipping inactive market {ticker} (status: {:?})",
928 market.status
929 );
930 return Ok(None);
931 }
932
933 let instrument = parse_instrument_any(market, None, None, ts_init)?;
934 self.instrument_cache
935 .insert(instrument.clone(), market.clone());
936
937 log::info!("Fetched and cached new instrument: {ticker}");
938 return Ok(Some(instrument));
939 }
940
941 Ok(None)
942 }
943
944 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
949 self.instrument_cache.insert_instruments_only(instruments);
950 }
951
952 pub fn cache_instrument(&self, instrument: InstrumentAny) {
957 self.instrument_cache.insert_instrument_only(instrument);
958 }
959
960 #[must_use]
962 pub fn get_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentAny> {
963 self.instrument_cache.get(instrument_id)
964 }
965
966 #[must_use]
970 pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
971 self.instrument_cache.get_by_clob_id(clob_pair_id)
972 }
973
974 #[must_use]
978 pub fn get_instrument_by_market(&self, ticker: &str) -> Option<InstrumentAny> {
979 self.instrument_cache.get_by_market(ticker)
980 }
981
982 #[must_use]
991 pub fn get_market_params(
992 &self,
993 instrument_id: &InstrumentId,
994 ) -> Option<super::models::PerpetualMarket> {
995 self.instrument_cache.get_market_params(instrument_id)
996 }
997
998 pub async fn request_trades(
1007 &self,
1008 symbol: &str,
1009 limit: Option<u32>,
1010 starting_before_or_at_height: Option<u64>,
1011 ) -> anyhow::Result<super::models::TradesResponse> {
1012 self.inner
1013 .get_trades(symbol, limit, starting_before_or_at_height)
1014 .await
1015 .map_err(Into::into)
1016 }
1017
1018 pub async fn request_candles(
1027 &self,
1028 symbol: &str,
1029 resolution: DydxCandleResolution,
1030 limit: Option<u32>,
1031 from_iso: Option<DateTime<Utc>>,
1032 to_iso: Option<DateTime<Utc>>,
1033 ) -> anyhow::Result<super::models::CandlesResponse> {
1034 self.inner
1035 .get_candles(symbol, resolution, limit, from_iso, to_iso)
1036 .await
1037 .map_err(Into::into)
1038 }
1039
1040 pub async fn request_bars(
1058 &self,
1059 bar_type: BarType,
1060 start: Option<DateTime<Utc>>,
1061 end: Option<DateTime<Utc>>,
1062 limit: Option<u32>,
1063 timestamp_on_close: bool,
1064 ) -> anyhow::Result<Vec<Bar>> {
1065 let resolution = bar_type_to_resolution(&bar_type)?;
1066 let instrument_id = bar_type.instrument_id();
1067
1068 let instrument = self
1069 .get_instrument(&instrument_id)
1070 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1071
1072 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1073 let price_precision = instrument.price_precision();
1074 let size_precision = instrument.size_precision();
1075 let ts_init = get_atomic_clock_realtime().get_time_ns();
1076
1077 let mut all_bars: Vec<Bar> = Vec::new();
1078
1079 let spec = bar_type.spec();
1081 let bar_secs: i64 = match spec.aggregation {
1082 BarAggregation::Minute => spec.step.get() as i64 * 60,
1083 BarAggregation::Hour => spec.step.get() as i64 * 3_600,
1084 BarAggregation::Day => spec.step.get() as i64 * 86_400,
1085 _ => anyhow::bail!("Unsupported aggregation: {:?}", spec.aggregation),
1086 };
1087
1088 match (start, end) {
1089 (Some(range_start), Some(range_end)) if range_end > range_start => {
1091 let overall_limit = limit.unwrap_or(u32::MAX);
1092 let mut remaining = overall_limit;
1093 let bars_per_call = DYDX_MAX_BARS_PER_REQUEST.min(remaining);
1094 let chunk_duration = chrono::Duration::seconds(bar_secs * bars_per_call as i64);
1095 let mut chunk_start = range_start;
1096
1097 while chunk_start < range_end && remaining > 0 {
1098 let chunk_end = (chunk_start + chunk_duration).min(range_end);
1099 let per_call_limit = remaining.min(DYDX_MAX_BARS_PER_REQUEST);
1100
1101 let response = self
1102 .inner
1103 .get_candles(
1104 ticker,
1105 resolution,
1106 Some(per_call_limit),
1107 Some(chunk_start),
1108 Some(chunk_end),
1109 )
1110 .await?;
1111
1112 let count = response.candles.len() as u32;
1113 if count == 0 {
1114 break;
1115 }
1116
1117 for candle in &response.candles {
1118 match super::parse::parse_bar(
1119 candle,
1120 bar_type,
1121 price_precision,
1122 size_precision,
1123 timestamp_on_close,
1124 ts_init,
1125 ) {
1126 Ok(bar) => all_bars.push(bar),
1127 Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1128 }
1129 }
1130
1131 if remaining <= count {
1132 break;
1133 }
1134 remaining -= count;
1135 chunk_start += chunk_duration;
1136 }
1137 }
1138 _ => {
1140 let req_limit = limit.unwrap_or(DYDX_MAX_BARS_PER_REQUEST);
1141 let response = self
1142 .inner
1143 .get_candles(ticker, resolution, Some(req_limit), None, None)
1144 .await?;
1145
1146 for candle in &response.candles {
1147 match super::parse::parse_bar(
1148 candle,
1149 bar_type,
1150 price_precision,
1151 size_precision,
1152 timestamp_on_close,
1153 ts_init,
1154 ) {
1155 Ok(bar) => all_bars.push(bar),
1156 Err(e) => log::warn!("Failed to parse candle for {instrument_id}: {e}"),
1157 }
1158 }
1159 }
1160 }
1161
1162 let current_time_ns = get_atomic_clock_realtime().get_time_ns();
1164 all_bars.retain(|bar| bar.ts_event < current_time_ns);
1165
1166 Ok(all_bars)
1167 }
1168
1169 pub async fn request_trade_ticks(
1187 &self,
1188 instrument_id: InstrumentId,
1189 start: Option<DateTime<Utc>>,
1190 end: Option<DateTime<Utc>>,
1191 limit: Option<u32>,
1192 ) -> anyhow::Result<Vec<TradeTick>> {
1193 const DYDX_MAX_TRADES_PER_REQUEST: u32 = 1_000;
1194 const DYDX_BLOCK_TIME_SECS: f64 = 1.1;
1195
1196 if let (Some(s), Some(e)) = (start, end) {
1198 anyhow::ensure!(s < e, "start ({s}) must be before end ({e})");
1199 }
1200
1201 let instrument = self
1202 .get_instrument(&instrument_id)
1203 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1204
1205 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1206 let price_precision = instrument.price_precision();
1207 let size_precision = instrument.size_precision();
1208 let ts_init = get_atomic_clock_realtime().get_time_ns();
1209
1210 let initial_cursor = if let Some(end_time) = end {
1214 match self.inner.get_height().await {
1215 Ok(height_resp) => {
1216 let secs_ahead = (height_resp.time - end_time).num_seconds();
1217 if secs_ahead > 0 {
1218 let blocks_to_skip = (secs_ahead as f64 / DYDX_BLOCK_TIME_SECS) as u64;
1219 let target = height_resp.height.saturating_sub(blocks_to_skip);
1220 log::debug!(
1221 "Estimated block height at {end_time}: {target} \
1222 (current: {}, skipping ~{blocks_to_skip} blocks)",
1223 height_resp.height,
1224 );
1225 Some(target)
1226 } else {
1227 None }
1229 }
1230 Err(e) => {
1231 log::warn!(
1232 "Failed to get block height for time skip, paginating from latest: {e}"
1233 );
1234 None
1235 }
1236 }
1237 } else {
1238 None
1239 };
1240
1241 let overall_limit = limit.unwrap_or(u32::MAX);
1242 let mut remaining = overall_limit;
1243 let mut cursor_height: Option<u64> = initial_cursor;
1244 let mut all_trades = Vec::new();
1245
1246 loop {
1247 let page_limit = remaining.min(DYDX_MAX_TRADES_PER_REQUEST);
1248 let response = self
1249 .inner
1250 .get_trades(ticker, Some(page_limit), cursor_height)
1251 .await?;
1252
1253 let page_count = response.trades.len() as u32;
1254 if page_count == 0 {
1255 break;
1256 }
1257
1258 let oldest_trade = response.trades.last().unwrap();
1260
1261 cursor_height = Some(oldest_trade.created_at_height.saturating_sub(1));
1263
1264 if let Some(s) = start
1266 && oldest_trade.created_at < s
1267 {
1268 for trade in &response.trades {
1270 if start.is_some_and(|s| trade.created_at < s) {
1271 continue;
1272 }
1273 if end.is_some_and(|e| trade.created_at > e) {
1274 continue;
1275 }
1276 all_trades.push(super::parse::parse_trade_tick(
1277 trade,
1278 instrument_id,
1279 price_precision,
1280 size_precision,
1281 ts_init,
1282 )?);
1283 }
1284 break;
1285 }
1286
1287 for trade in &response.trades {
1289 if start.is_some_and(|s| trade.created_at < s) {
1290 continue;
1291 }
1292 if end.is_some_and(|e| trade.created_at > e) {
1293 continue;
1294 }
1295 all_trades.push(super::parse::parse_trade_tick(
1296 trade,
1297 instrument_id,
1298 price_precision,
1299 size_precision,
1300 ts_init,
1301 )?);
1302 }
1303
1304 remaining = remaining.saturating_sub(page_count);
1305
1306 if page_count < page_limit || remaining == 0 {
1308 break;
1309 }
1310 }
1311
1312 all_trades.reverse();
1314 all_trades.dedup_by(|a, b| a.trade_id == b.trade_id);
1315
1316 if let Some(lim) = limit {
1318 all_trades.truncate(lim as usize);
1319 }
1320
1321 Ok(all_trades)
1322 }
1323
1324 pub async fn request_orderbook_snapshot(
1335 &self,
1336 instrument_id: InstrumentId,
1337 ) -> anyhow::Result<OrderBookDeltas> {
1338 let instrument = self
1339 .get_instrument(&instrument_id)
1340 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {instrument_id}"))?;
1341
1342 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
1343 let response = self.inner.get_orderbook(ticker).await?;
1344
1345 let ts_init = get_atomic_clock_realtime().get_time_ns();
1346
1347 let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1348
1349 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1350
1351 for (i, level) in response.bids.iter().enumerate() {
1352 let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1353 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1354
1355 let order = BookOrder::new(
1356 NautilusOrderSide::Buy,
1357 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1358 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1359 0,
1360 );
1361
1362 deltas.push(OrderBookDelta::new(
1363 instrument_id,
1364 BookAction::Add,
1365 order,
1366 flags,
1367 0,
1368 ts_init,
1369 ts_init,
1370 ));
1371 }
1372
1373 for (i, level) in response.asks.iter().enumerate() {
1374 let is_last = i == response.asks.len() - 1;
1375 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1376
1377 let order = BookOrder::new(
1378 NautilusOrderSide::Sell,
1379 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1380 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1381 0,
1382 );
1383
1384 deltas.push(OrderBookDelta::new(
1385 instrument_id,
1386 BookAction::Add,
1387 order,
1388 flags,
1389 0,
1390 ts_init,
1391 ts_init,
1392 ));
1393 }
1394
1395 Ok(OrderBookDeltas::new(instrument_id, deltas))
1396 }
1397
1398 #[must_use]
1404 pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
1405 &self.inner
1406 }
1407
1408 #[must_use]
1410 pub fn is_testnet(&self) -> bool {
1411 self.inner.is_testnet()
1412 }
1413
1414 #[must_use]
1416 pub fn base_url(&self) -> &str {
1417 self.inner.base_url()
1418 }
1419
1420 #[must_use]
1422 pub fn is_cache_initialized(&self) -> bool {
1423 self.instrument_cache.is_initialized()
1424 }
1425
1426 #[must_use]
1428 pub fn cached_instruments_count(&self) -> usize {
1429 self.instrument_cache.len()
1430 }
1431
1432 #[must_use]
1436 pub fn instrument_cache(&self) -> &Arc<InstrumentCache> {
1437 &self.instrument_cache
1438 }
1439
1440 #[must_use]
1444 pub fn all_instruments(&self) -> Vec<InstrumentAny> {
1445 self.instrument_cache.all_instruments()
1446 }
1447
1448 #[must_use]
1450 pub fn all_instrument_ids(&self) -> Vec<InstrumentId> {
1451 self.instrument_cache.all_instrument_ids()
1452 }
1453
1454 pub async fn request_order_status_reports(
1463 &self,
1464 address: &str,
1465 subaccount_number: u32,
1466 account_id: AccountId,
1467 instrument_id: Option<InstrumentId>,
1468 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1469 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1470
1471 let market = instrument_id.map(|id| {
1473 let symbol = id.symbol.to_string();
1474 symbol.trim_end_matches("-PERP").to_string()
1476 });
1477
1478 let orders = self
1479 .inner
1480 .get_orders(address, subaccount_number, market.as_deref(), None)
1481 .await?;
1482
1483 let mut reports = Vec::new();
1484
1485 for order in orders {
1486 let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1488 Some(inst) => inst,
1489 None => {
1490 log::warn!(
1491 "Skipping order {}: no cached instrument for clob_pair_id {}",
1492 order.id,
1493 order.clob_pair_id
1494 );
1495 continue;
1496 }
1497 };
1498
1499 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1501 continue;
1502 }
1503
1504 match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1505 {
1506 Ok(report) => reports.push(report),
1507 Err(e) => {
1508 log::warn!("Failed to parse order {}: {e}", order.id);
1509 }
1510 }
1511 }
1512
1513 Ok(reports)
1514 }
1515
1516 pub async fn request_fill_reports(
1525 &self,
1526 address: &str,
1527 subaccount_number: u32,
1528 account_id: AccountId,
1529 instrument_id: Option<InstrumentId>,
1530 ) -> anyhow::Result<Vec<FillReport>> {
1531 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1532
1533 let market = instrument_id.map(|id| {
1535 let symbol = id.symbol.to_string();
1536 symbol.trim_end_matches("-PERP").to_string()
1537 });
1538
1539 let fills_response = self
1540 .inner
1541 .get_fills(address, subaccount_number, market.as_deref(), None)
1542 .await?;
1543
1544 let mut reports = Vec::new();
1545
1546 for fill in fills_response.fills {
1547 let instrument = match self.get_instrument_by_market(&fill.market) {
1549 Some(inst) => inst,
1550 None => {
1551 log::warn!(
1552 "Skipping fill {}: no cached instrument for market {}",
1553 fill.id,
1554 fill.market
1555 );
1556 continue;
1557 }
1558 };
1559
1560 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1562 continue;
1563 }
1564
1565 match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1566 Ok(report) => reports.push(report),
1567 Err(e) => {
1568 log::warn!("Failed to parse fill {}: {e}", fill.id);
1569 }
1570 }
1571 }
1572
1573 Ok(reports)
1574 }
1575
1576 pub async fn request_position_status_reports(
1585 &self,
1586 address: &str,
1587 subaccount_number: u32,
1588 account_id: AccountId,
1589 instrument_id: Option<InstrumentId>,
1590 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1591 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1592
1593 let subaccount_response = self
1594 .inner
1595 .get_subaccount(address, subaccount_number)
1596 .await?;
1597
1598 let mut reports = Vec::new();
1599
1600 for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1601 let instrument = match self.get_instrument_by_market(&market) {
1603 Some(inst) => inst,
1604 None => {
1605 log::warn!("Skipping position: no cached instrument for market {market}");
1606 continue;
1607 }
1608 };
1609
1610 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1612 continue;
1613 }
1614
1615 match super::parse::parse_position_status_report(
1616 &position,
1617 &instrument,
1618 account_id,
1619 ts_init,
1620 ) {
1621 Ok(report) => reports.push(report),
1622 Err(e) => {
1623 log::warn!("Failed to parse position for {market}: {e}");
1624 }
1625 }
1626 }
1627
1628 Ok(reports)
1629 }
1630}
1631
1632#[cfg(test)]
1633mod tests {
1634 use rstest::rstest;
1635
1636 use super::*;
1637 use crate::http::error;
1638
1639 #[tokio::test]
1640 async fn test_raw_client_creation() {
1641 let client = DydxRawHttpClient::new(None, Some(30), None, false, None);
1642 assert!(client.is_ok());
1643
1644 let client = client.unwrap();
1645 assert!(!client.is_testnet());
1646 assert_eq!(client.base_url(), DYDX_HTTP_URL);
1647 }
1648
1649 #[tokio::test]
1650 async fn test_raw_client_testnet() {
1651 let client = DydxRawHttpClient::new(None, Some(30), None, true, None);
1652 assert!(client.is_ok());
1653
1654 let client = client.unwrap();
1655 assert!(client.is_testnet());
1656 assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1657 }
1658
1659 #[tokio::test]
1660 async fn test_domain_client_creation() {
1661 let client = DydxHttpClient::new(None, Some(30), None, false, None);
1662 assert!(client.is_ok());
1663
1664 let client = client.unwrap();
1665 assert!(!client.is_testnet());
1666 assert_eq!(client.base_url(), DYDX_HTTP_URL);
1667 assert!(!client.is_cache_initialized());
1668 assert_eq!(client.cached_instruments_count(), 0);
1669 }
1670
1671 #[tokio::test]
1672 async fn test_domain_client_testnet() {
1673 let client = DydxHttpClient::new(None, Some(30), None, true, None);
1674 assert!(client.is_ok());
1675
1676 let client = client.unwrap();
1677 assert!(client.is_testnet());
1678 assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1679 }
1680
1681 #[tokio::test]
1682 async fn test_domain_client_default() {
1683 let client = DydxHttpClient::default();
1684 assert!(!client.is_testnet());
1685 assert_eq!(client.base_url(), DYDX_HTTP_URL);
1686 assert!(!client.is_cache_initialized());
1687 }
1688
1689 #[tokio::test]
1690 async fn test_domain_client_clone() {
1691 let client = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();
1692
1693 let cloned = client.clone();
1695 assert!(!cloned.is_cache_initialized());
1696
1697 client.instrument_cache.insert_instruments_only(vec![]);
1698
1699 #[allow(clippy::redundant_clone)]
1701 let cloned_after = client.clone();
1702 assert!(cloned_after.is_cache_initialized());
1703 }
1704
1705 #[rstest]
1706 fn test_domain_client_get_instrument_not_found() {
1707 use nautilus_model::identifiers::{Symbol, Venue};
1708 let client = DydxHttpClient::default();
1709 let instrument_id = InstrumentId::new(Symbol::new("ETH-USD-PERP"), Venue::new("DYDX"));
1710 let result = client.get_instrument(&instrument_id);
1711 assert!(result.is_none());
1712 }
1713
1714 #[tokio::test]
1715 async fn test_http_timeout_respects_configuration_and_does_not_block() {
1716 use axum::{Router, routing::get};
1717 use tokio::net::TcpListener;
1718
1719 async fn slow_handler() -> &'static str {
1720 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1722 "ok"
1723 }
1724
1725 let router = Router::new().route("/v4/slow", get(slow_handler));
1726
1727 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1728 let addr = listener.local_addr().unwrap();
1729
1730 tokio::spawn(async move {
1731 axum::serve(listener, router.into_make_service())
1732 .await
1733 .unwrap();
1734 });
1735
1736 let base_url = format!("http://{addr}");
1737
1738 let retry_config = RetryConfig {
1741 max_retries: 0,
1742 initial_delay_ms: 0,
1743 max_delay_ms: 0,
1744 backoff_factor: 1.0,
1745 jitter_ms: 0,
1746 operation_timeout_ms: Some(500),
1747 immediate_first: true,
1748 max_elapsed_ms: Some(1_000),
1749 };
1750
1751 let client =
1754 DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
1755 .unwrap();
1756
1757 let start = std::time::Instant::now();
1758 let result: Result<serde_json::Value, error::DydxHttpError> =
1759 client.send_request(Method::GET, "/v4/slow", None).await;
1760 let elapsed = start.elapsed();
1761
1762 assert!(result.is_err());
1765 assert!(elapsed < std::time::Duration::from_secs(3));
1766 }
1767}