1use std::{
54 collections::HashMap,
55 fmt::Debug,
56 num::NonZeroU32,
57 sync::{
58 Arc, LazyLock,
59 atomic::{AtomicBool, Ordering},
60 },
61};
62
63use chrono::{DateTime, Utc};
64use dashmap::DashMap;
65use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
66use nautilus_model::{
67 data::{
68 Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, TradeTick,
69 bar::get_bar_interval_ns,
70 },
71 enums::{AggressorSide, BookAction, OrderSide as NautilusOrderSide, RecordFlag},
72 identifiers::{AccountId, InstrumentId, TradeId},
73 instruments::{Instrument, InstrumentAny},
74 reports::{FillReport, OrderStatusReport, PositionStatusReport},
75 types::{Price, Quantity},
76};
77use nautilus_network::{
78 http::{HttpClient, Method, USER_AGENT},
79 ratelimiter::quota::Quota,
80 retry::{RetryConfig, RetryManager},
81};
82use serde::{Deserialize, Serialize, de::DeserializeOwned};
83use tokio_util::sync::CancellationToken;
84use ustr::Ustr;
85
86use super::error::DydxHttpError;
87use crate::common::{
88 consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
89 enums::DydxCandleResolution,
90 parse::extract_raw_symbol,
91};
92
/// Rate-limit quota for the dYdX REST API: 10 requests per second.
///
/// Built lazily on first access; `DydxRawHttpClient::new` passes a copy of it
/// to `HttpClient::new`.
pub static DYDX_REST_QUOTA: LazyLock<Quota> =
    // `unwrap` cannot fail: the literal 10 is non-zero.
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
100
/// Generic response envelope holding a single `data` payload of type `T`.
///
/// Serializes to / deserializes from an object with one `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct DydxResponse<T> {
    /// The wrapped payload.
    pub data: T,
}
110
/// Low-level HTTP client for the dYdX REST endpoints.
///
/// Builds request URLs from `base_url`, sends them through `client` under the
/// control of `retry_manager`, and deserializes JSON responses.
pub struct DydxRawHttpClient {
    /// URL prefix prepended to every endpoint path.
    base_url: String,
    /// Underlying HTTP client (constructed with the REST quota and timeout).
    client: HttpClient,
    /// Drives retries of failed requests.
    retry_manager: RetryManager<DydxHttpError>,
    /// Token passed to the retry manager so in-flight requests can be canceled.
    cancellation_token: CancellationToken,
    /// Whether this client was constructed for testnet.
    is_testnet: bool,
}
127
128impl Default for DydxRawHttpClient {
129 fn default() -> Self {
130 Self::new(None, Some(60), None, false, None)
131 .expect("Failed to create default DydxRawHttpClient")
132 }
133}
134
135impl Debug for DydxRawHttpClient {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 f.debug_struct(stringify!(DydxRawHttpClient))
138 .field("base_url", &self.base_url)
139 .field("is_testnet", &self.is_testnet)
140 .finish_non_exhaustive()
141 }
142}
143
144impl DydxRawHttpClient {
    /// Cancels the client's cancellation token.
    ///
    /// The same token is handed to the retry manager for every request, so
    /// requests observing it are signalled to stop.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
149
    /// Returns a reference to the cancellation token used for all requests.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
154
155 pub fn new(
164 base_url: Option<String>,
165 timeout_secs: Option<u64>,
166 proxy_url: Option<String>,
167 is_testnet: bool,
168 retry_config: Option<RetryConfig>,
169 ) -> anyhow::Result<Self> {
170 let base_url = if is_testnet {
171 base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
172 } else {
173 base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
174 };
175
176 let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
177
178 let mut headers = HashMap::new();
180 headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
181
182 let client = HttpClient::new(
183 headers,
184 vec![], vec![], Some(*DYDX_REST_QUOTA),
187 timeout_secs,
188 proxy_url,
189 )
190 .map_err(|e| {
191 DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
192 })?;
193
194 Ok(Self {
195 base_url,
196 client,
197 retry_manager,
198 cancellation_token: CancellationToken::new(),
199 is_testnet,
200 })
201 }
202
    /// Returns `true` if this client was constructed for testnet.
    #[must_use]
    pub const fn is_testnet(&self) -> bool {
        self.is_testnet
    }
208
    /// Returns the base URL that is prepended to every endpoint path.
    #[must_use]
    pub fn base_url(&self) -> &str {
        &self.base_url
    }
214
215 pub async fn send_request<T>(
227 &self,
228 method: Method,
229 endpoint: &str,
230 query_params: Option<&str>,
231 ) -> Result<T, DydxHttpError>
232 where
233 T: DeserializeOwned,
234 {
235 let url = if let Some(params) = query_params {
236 format!("{}{endpoint}?{params}", self.base_url)
237 } else {
238 format!("{}{endpoint}", self.base_url)
239 };
240
241 let operation = || async {
242 let request = self
243 .client
244 .request_with_ustr_keys(
245 method.clone(),
246 url.clone(),
247 None, None, None, None, None, )
253 .await
254 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
255
256 if !request.status.is_success() {
258 return Err(DydxHttpError::HttpStatus {
259 status: request.status.as_u16(),
260 message: String::from_utf8_lossy(&request.body).to_string(),
261 });
262 }
263
264 Ok(request)
265 };
266
267 let should_retry = |error: &DydxHttpError| -> bool {
272 match error {
273 DydxHttpError::HttpClientError(_) => true,
274 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
275 _ => false,
276 }
277 };
278
279 let create_error = |msg: String| -> DydxHttpError {
280 if msg == "canceled" {
281 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
282 } else {
283 DydxHttpError::ValidationError(msg)
284 }
285 };
286
287 let response = self
289 .retry_manager
290 .execute_with_retry_with_cancel(
291 endpoint,
292 operation,
293 should_retry,
294 create_error,
295 &self.cancellation_token,
296 )
297 .await?;
298
299 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
301 error: e.to_string(),
302 body: String::from_utf8_lossy(&response.body).to_string(),
303 })
304 }
305
306 pub async fn send_post_request<T, B>(
319 &self,
320 endpoint: &str,
321 body: &B,
322 ) -> Result<T, DydxHttpError>
323 where
324 T: DeserializeOwned,
325 B: Serialize,
326 {
327 let url = format!("{}{endpoint}", self.base_url);
328
329 let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
330 error: e.to_string(),
331 })?;
332
333 let operation = || async {
334 let request = self
335 .client
336 .request_with_ustr_keys(
337 Method::POST,
338 url.clone(),
339 None, None, Some(body_bytes.clone()),
342 None, None, )
345 .await
346 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
347
348 if !request.status.is_success() {
350 return Err(DydxHttpError::HttpStatus {
351 status: request.status.as_u16(),
352 message: String::from_utf8_lossy(&request.body).to_string(),
353 });
354 }
355
356 Ok(request)
357 };
358
359 let should_retry = |error: &DydxHttpError| -> bool {
361 match error {
362 DydxHttpError::HttpClientError(_) => true,
363 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
364 _ => false,
365 }
366 };
367
368 let create_error = |msg: String| -> DydxHttpError {
369 if msg == "canceled" {
370 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
371 } else {
372 DydxHttpError::ValidationError(msg)
373 }
374 };
375
376 let response = self
378 .retry_manager
379 .execute_with_retry_with_cancel(
380 endpoint,
381 operation,
382 should_retry,
383 create_error,
384 &self.cancellation_token,
385 )
386 .await?;
387
388 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
390 error: e.to_string(),
391 body: String::from_utf8_lossy(&response.body).to_string(),
392 })
393 }
394
395 pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
401 self.send_request(Method::GET, "/v4/perpetualMarkets", None)
402 .await
403 }
404
405 pub async fn fetch_instruments(
418 &self,
419 maker_fee: Option<rust_decimal::Decimal>,
420 taker_fee: Option<rust_decimal::Decimal>,
421 ) -> Result<Vec<InstrumentAny>, DydxHttpError> {
422 use nautilus_core::time::get_atomic_clock_realtime;
423
424 let markets_response = self.get_markets().await?;
425 let ts_init = get_atomic_clock_realtime().get_time_ns();
426
427 let mut instruments = Vec::new();
428 let mut skipped_inactive = 0;
429
430 for (ticker, market) in markets_response.markets {
431 if !super::parse::is_market_active(&market.status) {
432 tracing::debug!(
433 "Skipping inactive market {ticker} (status: {:?})",
434 market.status
435 );
436 skipped_inactive += 1;
437 continue;
438 }
439
440 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
441 Ok(instrument) => {
442 instruments.push(instrument);
443 }
444 Err(e) => {
445 tracing::error!("Failed to parse instrument {ticker}: {e}");
446 }
447 }
448 }
449
450 if skipped_inactive > 0 {
451 tracing::info!(
452 "Parsed {} instruments, skipped {} inactive",
453 instruments.len(),
454 skipped_inactive
455 );
456 } else {
457 tracing::info!("Parsed {} instruments", instruments.len());
458 }
459
460 Ok(instruments)
461 }
462
463 pub async fn get_orderbook(
469 &self,
470 ticker: &str,
471 ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
472 let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
473 self.send_request(Method::GET, &endpoint, None).await
474 }
475
476 pub async fn get_trades(
482 &self,
483 ticker: &str,
484 limit: Option<u32>,
485 ) -> Result<super::models::TradesResponse, DydxHttpError> {
486 let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
487 let query = limit.map(|l| format!("limit={l}"));
488 self.send_request(Method::GET, &endpoint, query.as_deref())
489 .await
490 }
491
492 pub async fn get_candles(
498 &self,
499 ticker: &str,
500 resolution: DydxCandleResolution,
501 limit: Option<u32>,
502 from_iso: Option<DateTime<Utc>>,
503 to_iso: Option<DateTime<Utc>>,
504 ) -> Result<super::models::CandlesResponse, DydxHttpError> {
505 let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
506 let mut query_parts = vec![format!("resolution={}", resolution)];
507 if let Some(l) = limit {
508 query_parts.push(format!("limit={l}"));
509 }
510 if let Some(from) = from_iso {
511 let from_str = from.to_rfc3339();
512 query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
513 }
514 if let Some(to) = to_iso {
515 let to_str = to.to_rfc3339();
516 query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
517 }
518 let query = query_parts.join("&");
519 self.send_request(Method::GET, &endpoint, Some(&query))
520 .await
521 }
522
523 pub async fn get_subaccount(
529 &self,
530 address: &str,
531 subaccount_number: u32,
532 ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
533 let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
534 self.send_request(Method::GET, &endpoint, None).await
535 }
536
537 pub async fn get_fills(
543 &self,
544 address: &str,
545 subaccount_number: u32,
546 market: Option<&str>,
547 limit: Option<u32>,
548 ) -> Result<super::models::FillsResponse, DydxHttpError> {
549 let endpoint = "/v4/fills";
550 let mut query_parts = vec![
551 format!("address={address}"),
552 format!("subaccountNumber={subaccount_number}"),
553 ];
554 if let Some(m) = market {
555 query_parts.push(format!("market={m}"));
556 }
557 if let Some(l) = limit {
558 query_parts.push(format!("limit={l}"));
559 }
560 let query = query_parts.join("&");
561 self.send_request(Method::GET, endpoint, Some(&query)).await
562 }
563
564 pub async fn get_orders(
570 &self,
571 address: &str,
572 subaccount_number: u32,
573 market: Option<&str>,
574 limit: Option<u32>,
575 ) -> Result<super::models::OrdersResponse, DydxHttpError> {
576 let endpoint = "/v4/orders";
577 let mut query_parts = vec![
578 format!("address={address}"),
579 format!("subaccountNumber={subaccount_number}"),
580 ];
581 if let Some(m) = market {
582 query_parts.push(format!("market={m}"));
583 }
584 if let Some(l) = limit {
585 query_parts.push(format!("limit={l}"));
586 }
587 let query = query_parts.join("&");
588 self.send_request(Method::GET, endpoint, Some(&query)).await
589 }
590
591 pub async fn get_transfers(
597 &self,
598 address: &str,
599 subaccount_number: u32,
600 limit: Option<u32>,
601 ) -> Result<super::models::TransfersResponse, DydxHttpError> {
602 let endpoint = "/v4/transfers";
603 let mut query_parts = vec![
604 format!("address={address}"),
605 format!("subaccountNumber={subaccount_number}"),
606 ];
607 if let Some(l) = limit {
608 query_parts.push(format!("limit={l}"));
609 }
610 let query = query_parts.join("&");
611 self.send_request(Method::GET, endpoint, Some(&query)).await
612 }
613
    /// Issues `GET /v4/time` and deserializes the response.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::send_request`].
    pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
        self.send_request(Method::GET, "/v4/time", None).await
    }
622
    /// Issues `GET /v4/height` and deserializes the response.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::send_request`].
    pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
        self.send_request(Method::GET, "/v4/height", None).await
    }
631}
632
/// Domain-level dYdX HTTP client.
///
/// Wraps a shared [`DydxRawHttpClient`] and keeps instrument caches that are
/// used to convert venue responses into Nautilus domain types.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
)]
pub struct DydxHttpClient {
    /// Shared raw client that performs the actual HTTP requests.
    pub(crate) inner: Arc<DydxRawHttpClient>,
    /// Instruments keyed by the symbol of their instrument ID.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Maps a market's `clob_pair_id` to the corresponding instrument ID.
    pub(crate) clob_pair_id_to_instrument: Arc<DashMap<u32, InstrumentId>>,
    /// Raw market definitions keyed by instrument ID.
    pub(crate) market_params_cache: Arc<DashMap<InstrumentId, super::models::PerpetualMarket>>,
    /// Set once instruments have been cached; per-instance (not shared by clones).
    cache_initialized: AtomicBool,
}
672
673impl Clone for DydxHttpClient {
674 fn clone(&self) -> Self {
675 let cache_initialized = AtomicBool::new(false);
676 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
677 if is_initialized {
678 cache_initialized.store(true, Ordering::Release);
679 }
680
681 Self {
682 inner: self.inner.clone(),
683 instruments_cache: self.instruments_cache.clone(),
684 clob_pair_id_to_instrument: self.clob_pair_id_to_instrument.clone(),
685 market_params_cache: self.market_params_cache.clone(),
686 cache_initialized,
687 }
688 }
689}
690
691impl Default for DydxHttpClient {
692 fn default() -> Self {
693 Self::new(None, Some(60), None, false, None)
694 .expect("Failed to create default DydxHttpClient")
695 }
696}
697
698impl DydxHttpClient {
699 pub fn new(
709 base_url: Option<String>,
710 timeout_secs: Option<u64>,
711 proxy_url: Option<String>,
712 is_testnet: bool,
713 retry_config: Option<RetryConfig>,
714 ) -> anyhow::Result<Self> {
715 Ok(Self {
716 inner: Arc::new(DydxRawHttpClient::new(
717 base_url,
718 timeout_secs,
719 proxy_url,
720 is_testnet,
721 retry_config,
722 )?),
723 instruments_cache: Arc::new(DashMap::new()),
724 clob_pair_id_to_instrument: Arc::new(DashMap::new()),
725 market_params_cache: Arc::new(DashMap::new()),
726 cache_initialized: AtomicBool::new(false),
727 })
728 }
729
730 pub async fn request_instruments(
740 &self,
741 symbol: Option<String>,
742 maker_fee: Option<rust_decimal::Decimal>,
743 taker_fee: Option<rust_decimal::Decimal>,
744 ) -> anyhow::Result<Vec<InstrumentAny>> {
745 use nautilus_core::time::get_atomic_clock_realtime;
746
747 let markets_response = self.inner.get_markets().await?;
748 let ts_init = get_atomic_clock_realtime().get_time_ns();
749
750 let mut instruments = Vec::new();
751 let mut skipped_inactive = 0;
752
753 for (ticker, market) in markets_response.markets {
754 if let Some(ref sym) = symbol
756 && ticker != *sym
757 {
758 continue;
759 }
760
761 if !super::parse::is_market_active(&market.status) {
762 tracing::debug!(
763 "Skipping inactive market {ticker} (status: {:?})",
764 market.status
765 );
766 skipped_inactive += 1;
767 continue;
768 }
769
770 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
771 Ok(instrument) => {
772 instruments.push(instrument);
773 }
774 Err(e) => {
775 tracing::error!("Failed to parse instrument {ticker}: {e}");
776 }
777 }
778 }
779
780 if skipped_inactive > 0 {
781 tracing::info!(
782 "Parsed {} instruments, skipped {} inactive",
783 instruments.len(),
784 skipped_inactive
785 );
786 } else {
787 tracing::debug!("Parsed {} instruments", instruments.len());
788 }
789
790 Ok(instruments)
791 }
792
793 pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
805 use nautilus_core::time::get_atomic_clock_realtime;
806
807 let markets_response = self.inner.get_markets().await?;
809 let ts_init = get_atomic_clock_realtime().get_time_ns();
810
811 let mut parsed_instruments = Vec::new();
812 let mut parsed_markets = Vec::new();
813 let mut skipped_inactive = 0;
814
815 for (ticker, market) in markets_response.markets {
816 if !super::parse::is_market_active(&market.status) {
817 tracing::debug!(
818 "Skipping inactive market {ticker} (status: {:?})",
819 market.status
820 );
821 skipped_inactive += 1;
822 continue;
823 }
824
825 match super::parse::parse_instrument_any(&market, None, None, ts_init) {
826 Ok(instrument) => {
827 parsed_instruments.push(instrument);
828 parsed_markets.push(market);
829 }
830 Err(e) => {
831 tracing::error!("Failed to parse instrument {ticker}: {e}");
832 }
833 }
834 }
835
836 self.instruments_cache.clear();
838 self.clob_pair_id_to_instrument.clear();
839 self.market_params_cache.clear();
840
841 for (instrument, market) in parsed_instruments.iter().zip(parsed_markets.into_iter()) {
842 let instrument_id = instrument.id();
843 let symbol = instrument_id.symbol.inner();
844 self.instruments_cache.insert(symbol, instrument.clone());
845 self.clob_pair_id_to_instrument
846 .insert(market.clob_pair_id, instrument_id);
847 self.market_params_cache.insert(instrument_id, market);
848 }
849
850 if !parsed_instruments.is_empty() {
851 self.cache_initialized.store(true, Ordering::Release);
852 }
853
854 if skipped_inactive > 0 {
855 tracing::info!(
856 "Cached {} instruments, skipped {} inactive",
857 parsed_instruments.len(),
858 skipped_inactive
859 );
860 } else {
861 tracing::info!("Cached {} instruments", parsed_instruments.len());
862 }
863
864 Ok(())
865 }
866
867 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
871 for inst in instruments {
872 let symbol = inst.id().symbol.inner();
873 self.instruments_cache.insert(symbol, inst);
874 }
875 self.cache_initialized.store(true, Ordering::Release);
876 }
877
878 pub fn cache_instrument(&self, instrument: InstrumentAny) {
882 let symbol = instrument.id().symbol.inner();
883 self.instruments_cache.insert(symbol, instrument);
884 self.cache_initialized.store(true, Ordering::Release);
885 }
886
887 #[must_use]
889 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
890 self.instruments_cache
891 .get(symbol)
892 .map(|entry| entry.clone())
893 }
894
895 #[must_use]
899 pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
900 let instrument_id = self
902 .clob_pair_id_to_instrument
903 .get(&clob_pair_id)
904 .map(|entry| *entry)?;
905
906 self.get_instrument(&instrument_id.symbol.inner())
908 }
909
910 #[must_use]
919 pub fn get_market_params(
920 &self,
921 instrument_id: &InstrumentId,
922 ) -> Option<super::models::PerpetualMarket> {
923 self.market_params_cache
924 .get(instrument_id)
925 .map(|entry| entry.clone())
926 }
927
928 pub async fn request_trades(
937 &self,
938 symbol: &str,
939 limit: Option<u32>,
940 ) -> anyhow::Result<super::models::TradesResponse> {
941 self.inner
942 .get_trades(symbol, limit)
943 .await
944 .map_err(Into::into)
945 }
946
947 pub async fn request_candles(
956 &self,
957 symbol: &str,
958 resolution: DydxCandleResolution,
959 limit: Option<u32>,
960 from_iso: Option<DateTime<Utc>>,
961 to_iso: Option<DateTime<Utc>>,
962 ) -> anyhow::Result<super::models::CandlesResponse> {
963 self.inner
964 .get_candles(symbol, resolution, limit, from_iso, to_iso)
965 .await
966 .map_err(Into::into)
967 }
968
969 pub async fn request_bars(
979 &self,
980 bar_type: BarType,
981 resolution: DydxCandleResolution,
982 limit: Option<u32>,
983 from_iso: Option<DateTime<Utc>>,
984 to_iso: Option<DateTime<Utc>>,
985 ) -> anyhow::Result<Vec<Bar>> {
986 let instrument_id = bar_type.instrument_id();
987 let symbol = instrument_id.symbol;
988
989 let instrument = self
991 .get_instrument(&symbol.inner())
992 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
993
994 let ticker = extract_raw_symbol(symbol.as_str());
996 let response = self
997 .request_candles(ticker, resolution, limit, from_iso, to_iso)
998 .await?;
999
1000 let ts_init = get_atomic_clock_realtime().get_time_ns();
1001 let interval_ns = get_bar_interval_ns(&bar_type);
1002
1003 let mut bars = Vec::with_capacity(response.candles.len());
1004
1005 for candle in response.candles {
1006 let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
1008 anyhow::anyhow!("Timestamp out of range for candle at {}", candle.started_at)
1009 })?;
1010 let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_ns;
1011
1012 let bar = Bar::new(
1013 bar_type,
1014 Price::from_decimal_dp(candle.open, instrument.price_precision())?,
1015 Price::from_decimal_dp(candle.high, instrument.price_precision())?,
1016 Price::from_decimal_dp(candle.low, instrument.price_precision())?,
1017 Price::from_decimal_dp(candle.close, instrument.price_precision())?,
1018 Quantity::from_decimal_dp(candle.base_token_volume, instrument.size_precision())?,
1019 ts_event,
1020 ts_init,
1021 );
1022
1023 bars.push(bar);
1024 }
1025
1026 Ok(bars)
1027 }
1028
1029 pub async fn request_trade_ticks(
1039 &self,
1040 instrument_id: InstrumentId,
1041 limit: Option<u32>,
1042 ) -> anyhow::Result<Vec<TradeTick>> {
1043 let symbol = instrument_id.symbol;
1044
1045 let instrument = self
1046 .get_instrument(&symbol.inner())
1047 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1048
1049 let ticker = extract_raw_symbol(symbol.as_str());
1050 let response = self.request_trades(ticker, limit).await?;
1051
1052 let ts_init = get_atomic_clock_realtime().get_time_ns();
1053
1054 let mut trades = Vec::with_capacity(response.trades.len());
1055
1056 for trade in response.trades {
1057 let ts_event_nanos = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
1058 anyhow::anyhow!("Timestamp out of range for trade at {}", trade.created_at)
1059 })?;
1060 let ts_event = UnixNanos::from(ts_event_nanos as u64);
1061
1062 let aggressor_side = match trade.side {
1063 NautilusOrderSide::Buy => AggressorSide::Buyer,
1064 NautilusOrderSide::Sell => AggressorSide::Seller,
1065 NautilusOrderSide::NoOrderSide => AggressorSide::NoAggressor,
1066 };
1067
1068 let trade_tick = TradeTick::new(
1069 instrument_id,
1070 Price::from_decimal_dp(trade.price, instrument.price_precision())?,
1071 Quantity::from_decimal_dp(trade.size, instrument.size_precision())?,
1072 aggressor_side,
1073 TradeId::new(&trade.id),
1074 ts_event,
1075 ts_init,
1076 );
1077
1078 trades.push(trade_tick);
1079 }
1080
1081 Ok(trades)
1082 }
1083
1084 pub async fn request_orderbook_snapshot(
1095 &self,
1096 instrument_id: InstrumentId,
1097 ) -> anyhow::Result<OrderBookDeltas> {
1098 let symbol = instrument_id.symbol;
1099
1100 let instrument = self
1101 .get_instrument(&symbol.inner())
1102 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1103
1104 let ticker = extract_raw_symbol(symbol.as_str());
1105 let response = self.inner.get_orderbook(ticker).await?;
1106
1107 let ts_init = get_atomic_clock_realtime().get_time_ns();
1108
1109 let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1110
1111 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1112
1113 for (i, level) in response.bids.iter().enumerate() {
1114 let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1115 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1116
1117 let order = BookOrder::new(
1118 NautilusOrderSide::Buy,
1119 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1120 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1121 0,
1122 );
1123
1124 deltas.push(OrderBookDelta::new(
1125 instrument_id,
1126 BookAction::Add,
1127 order,
1128 flags,
1129 0,
1130 ts_init,
1131 ts_init,
1132 ));
1133 }
1134
1135 for (i, level) in response.asks.iter().enumerate() {
1136 let is_last = i == response.asks.len() - 1;
1137 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1138
1139 let order = BookOrder::new(
1140 NautilusOrderSide::Sell,
1141 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1142 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1143 0,
1144 );
1145
1146 deltas.push(OrderBookDelta::new(
1147 instrument_id,
1148 BookAction::Add,
1149 order,
1150 flags,
1151 0,
1152 ts_init,
1153 ts_init,
1154 ));
1155 }
1156
1157 Ok(OrderBookDeltas::new(instrument_id, deltas))
1158 }
1159
    /// Returns the shared raw HTTP client.
    #[must_use]
    pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
        &self.inner
    }
1169
    /// Returns `true` if the underlying raw client targets testnet.
    #[must_use]
    pub fn is_testnet(&self) -> bool {
        self.inner.is_testnet()
    }
1175
    /// Returns the base URL of the underlying raw client.
    #[must_use]
    pub fn base_url(&self) -> &str {
        self.inner.base_url()
    }
1181
    /// Returns the current value of this instance's cache-initialized flag.
    #[must_use]
    pub fn is_cache_initialized(&self) -> bool {
        self.cache_initialized.load(Ordering::Acquire)
    }
1187
    /// Returns the number of entries in the instruments cache.
    #[must_use]
    pub fn cached_instruments_count(&self) -> usize {
        self.instruments_cache.len()
    }
1193
    /// Returns the shared instruments cache (symbol to instrument).
    #[must_use]
    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
        &self.instruments_cache
    }
1199
    /// Returns the shared `clob_pair_id` to instrument ID mapping.
    #[must_use]
    pub fn clob_pair_id_mapping(&self) -> &Arc<DashMap<u32, InstrumentId>> {
        &self.clob_pair_id_to_instrument
    }
1208
1209 pub async fn request_order_status_reports(
1218 &self,
1219 address: &str,
1220 subaccount_number: u32,
1221 account_id: AccountId,
1222 instrument_id: Option<InstrumentId>,
1223 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1224 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1225
1226 let market = instrument_id.map(|id| {
1228 let symbol = id.symbol.to_string();
1229 symbol.trim_end_matches("-PERP").to_string()
1231 });
1232
1233 let orders = self
1234 .inner
1235 .get_orders(address, subaccount_number, market.as_deref(), None)
1236 .await?;
1237
1238 let mut reports = Vec::new();
1239
1240 for order in orders {
1241 let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1243 Some(inst) => inst,
1244 None => {
1245 tracing::warn!(
1246 "Skipping order {}: no cached instrument for clob_pair_id {}",
1247 order.id,
1248 order.clob_pair_id
1249 );
1250 continue;
1251 }
1252 };
1253
1254 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1256 continue;
1257 }
1258
1259 match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1260 {
1261 Ok(report) => reports.push(report),
1262 Err(e) => {
1263 tracing::warn!("Failed to parse order {}: {e}", order.id);
1264 }
1265 }
1266 }
1267
1268 Ok(reports)
1269 }
1270
1271 pub async fn request_fill_reports(
1280 &self,
1281 address: &str,
1282 subaccount_number: u32,
1283 account_id: AccountId,
1284 instrument_id: Option<InstrumentId>,
1285 ) -> anyhow::Result<Vec<FillReport>> {
1286 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1287
1288 let market = instrument_id.map(|id| {
1290 let symbol = id.symbol.to_string();
1291 symbol.trim_end_matches("-PERP").to_string()
1292 });
1293
1294 let fills_response = self
1295 .inner
1296 .get_fills(address, subaccount_number, market.as_deref(), None)
1297 .await?;
1298
1299 let mut reports = Vec::new();
1300
1301 for fill in fills_response.fills {
1302 let market = &fill.market;
1304 let symbol = Ustr::from(&format!("{market}-PERP"));
1305 let instrument = match self.get_instrument(&symbol) {
1306 Some(inst) => inst,
1307 None => {
1308 tracing::warn!(
1309 "Skipping fill {}: no cached instrument for market {}",
1310 fill.id,
1311 fill.market
1312 );
1313 continue;
1314 }
1315 };
1316
1317 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1319 continue;
1320 }
1321
1322 match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1323 Ok(report) => reports.push(report),
1324 Err(e) => {
1325 tracing::warn!("Failed to parse fill {}: {e}", fill.id);
1326 }
1327 }
1328 }
1329
1330 Ok(reports)
1331 }
1332
1333 pub async fn request_position_status_reports(
1342 &self,
1343 address: &str,
1344 subaccount_number: u32,
1345 account_id: AccountId,
1346 instrument_id: Option<InstrumentId>,
1347 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1348 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1349
1350 let subaccount_response = self
1351 .inner
1352 .get_subaccount(address, subaccount_number)
1353 .await?;
1354
1355 let mut reports = Vec::new();
1356
1357 for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1358 let symbol = Ustr::from(&format!("{market}-PERP"));
1360 let instrument = match self.get_instrument(&symbol) {
1361 Some(inst) => inst,
1362 None => {
1363 tracing::warn!(
1364 "Skipping position: no cached instrument for market {}",
1365 market
1366 );
1367 continue;
1368 }
1369 };
1370
1371 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1373 continue;
1374 }
1375
1376 match super::parse::parse_position_status_report(
1377 &position,
1378 &instrument,
1379 account_id,
1380 ts_init,
1381 ) {
1382 Ok(report) => reports.push(report),
1383 Err(e) => {
1384 tracing::warn!("Failed to parse position for {}: {e}", market);
1385 }
1386 }
1387 }
1388
1389 Ok(reports)
1390 }
1391}
1392
1393#[cfg(test)]
1394mod tests {
1395 use nautilus_core::UnixNanos;
1396 use rstest::rstest;
1397
1398 use super::*;
1399 use crate::http::error;
1400
1401 #[tokio::test]
1402 async fn test_raw_client_creation() {
1403 let client = DydxRawHttpClient::new(None, Some(30), None, false, None);
1404 assert!(client.is_ok());
1405
1406 let client = client.unwrap();
1407 assert!(!client.is_testnet());
1408 assert_eq!(client.base_url(), DYDX_HTTP_URL);
1409 }
1410
1411 #[tokio::test]
1412 async fn test_raw_client_testnet() {
1413 let client = DydxRawHttpClient::new(None, Some(30), None, true, None);
1414 assert!(client.is_ok());
1415
1416 let client = client.unwrap();
1417 assert!(client.is_testnet());
1418 assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1419 }
1420
1421 #[tokio::test]
1422 async fn test_domain_client_creation() {
1423 let client = DydxHttpClient::new(None, Some(30), None, false, None);
1424 assert!(client.is_ok());
1425
1426 let client = client.unwrap();
1427 assert!(!client.is_testnet());
1428 assert_eq!(client.base_url(), DYDX_HTTP_URL);
1429 assert!(!client.is_cache_initialized());
1430 assert_eq!(client.cached_instruments_count(), 0);
1431 }
1432
1433 #[tokio::test]
1434 async fn test_domain_client_testnet() {
1435 let client = DydxHttpClient::new(None, Some(30), None, true, None);
1436 assert!(client.is_ok());
1437
1438 let client = client.unwrap();
1439 assert!(client.is_testnet());
1440 assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
1441 }
1442
1443 #[tokio::test]
1444 async fn test_domain_client_default() {
1445 let client = DydxHttpClient::default();
1446 assert!(!client.is_testnet());
1447 assert_eq!(client.base_url(), DYDX_HTTP_URL);
1448 assert!(!client.is_cache_initialized());
1449 }
1450
1451 #[tokio::test]
1452 async fn test_domain_client_clone() {
1453 let client = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();
1454
1455 let cloned = client.clone();
1457 assert!(!cloned.is_cache_initialized());
1458
1459 client.cache_initialized.store(true, Ordering::Release);
1461
1462 #[allow(clippy::redundant_clone)]
1464 let cloned_after = client.clone();
1465 assert!(cloned_after.is_cache_initialized());
1466 }
1467
1468 #[rstest]
1469 fn test_domain_client_cache_instrument() {
1470 use nautilus_model::{
1471 identifiers::{InstrumentId, Symbol},
1472 instruments::CryptoPerpetual,
1473 types::{Currency, Price, Quantity},
1474 };
1475
1476 let client = DydxHttpClient::default();
1477 assert_eq!(client.cached_instruments_count(), 0);
1478
1479 let instrument_id =
1481 InstrumentId::new(Symbol::from("BTC-USD"), *crate::common::consts::DYDX_VENUE);
1482 let price = Price::from("1.0");
1483 let size = Quantity::from("0.001");
1484 let instrument = CryptoPerpetual::new(
1485 instrument_id,
1486 Symbol::from("BTC-USD"),
1487 Currency::BTC(),
1488 Currency::USD(),
1489 Currency::USD(),
1490 false,
1491 price.precision,
1492 size.precision,
1493 price,
1494 size,
1495 None,
1496 None,
1497 None,
1498 None,
1499 None,
1500 None,
1501 None,
1502 None,
1503 None,
1504 None,
1505 None,
1506 None,
1507 UnixNanos::default(),
1508 UnixNanos::default(),
1509 );
1510
1511 client.cache_instrument(InstrumentAny::CryptoPerpetual(instrument));
1513 assert_eq!(client.cached_instruments_count(), 1);
1514 assert!(client.is_cache_initialized());
1515
1516 let btc_usd = Ustr::from("BTC-USD");
1518 let cached = client.get_instrument(&btc_usd);
1519 assert!(cached.is_some());
1520 }
1521
1522 #[rstest]
1523 fn test_domain_client_get_instrument_not_found() {
1524 let client = DydxHttpClient::default();
1525 let eth_usd = Ustr::from("ETH-USD");
1526 let result = client.get_instrument(ð_usd);
1527 assert!(result.is_none());
1528 }
1529
1530 #[tokio::test]
1531 async fn test_http_timeout_respects_configuration_and_does_not_block() {
1532 use axum::{Router, routing::get};
1533 use tokio::net::TcpListener;
1534
1535 async fn slow_handler() -> &'static str {
1536 tokio::time::sleep(std::time::Duration::from_secs(5)).await;
1538 "ok"
1539 }
1540
1541 let router = Router::new().route("/v4/slow", get(slow_handler));
1542
1543 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1544 let addr = listener.local_addr().unwrap();
1545
1546 tokio::spawn(async move {
1547 axum::serve(listener, router.into_make_service())
1548 .await
1549 .unwrap();
1550 });
1551
1552 let base_url = format!("http://{addr}");
1553
1554 let retry_config = RetryConfig {
1557 max_retries: 0,
1558 initial_delay_ms: 0,
1559 max_delay_ms: 0,
1560 backoff_factor: 1.0,
1561 jitter_ms: 0,
1562 operation_timeout_ms: Some(500),
1563 immediate_first: true,
1564 max_elapsed_ms: Some(1_000),
1565 };
1566
1567 let client =
1570 DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
1571 .unwrap();
1572
1573 let start = std::time::Instant::now();
1574 let result: Result<serde_json::Value, error::DydxHttpError> =
1575 client.send_request(Method::GET, "/v4/slow", None).await;
1576 let elapsed = start.elapsed();
1577
1578 assert!(result.is_err());
1581 assert!(elapsed < std::time::Duration::from_secs(3));
1582 }
1583}