1use std::{
54 collections::HashMap,
55 fmt::{Debug, Formatter},
56 num::NonZeroU32,
57 sync::{
58 Arc, LazyLock,
59 atomic::{AtomicBool, Ordering},
60 },
61};
62
63use chrono::{DateTime, Utc};
64use dashmap::DashMap;
65use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
66use nautilus_model::{
67 data::{
68 Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, TradeTick,
69 bar::get_bar_interval_ns,
70 },
71 enums::{AggressorSide, BookAction, OrderSide as NautilusOrderSide, RecordFlag},
72 identifiers::{AccountId, InstrumentId, TradeId},
73 instruments::{Instrument, InstrumentAny},
74 reports::{FillReport, OrderStatusReport, PositionStatusReport},
75 types::{Price, Quantity},
76};
77use nautilus_network::{
78 http::{HttpClient, Method, USER_AGENT},
79 ratelimiter::quota::Quota,
80 retry::{RetryConfig, RetryManager},
81};
82use serde::{Deserialize, Serialize, de::DeserializeOwned};
83use tokio_util::sync::CancellationToken;
84use ustr::Ustr;
85
86use super::error::DydxHttpError;
87use crate::common::{
88 consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
89 enums::DydxCandleResolution,
90 parse::extract_raw_symbol,
91};
92
93pub static DYDX_REST_QUOTA: LazyLock<Quota> =
99 LazyLock::new(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
100
/// Generic response envelope that carries its payload under a `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct DydxResponse<T> {
    /// The wrapped payload.
    pub data: T,
}
110
/// Low-level HTTP client for the dYdX REST endpoints.
///
/// Builds request URLs from `base_url`, sends them through the underlying
/// [`HttpClient`] wrapped in a [`RetryManager`], and deserializes JSON responses.
pub struct DydxRawHttpClient {
    /// Prefix prepended to every endpoint path.
    base_url: String,
    /// Underlying HTTP client used to send requests.
    client: HttpClient,
    /// Executes each request with the configured retry policy.
    retry_manager: RetryManager<DydxHttpError>,
    /// Token passed to the retry manager so in-flight requests can be canceled.
    cancellation_token: CancellationToken,
    /// Whether the client was constructed with the testnet flag set.
    is_testnet: bool,
}
127
128impl Default for DydxRawHttpClient {
129 fn default() -> Self {
130 Self::new(None, Some(60), None, false, None)
131 .expect("Failed to create default DydxRawHttpClient")
132 }
133}
134
135impl Debug for DydxRawHttpClient {
136 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137 f.debug_struct(stringify!(DydxRawHttpClient))
138 .field("base_url", &self.base_url)
139 .field("is_testnet", &self.is_testnet)
140 .finish_non_exhaustive()
141 }
142}
143
144impl DydxRawHttpClient {
145 pub fn cancel_all_requests(&self) {
147 self.cancellation_token.cancel();
148 }
149
150 pub fn cancellation_token(&self) -> &CancellationToken {
152 &self.cancellation_token
153 }
154
155 pub fn new(
164 base_url: Option<String>,
165 timeout_secs: Option<u64>,
166 proxy_url: Option<String>,
167 is_testnet: bool,
168 retry_config: Option<RetryConfig>,
169 ) -> anyhow::Result<Self> {
170 let base_url = if is_testnet {
171 base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
172 } else {
173 base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
174 };
175
176 let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
177
178 let mut headers = HashMap::new();
180 headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
181
182 let client = HttpClient::new(
183 headers,
184 vec![], vec![], Some(*DYDX_REST_QUOTA),
187 timeout_secs,
188 proxy_url,
189 )
190 .map_err(|e| {
191 DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
192 })?;
193
194 Ok(Self {
195 base_url,
196 client,
197 retry_manager,
198 cancellation_token: CancellationToken::new(),
199 is_testnet,
200 })
201 }
202
203 #[must_use]
205 pub const fn is_testnet(&self) -> bool {
206 self.is_testnet
207 }
208
209 #[must_use]
211 pub fn base_url(&self) -> &str {
212 &self.base_url
213 }
214
215 pub async fn send_request<T>(
227 &self,
228 method: Method,
229 endpoint: &str,
230 query_params: Option<&str>,
231 ) -> Result<T, DydxHttpError>
232 where
233 T: DeserializeOwned,
234 {
235 let url = if let Some(params) = query_params {
236 format!("{}{endpoint}?{params}", self.base_url)
237 } else {
238 format!("{}{endpoint}", self.base_url)
239 };
240
241 let operation = || async {
242 let request = self
243 .client
244 .request_with_ustr_keys(
245 method.clone(),
246 url.clone(),
247 None, None, None, None, None, )
253 .await
254 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
255
256 if !request.status.is_success() {
258 return Err(DydxHttpError::HttpStatus {
259 status: request.status.as_u16(),
260 message: String::from_utf8_lossy(&request.body).to_string(),
261 });
262 }
263
264 Ok(request)
265 };
266
267 let should_retry = |error: &DydxHttpError| -> bool {
272 match error {
273 DydxHttpError::HttpClientError(_) => true,
274 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
275 _ => false,
276 }
277 };
278
279 let create_error = |msg: String| -> DydxHttpError {
280 if msg == "canceled" {
281 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
282 } else {
283 DydxHttpError::ValidationError(msg)
284 }
285 };
286
287 let response = self
289 .retry_manager
290 .execute_with_retry_with_cancel(
291 endpoint,
292 operation,
293 should_retry,
294 create_error,
295 &self.cancellation_token,
296 )
297 .await?;
298
299 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
301 error: e.to_string(),
302 body: String::from_utf8_lossy(&response.body).to_string(),
303 })
304 }
305
306 pub async fn send_post_request<T, B>(
319 &self,
320 endpoint: &str,
321 body: &B,
322 ) -> Result<T, DydxHttpError>
323 where
324 T: DeserializeOwned,
325 B: Serialize,
326 {
327 let url = format!("{}{endpoint}", self.base_url);
328
329 let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
330 error: e.to_string(),
331 })?;
332
333 let operation = || async {
334 let request = self
335 .client
336 .request_with_ustr_keys(
337 Method::POST,
338 url.clone(),
339 None, None, Some(body_bytes.clone()),
342 None, None, )
345 .await
346 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
347
348 if !request.status.is_success() {
350 return Err(DydxHttpError::HttpStatus {
351 status: request.status.as_u16(),
352 message: String::from_utf8_lossy(&request.body).to_string(),
353 });
354 }
355
356 Ok(request)
357 };
358
359 let should_retry = |error: &DydxHttpError| -> bool {
361 match error {
362 DydxHttpError::HttpClientError(_) => true,
363 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
364 _ => false,
365 }
366 };
367
368 let create_error = |msg: String| -> DydxHttpError {
369 if msg == "canceled" {
370 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
371 } else {
372 DydxHttpError::ValidationError(msg)
373 }
374 };
375
376 let response = self
378 .retry_manager
379 .execute_with_retry_with_cancel(
380 endpoint,
381 operation,
382 should_retry,
383 create_error,
384 &self.cancellation_token,
385 )
386 .await?;
387
388 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
390 error: e.to_string(),
391 body: String::from_utf8_lossy(&response.body).to_string(),
392 })
393 }
394
395 pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
405 self.send_request(Method::GET, "/v4/perpetualMarkets", None)
406 .await
407 }
408
409 pub async fn fetch_instruments(
422 &self,
423 maker_fee: Option<rust_decimal::Decimal>,
424 taker_fee: Option<rust_decimal::Decimal>,
425 ) -> Result<Vec<InstrumentAny>, DydxHttpError> {
426 use nautilus_core::time::get_atomic_clock_realtime;
427
428 let markets_response = self.get_markets().await?;
429 let ts_init = get_atomic_clock_realtime().get_time_ns();
430
431 let mut instruments = Vec::new();
432 let mut skipped = 0;
433
434 for (ticker, market) in markets_response.markets {
435 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
436 Ok(instrument) => {
437 instruments.push(instrument);
438 }
439 Err(e) => {
440 tracing::warn!("Failed to parse instrument {ticker}: {e}");
441 skipped += 1;
442 }
443 }
444 }
445
446 if skipped > 0 {
447 tracing::info!(
448 "Parsed {} instruments, skipped {} (inactive or invalid)",
449 instruments.len(),
450 skipped
451 );
452 } else {
453 tracing::info!("Parsed {} instruments", instruments.len());
454 }
455
456 Ok(instruments)
457 }
458
459 pub async fn get_orderbook(
469 &self,
470 ticker: &str,
471 ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
472 let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
473 self.send_request(Method::GET, &endpoint, None).await
474 }
475
476 pub async fn get_trades(
482 &self,
483 ticker: &str,
484 limit: Option<u32>,
485 ) -> Result<super::models::TradesResponse, DydxHttpError> {
486 let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
487 let query = limit.map(|l| format!("limit={l}"));
488 self.send_request(Method::GET, &endpoint, query.as_deref())
489 .await
490 }
491
492 pub async fn get_candles(
498 &self,
499 ticker: &str,
500 resolution: DydxCandleResolution,
501 limit: Option<u32>,
502 from_iso: Option<DateTime<Utc>>,
503 to_iso: Option<DateTime<Utc>>,
504 ) -> Result<super::models::CandlesResponse, DydxHttpError> {
505 let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
506 let mut query_parts = vec![format!("resolution={}", resolution)];
507 if let Some(l) = limit {
508 query_parts.push(format!("limit={l}"));
509 }
510 if let Some(from) = from_iso {
511 let from_str = from.to_rfc3339();
512 query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
513 }
514 if let Some(to) = to_iso {
515 let to_str = to.to_rfc3339();
516 query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
517 }
518 let query = query_parts.join("&");
519 self.send_request(Method::GET, &endpoint, Some(&query))
520 .await
521 }
522
523 pub async fn get_subaccount(
533 &self,
534 address: &str,
535 subaccount_number: u32,
536 ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
537 let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
538 self.send_request(Method::GET, &endpoint, None).await
539 }
540
541 pub async fn get_fills(
547 &self,
548 address: &str,
549 subaccount_number: u32,
550 market: Option<&str>,
551 limit: Option<u32>,
552 ) -> Result<super::models::FillsResponse, DydxHttpError> {
553 let endpoint = "/v4/fills";
554 let mut query_parts = vec![
555 format!("address={address}"),
556 format!("subaccountNumber={subaccount_number}"),
557 ];
558 if let Some(m) = market {
559 query_parts.push(format!("market={m}"));
560 }
561 if let Some(l) = limit {
562 query_parts.push(format!("limit={l}"));
563 }
564 let query = query_parts.join("&");
565 self.send_request(Method::GET, endpoint, Some(&query)).await
566 }
567
568 pub async fn get_orders(
574 &self,
575 address: &str,
576 subaccount_number: u32,
577 market: Option<&str>,
578 limit: Option<u32>,
579 ) -> Result<super::models::OrdersResponse, DydxHttpError> {
580 let endpoint = "/v4/orders";
581 let mut query_parts = vec![
582 format!("address={address}"),
583 format!("subaccountNumber={subaccount_number}"),
584 ];
585 if let Some(m) = market {
586 query_parts.push(format!("market={m}"));
587 }
588 if let Some(l) = limit {
589 query_parts.push(format!("limit={l}"));
590 }
591 let query = query_parts.join("&");
592 self.send_request(Method::GET, endpoint, Some(&query)).await
593 }
594
595 pub async fn get_transfers(
601 &self,
602 address: &str,
603 subaccount_number: u32,
604 limit: Option<u32>,
605 ) -> Result<super::models::TransfersResponse, DydxHttpError> {
606 let endpoint = "/v4/transfers";
607 let mut query_parts = vec![
608 format!("address={address}"),
609 format!("subaccountNumber={subaccount_number}"),
610 ];
611 if let Some(l) = limit {
612 query_parts.push(format!("limit={l}"));
613 }
614 let query = query_parts.join("&");
615 self.send_request(Method::GET, endpoint, Some(&query)).await
616 }
617
618 pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
628 self.send_request(Method::GET, "/v4/time", None).await
629 }
630
631 pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
637 self.send_request(Method::GET, "/v4/height", None).await
638 }
639}
640
/// Domain-level dYdX HTTP client.
///
/// Wraps a shared [`DydxRawHttpClient`] and keeps instrument-related caches
/// that are used to convert raw responses into Nautilus types.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct DydxHttpClient {
    /// Shared raw client used for all HTTP calls.
    pub(crate) inner: Arc<DydxRawHttpClient>,
    /// Instruments keyed by the inner `Ustr` of their symbol.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Maps a market's `clob_pair_id` to the corresponding instrument ID.
    pub(crate) clob_pair_id_to_instrument: Arc<DashMap<u32, InstrumentId>>,
    /// Raw market definitions keyed by instrument ID.
    pub(crate) market_params_cache: Arc<DashMap<InstrumentId, super::models::PerpetualMarket>>,
    /// Set once instruments have been cached through this instance.
    /// The flag is per instance: a clone copies its value at clone time.
    cache_initialized: AtomicBool,
}
680
681impl Clone for DydxHttpClient {
682 fn clone(&self) -> Self {
683 let cache_initialized = AtomicBool::new(false);
684 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
685 if is_initialized {
686 cache_initialized.store(true, Ordering::Release);
687 }
688
689 Self {
690 inner: self.inner.clone(),
691 instruments_cache: self.instruments_cache.clone(),
692 clob_pair_id_to_instrument: self.clob_pair_id_to_instrument.clone(),
693 market_params_cache: self.market_params_cache.clone(),
694 cache_initialized,
695 }
696 }
697}
698
699impl Default for DydxHttpClient {
700 fn default() -> Self {
701 Self::new(None, Some(60), None, false, None)
702 .expect("Failed to create default DydxHttpClient")
703 }
704}
705
706impl DydxHttpClient {
707 pub fn new(
717 base_url: Option<String>,
718 timeout_secs: Option<u64>,
719 proxy_url: Option<String>,
720 is_testnet: bool,
721 retry_config: Option<RetryConfig>,
722 ) -> anyhow::Result<Self> {
723 Ok(Self {
724 inner: Arc::new(DydxRawHttpClient::new(
725 base_url,
726 timeout_secs,
727 proxy_url,
728 is_testnet,
729 retry_config,
730 )?),
731 instruments_cache: Arc::new(DashMap::new()),
732 clob_pair_id_to_instrument: Arc::new(DashMap::new()),
733 market_params_cache: Arc::new(DashMap::new()),
734 cache_initialized: AtomicBool::new(false),
735 })
736 }
737
738 pub async fn request_instruments(
748 &self,
749 symbol: Option<String>,
750 maker_fee: Option<rust_decimal::Decimal>,
751 taker_fee: Option<rust_decimal::Decimal>,
752 ) -> anyhow::Result<Vec<InstrumentAny>> {
753 use nautilus_core::time::get_atomic_clock_realtime;
754
755 let markets_response = self.inner.get_markets().await?;
756 let ts_init = get_atomic_clock_realtime().get_time_ns();
757
758 let mut instruments = Vec::new();
759 let mut skipped = 0;
760
761 for (ticker, market) in markets_response.markets {
762 if let Some(ref sym) = symbol
764 && ticker != *sym
765 {
766 continue;
767 }
768
769 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
771 Ok(instrument) => {
772 instruments.push(instrument);
773 }
774 Err(e) => {
775 tracing::warn!("Failed to parse instrument {ticker}: {e}");
776 skipped += 1;
777 }
778 }
779 }
780
781 if skipped > 0 {
782 tracing::info!(
783 "Parsed {} instruments, skipped {} (inactive or invalid)",
784 instruments.len(),
785 skipped
786 );
787 } else {
788 tracing::debug!("Parsed {} instruments", instruments.len());
789 }
790
791 Ok(instruments)
792 }
793
794 pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
803 use nautilus_core::time::get_atomic_clock_realtime;
804
805 self.instruments_cache.clear();
806 self.clob_pair_id_to_instrument.clear();
807 self.market_params_cache.clear();
808
809 let markets_response = self.inner.get_markets().await?;
810 let ts_init = get_atomic_clock_realtime().get_time_ns();
811
812 let mut instruments = Vec::new();
813 let mut skipped = 0;
814
815 for (ticker, market) in markets_response.markets {
816 match super::parse::parse_instrument_any(&market, None, None, ts_init) {
818 Ok(instrument) => {
819 let instrument_id = instrument.id();
820 let symbol = instrument_id.symbol.inner();
821 self.instruments_cache.insert(symbol, instrument.clone());
822
823 self.clob_pair_id_to_instrument
825 .insert(market.clob_pair_id, instrument_id);
826
827 self.market_params_cache.insert(instrument_id, market);
829
830 instruments.push(instrument);
831 }
832 Err(e) => {
833 tracing::warn!("Failed to parse instrument {ticker}: {e}");
834 skipped += 1;
835 }
836 }
837 }
838
839 if !instruments.is_empty() {
840 self.cache_initialized.store(true, Ordering::Release);
841 }
842
843 if skipped > 0 {
844 tracing::info!(
845 "Cached {} instruments, skipped {} (inactive or invalid)",
846 instruments.len(),
847 skipped
848 );
849 } else {
850 tracing::info!("Cached {} instruments", instruments.len());
851 }
852
853 Ok(())
854 }
855
856 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
860 for inst in instruments {
861 let symbol = inst.id().symbol.inner();
862 self.instruments_cache.insert(symbol, inst);
863 }
864 self.cache_initialized.store(true, Ordering::Release);
865 }
866
867 pub fn cache_instrument(&self, instrument: InstrumentAny) {
871 let symbol = instrument.id().symbol.inner();
872 self.instruments_cache.insert(symbol, instrument);
873 self.cache_initialized.store(true, Ordering::Release);
874 }
875
876 #[must_use]
878 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
879 self.instruments_cache
880 .get(symbol)
881 .map(|entry| entry.clone())
882 }
883
884 #[must_use]
888 pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
889 let instrument_id = self
891 .clob_pair_id_to_instrument
892 .get(&clob_pair_id)
893 .map(|entry| *entry)?;
894
895 self.get_instrument(&instrument_id.symbol.inner())
897 }
898
899 #[must_use]
908 pub fn get_market_params(
909 &self,
910 instrument_id: &InstrumentId,
911 ) -> Option<super::models::PerpetualMarket> {
912 self.market_params_cache
913 .get(instrument_id)
914 .map(|entry| entry.clone())
915 }
916
917 pub async fn request_trades(
926 &self,
927 symbol: &str,
928 limit: Option<u32>,
929 ) -> anyhow::Result<super::models::TradesResponse> {
930 self.inner
931 .get_trades(symbol, limit)
932 .await
933 .map_err(Into::into)
934 }
935
936 pub async fn request_candles(
945 &self,
946 symbol: &str,
947 resolution: DydxCandleResolution,
948 limit: Option<u32>,
949 from_iso: Option<DateTime<Utc>>,
950 to_iso: Option<DateTime<Utc>>,
951 ) -> anyhow::Result<super::models::CandlesResponse> {
952 self.inner
953 .get_candles(symbol, resolution, limit, from_iso, to_iso)
954 .await
955 .map_err(Into::into)
956 }
957
958 pub async fn request_bars(
968 &self,
969 bar_type: BarType,
970 resolution: DydxCandleResolution,
971 limit: Option<u32>,
972 from_iso: Option<DateTime<Utc>>,
973 to_iso: Option<DateTime<Utc>>,
974 ) -> anyhow::Result<Vec<Bar>> {
975 let instrument_id = bar_type.instrument_id();
976 let symbol = instrument_id.symbol;
977
978 let instrument = self
980 .get_instrument(&symbol.inner())
981 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
982
983 let ticker = extract_raw_symbol(symbol.as_str());
985 let response = self
986 .request_candles(ticker, resolution, limit, from_iso, to_iso)
987 .await?;
988
989 let ts_init = get_atomic_clock_realtime().get_time_ns();
990 let interval_ns = get_bar_interval_ns(&bar_type);
991
992 let mut bars = Vec::with_capacity(response.candles.len());
993
994 for candle in response.candles {
995 let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
997 anyhow::anyhow!("Timestamp out of range for candle at {}", candle.started_at)
998 })?;
999 let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_ns;
1000
1001 let bar = Bar::new(
1002 bar_type,
1003 Price::from_decimal_dp(candle.open, instrument.price_precision())?,
1004 Price::from_decimal_dp(candle.high, instrument.price_precision())?,
1005 Price::from_decimal_dp(candle.low, instrument.price_precision())?,
1006 Price::from_decimal_dp(candle.close, instrument.price_precision())?,
1007 Quantity::from_decimal_dp(candle.base_token_volume, instrument.size_precision())?,
1008 ts_event,
1009 ts_init,
1010 );
1011
1012 bars.push(bar);
1013 }
1014
1015 Ok(bars)
1016 }
1017
1018 pub async fn request_trade_ticks(
1028 &self,
1029 instrument_id: InstrumentId,
1030 limit: Option<u32>,
1031 ) -> anyhow::Result<Vec<TradeTick>> {
1032 let symbol = instrument_id.symbol;
1033
1034 let instrument = self
1035 .get_instrument(&symbol.inner())
1036 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1037
1038 let ticker = extract_raw_symbol(symbol.as_str());
1039 let response = self.request_trades(ticker, limit).await?;
1040
1041 let ts_init = get_atomic_clock_realtime().get_time_ns();
1042
1043 let mut trades = Vec::with_capacity(response.trades.len());
1044
1045 for trade in response.trades {
1046 let ts_event_nanos = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
1047 anyhow::anyhow!("Timestamp out of range for trade at {}", trade.created_at)
1048 })?;
1049 let ts_event = UnixNanos::from(ts_event_nanos as u64);
1050
1051 let aggressor_side = match trade.side {
1052 NautilusOrderSide::Buy => AggressorSide::Buyer,
1053 NautilusOrderSide::Sell => AggressorSide::Seller,
1054 NautilusOrderSide::NoOrderSide => AggressorSide::NoAggressor,
1055 };
1056
1057 let trade_tick = TradeTick::new(
1058 instrument_id,
1059 Price::from_decimal_dp(trade.price, instrument.price_precision())?,
1060 Quantity::from_decimal_dp(trade.size, instrument.size_precision())?,
1061 aggressor_side,
1062 TradeId::new(&trade.id),
1063 ts_event,
1064 ts_init,
1065 );
1066
1067 trades.push(trade_tick);
1068 }
1069
1070 Ok(trades)
1071 }
1072
1073 pub async fn request_orderbook_snapshot(
1084 &self,
1085 instrument_id: InstrumentId,
1086 ) -> anyhow::Result<OrderBookDeltas> {
1087 let symbol = instrument_id.symbol;
1088
1089 let instrument = self
1090 .get_instrument(&symbol.inner())
1091 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1092
1093 let ticker = extract_raw_symbol(symbol.as_str());
1094 let response = self.inner.get_orderbook(ticker).await?;
1095
1096 let ts_init = get_atomic_clock_realtime().get_time_ns();
1097
1098 let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1099
1100 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1101
1102 for (i, level) in response.bids.iter().enumerate() {
1103 let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1104 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1105
1106 let order = BookOrder::new(
1107 NautilusOrderSide::Buy,
1108 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1109 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1110 0,
1111 );
1112
1113 deltas.push(OrderBookDelta::new(
1114 instrument_id,
1115 BookAction::Add,
1116 order,
1117 flags,
1118 0,
1119 ts_init,
1120 ts_init,
1121 ));
1122 }
1123
1124 for (i, level) in response.asks.iter().enumerate() {
1125 let is_last = i == response.asks.len() - 1;
1126 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1127
1128 let order = BookOrder::new(
1129 NautilusOrderSide::Sell,
1130 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1131 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1132 0,
1133 );
1134
1135 deltas.push(OrderBookDelta::new(
1136 instrument_id,
1137 BookAction::Add,
1138 order,
1139 flags,
1140 0,
1141 ts_init,
1142 ts_init,
1143 ));
1144 }
1145
1146 Ok(OrderBookDeltas::new(instrument_id, deltas))
1147 }
1148
1149 #[must_use]
1155 pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
1156 &self.inner
1157 }
1158
1159 #[must_use]
1161 pub fn is_testnet(&self) -> bool {
1162 self.inner.is_testnet()
1163 }
1164
1165 #[must_use]
1167 pub fn base_url(&self) -> &str {
1168 self.inner.base_url()
1169 }
1170
1171 #[must_use]
1173 pub fn is_cache_initialized(&self) -> bool {
1174 self.cache_initialized.load(Ordering::Acquire)
1175 }
1176
1177 #[must_use]
1179 pub fn cached_instruments_count(&self) -> usize {
1180 self.instruments_cache.len()
1181 }
1182
1183 #[must_use]
1185 pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
1186 &self.instruments_cache
1187 }
1188
1189 #[must_use]
1194 pub fn clob_pair_id_mapping(&self) -> &Arc<DashMap<u32, InstrumentId>> {
1195 &self.clob_pair_id_to_instrument
1196 }
1197
1198 pub async fn request_order_status_reports(
1207 &self,
1208 address: &str,
1209 subaccount_number: u32,
1210 account_id: AccountId,
1211 instrument_id: Option<InstrumentId>,
1212 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1213 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1214
1215 let market = instrument_id.map(|id| {
1217 let symbol = id.symbol.to_string();
1218 symbol.trim_end_matches("-PERP").to_string()
1220 });
1221
1222 let orders = self
1223 .inner
1224 .get_orders(address, subaccount_number, market.as_deref(), None)
1225 .await?;
1226
1227 let mut reports = Vec::new();
1228
1229 for order in orders {
1230 let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1232 Some(inst) => inst,
1233 None => {
1234 tracing::warn!(
1235 "Skipping order {}: no cached instrument for clob_pair_id {}",
1236 order.id,
1237 order.clob_pair_id
1238 );
1239 continue;
1240 }
1241 };
1242
1243 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1245 continue;
1246 }
1247
1248 match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1249 {
1250 Ok(report) => reports.push(report),
1251 Err(e) => {
1252 tracing::warn!("Failed to parse order {}: {e}", order.id);
1253 }
1254 }
1255 }
1256
1257 Ok(reports)
1258 }
1259
1260 pub async fn request_fill_reports(
1269 &self,
1270 address: &str,
1271 subaccount_number: u32,
1272 account_id: AccountId,
1273 instrument_id: Option<InstrumentId>,
1274 ) -> anyhow::Result<Vec<FillReport>> {
1275 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1276
1277 let market = instrument_id.map(|id| {
1279 let symbol = id.symbol.to_string();
1280 symbol.trim_end_matches("-PERP").to_string()
1281 });
1282
1283 let fills_response = self
1284 .inner
1285 .get_fills(address, subaccount_number, market.as_deref(), None)
1286 .await?;
1287
1288 let mut reports = Vec::new();
1289
1290 for fill in fills_response.fills {
1291 let market = &fill.market;
1293 let symbol = ustr::Ustr::from(&format!("{market}-PERP"));
1294 let instrument = match self.get_instrument(&symbol) {
1295 Some(inst) => inst,
1296 None => {
1297 tracing::warn!(
1298 "Skipping fill {}: no cached instrument for market {}",
1299 fill.id,
1300 fill.market
1301 );
1302 continue;
1303 }
1304 };
1305
1306 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1308 continue;
1309 }
1310
1311 match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1312 Ok(report) => reports.push(report),
1313 Err(e) => {
1314 tracing::warn!("Failed to parse fill {}: {e}", fill.id);
1315 }
1316 }
1317 }
1318
1319 Ok(reports)
1320 }
1321
1322 pub async fn request_position_status_reports(
1331 &self,
1332 address: &str,
1333 subaccount_number: u32,
1334 account_id: AccountId,
1335 instrument_id: Option<InstrumentId>,
1336 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1337 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1338
1339 let subaccount_response = self
1340 .inner
1341 .get_subaccount(address, subaccount_number)
1342 .await?;
1343
1344 let mut reports = Vec::new();
1345
1346 for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1347 let symbol = ustr::Ustr::from(&format!("{market}-PERP"));
1349 let instrument = match self.get_instrument(&symbol) {
1350 Some(inst) => inst,
1351 None => {
1352 tracing::warn!(
1353 "Skipping position: no cached instrument for market {}",
1354 market
1355 );
1356 continue;
1357 }
1358 };
1359
1360 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1362 continue;
1363 }
1364
1365 match super::parse::parse_position_status_report(
1366 &position,
1367 &instrument,
1368 account_id,
1369 ts_init,
1370 ) {
1371 Ok(report) => reports.push(report),
1372 Err(e) => {
1373 tracing::warn!("Failed to parse position for {}: {e}", market);
1374 }
1375 }
1376 }
1377
1378 Ok(reports)
1379 }
1380}
1381
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use rstest::rstest;

    use super::*;
    use crate::http::error;

    /// A raw client without an explicit base URL targets the mainnet URL.
    #[tokio::test]
    async fn test_raw_client_creation() {
        let client = DydxRawHttpClient::new(None, Some(30), None, false, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
    }

    /// A raw testnet client without an explicit base URL targets the testnet URL.
    #[tokio::test]
    async fn test_raw_client_testnet() {
        let client = DydxRawHttpClient::new(None, Some(30), None, true, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(client.is_testnet());
        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// A fresh domain client starts with an empty, uninitialized cache.
    #[tokio::test]
    async fn test_domain_client_creation() {
        let client = DydxHttpClient::new(None, Some(30), None, false, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
        assert!(!client.is_cache_initialized());
        assert_eq!(client.cached_instruments_count(), 0);
    }

    /// A domain testnet client forwards the testnet flag and URL to the raw client.
    #[tokio::test]
    async fn test_domain_client_testnet() {
        let client = DydxHttpClient::new(None, Some(30), None, true, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(client.is_testnet());
        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// The default domain client is a mainnet client with an uninitialized cache.
    #[tokio::test]
    async fn test_domain_client_default() {
        let client = DydxHttpClient::default();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
        assert!(!client.is_cache_initialized());
    }

    /// Cloning copies the cache-initialized flag as it is at clone time.
    #[tokio::test]
    async fn test_domain_client_clone() {
        let client = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();

        let cloned = client.clone();
        assert!(!cloned.is_cache_initialized());

        client.cache_initialized.store(true, Ordering::Release);

        // The clone is the behaviour under test, so it is intentionally kept.
        #[allow(clippy::redundant_clone)]
        let cloned_after = client.clone();
        assert!(cloned_after.is_cache_initialized());
    }

    /// Caching one instrument makes it retrievable and marks the cache initialized.
    #[rstest]
    fn test_domain_client_cache_instrument() {
        use nautilus_model::{
            identifiers::{InstrumentId, Symbol},
            instruments::CryptoPerpetual,
            types::{Currency, Price, Quantity},
        };

        let client = DydxHttpClient::default();
        assert_eq!(client.cached_instruments_count(), 0);

        let instrument_id =
            InstrumentId::new(Symbol::from("BTC-USD"), *crate::common::consts::DYDX_VENUE);
        let price = Price::from("1.0");
        let size = Quantity::from("0.001");
        let instrument = CryptoPerpetual::new(
            instrument_id,
            Symbol::from("BTC-USD"),
            Currency::BTC(),
            Currency::USD(),
            Currency::USD(),
            false,
            price.precision,
            size.precision,
            price,
            size,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );

        client.cache_instrument(InstrumentAny::CryptoPerpetual(instrument));
        assert_eq!(client.cached_instruments_count(), 1);
        assert!(client.is_cache_initialized());

        let btc_usd = Ustr::from("BTC-USD");
        let cached = client.get_instrument(&btc_usd);
        assert!(cached.is_some());
    }

    /// Looking up a symbol that was never cached returns `None`.
    #[rstest]
    fn test_domain_client_get_instrument_not_found() {
        let client = DydxHttpClient::default();
        let eth_usd = Ustr::from("ETH-USD");
        let result = client.get_instrument(&eth_usd);
        assert!(result.is_none());
    }

    /// A request against a slow endpoint fails within the retry manager's
    /// operation timeout instead of waiting for the handler to finish.
    #[tokio::test]
    async fn test_http_timeout_respects_configuration_and_does_not_block() {
        use axum::{Router, routing::get};
        use tokio::net::TcpListener;

        // Sleeps far longer than the 500 ms operation timeout configured below.
        async fn slow_handler() -> &'static str {
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            "ok"
        }

        let router = Router::new().route("/v4/slow", get(slow_handler));

        // Port 0 lets the OS pick a free port.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            axum::serve(listener, router.into_make_service())
                .await
                .unwrap();
        });

        let base_url = format!("http://{addr}");

        // No retries: the single attempt must time out on its own.
        let retry_config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 0,
            max_delay_ms: 0,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(500),
            immediate_first: true,
            max_elapsed_ms: Some(1_000),
        };

        // The 60 s HTTP timeout is deliberately longer than the operation timeout.
        let client =
            DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
                .unwrap();

        let start = std::time::Instant::now();
        let result: Result<serde_json::Value, error::DydxHttpError> =
            client.send_request(Method::GET, "/v4/slow", None).await;
        let elapsed = start.elapsed();

        assert!(result.is_err());
        // Well below the handler's 5 s sleep, with slack for slow CI machines.
        assert!(elapsed < std::time::Duration::from_secs(3));
    }
}