1use std::{
54 collections::HashMap,
55 fmt::Debug,
56 num::NonZeroU32,
57 sync::{
58 Arc, LazyLock,
59 atomic::{AtomicBool, Ordering},
60 },
61};
62
63use chrono::{DateTime, Utc};
64use dashmap::DashMap;
65use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
66use nautilus_model::{
67 data::{
68 Bar, BarType, BookOrder, OrderBookDelta, OrderBookDeltas, TradeTick,
69 bar::get_bar_interval_ns,
70 },
71 enums::{AggressorSide, BookAction, OrderSide as NautilusOrderSide, RecordFlag},
72 identifiers::{AccountId, InstrumentId, TradeId},
73 instruments::{Instrument, InstrumentAny},
74 reports::{FillReport, OrderStatusReport, PositionStatusReport},
75 types::{Price, Quantity},
76};
77use nautilus_network::{
78 http::{HttpClient, Method, USER_AGENT},
79 ratelimiter::quota::Quota,
80 retry::{RetryConfig, RetryManager},
81};
82use rust_decimal::Decimal;
83use serde::{Deserialize, Serialize, de::DeserializeOwned};
84use tokio_util::sync::CancellationToken;
85use ustr::Ustr;
86
87use super::error::DydxHttpError;
88use crate::common::{
89 consts::{DYDX_HTTP_URL, DYDX_TESTNET_HTTP_URL},
90 enums::DydxCandleResolution,
91 parse::extract_raw_symbol,
92};
93
94pub static DYDX_REST_QUOTA: LazyLock<Quota> =
100 LazyLock::new(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
101
/// Generic response envelope that carries its payload in a single `data` field.
#[derive(Debug, Serialize, Deserialize)]
pub struct DydxResponse<T> {
    /// The wrapped payload.
    pub data: T,
}
111
/// Low-level HTTP client for the dYdX REST API.
///
/// Bundles the HTTP transport, a retry manager and a cancellation token that is
/// passed along with every retried request.
pub struct DydxRawHttpClient {
    /// Base URL that is prepended to every endpoint path.
    base_url: String,
    /// Underlying HTTP transport used to issue requests.
    client: HttpClient,
    /// Executes request operations with the configured retry policy.
    retry_manager: RetryManager<DydxHttpError>,
    /// Token handed to the retry manager; cancelled by `cancel_all_requests`.
    cancellation_token: CancellationToken,
    /// Whether this client was constructed for the testnet environment.
    is_testnet: bool,
}
128
129impl Default for DydxRawHttpClient {
130 fn default() -> Self {
131 Self::new(None, Some(60), None, false, None)
132 .expect("Failed to create default DydxRawHttpClient")
133 }
134}
135
136impl Debug for DydxRawHttpClient {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 f.debug_struct(stringify!(DydxRawHttpClient))
139 .field("base_url", &self.base_url)
140 .field("is_testnet", &self.is_testnet)
141 .finish_non_exhaustive()
142 }
143}
144
145impl DydxRawHttpClient {
    /// Cancels the client's cancellation token, which is passed to the retry
    /// manager for every request sent through this client.
    pub fn cancel_all_requests(&self) {
        self.cancellation_token.cancel();
    }
150
    /// Returns a reference to the cancellation token used by this client.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
155
156 pub fn new(
165 base_url: Option<String>,
166 timeout_secs: Option<u64>,
167 proxy_url: Option<String>,
168 is_testnet: bool,
169 retry_config: Option<RetryConfig>,
170 ) -> anyhow::Result<Self> {
171 let base_url = if is_testnet {
172 base_url.unwrap_or_else(|| DYDX_TESTNET_HTTP_URL.to_string())
173 } else {
174 base_url.unwrap_or_else(|| DYDX_HTTP_URL.to_string())
175 };
176
177 let retry_manager = RetryManager::new(retry_config.unwrap_or_default());
178
179 let mut headers = HashMap::new();
180 headers.insert(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string());
181
182 let client = HttpClient::new(
183 headers,
184 vec![], vec![], Some(*DYDX_REST_QUOTA),
187 timeout_secs,
188 proxy_url,
189 )
190 .map_err(|e| {
191 DydxHttpError::ValidationError(format!("Failed to create HTTP client: {e}"))
192 })?;
193
194 Ok(Self {
195 base_url,
196 client,
197 retry_manager,
198 cancellation_token: CancellationToken::new(),
199 is_testnet,
200 })
201 }
202
    /// Returns `true` if this client was constructed with `is_testnet` set.
    #[must_use]
    pub const fn is_testnet(&self) -> bool {
        self.is_testnet
    }
208
    /// Returns the base URL that is prepended to every endpoint path.
    #[must_use]
    pub fn base_url(&self) -> &str {
        &self.base_url
    }
214
215 pub async fn send_request<T>(
227 &self,
228 method: Method,
229 endpoint: &str,
230 query_params: Option<&str>,
231 ) -> Result<T, DydxHttpError>
232 where
233 T: DeserializeOwned,
234 {
235 let url = if let Some(params) = query_params {
236 format!("{}{endpoint}?{params}", self.base_url)
237 } else {
238 format!("{}{endpoint}", self.base_url)
239 };
240
241 let operation = || async {
242 let request = self
243 .client
244 .request_with_ustr_keys(
245 method.clone(),
246 url.clone(),
247 None, None, None, None, None, )
253 .await
254 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
255
256 if !request.status.is_success() {
257 return Err(DydxHttpError::HttpStatus {
258 status: request.status.as_u16(),
259 message: String::from_utf8_lossy(&request.body).to_string(),
260 });
261 }
262
263 Ok(request)
264 };
265
266 let should_retry = |error: &DydxHttpError| -> bool {
271 match error {
272 DydxHttpError::HttpClientError(_) => true,
273 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
274 _ => false,
275 }
276 };
277
278 let create_error = |msg: String| -> DydxHttpError {
279 if msg == "canceled" {
280 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
281 } else {
282 DydxHttpError::ValidationError(msg)
283 }
284 };
285
286 let response = self
287 .retry_manager
288 .execute_with_retry_with_cancel(
289 endpoint,
290 operation,
291 should_retry,
292 create_error,
293 &self.cancellation_token,
294 )
295 .await?;
296
297 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
298 error: e.to_string(),
299 body: String::from_utf8_lossy(&response.body).to_string(),
300 })
301 }
302
303 pub async fn send_post_request<T, B>(
316 &self,
317 endpoint: &str,
318 body: &B,
319 ) -> Result<T, DydxHttpError>
320 where
321 T: DeserializeOwned,
322 B: Serialize,
323 {
324 let url = format!("{}{endpoint}", self.base_url);
325
326 let body_bytes = serde_json::to_vec(body).map_err(|e| DydxHttpError::Serialization {
327 error: e.to_string(),
328 })?;
329
330 let operation = || async {
331 let request = self
332 .client
333 .request_with_ustr_keys(
334 Method::POST,
335 url.clone(),
336 None, None, Some(body_bytes.clone()),
339 None, None, )
342 .await
343 .map_err(|e| DydxHttpError::HttpClientError(e.to_string()))?;
344
345 if !request.status.is_success() {
346 return Err(DydxHttpError::HttpStatus {
347 status: request.status.as_u16(),
348 message: String::from_utf8_lossy(&request.body).to_string(),
349 });
350 }
351
352 Ok(request)
353 };
354
355 let should_retry = |error: &DydxHttpError| -> bool {
357 match error {
358 DydxHttpError::HttpClientError(_) => true,
359 DydxHttpError::HttpStatus { status, .. } => *status == 429 || *status >= 500,
360 _ => false,
361 }
362 };
363
364 let create_error = |msg: String| -> DydxHttpError {
365 if msg == "canceled" {
366 DydxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
367 } else {
368 DydxHttpError::ValidationError(msg)
369 }
370 };
371
372 let response = self
373 .retry_manager
374 .execute_with_retry_with_cancel(
375 endpoint,
376 operation,
377 should_retry,
378 create_error,
379 &self.cancellation_token,
380 )
381 .await?;
382
383 serde_json::from_slice(&response.body).map_err(|e| DydxHttpError::Deserialization {
384 error: e.to_string(),
385 body: String::from_utf8_lossy(&response.body).to_string(),
386 })
387 }
388
389 pub async fn get_markets(&self) -> Result<super::models::MarketsResponse, DydxHttpError> {
395 self.send_request(Method::GET, "/v4/perpetualMarkets", None)
396 .await
397 }
398
399 pub async fn fetch_instruments(
412 &self,
413 maker_fee: Option<Decimal>,
414 taker_fee: Option<Decimal>,
415 ) -> Result<Vec<InstrumentAny>, DydxHttpError> {
416 let markets_response = self.get_markets().await?;
417 let ts_init = get_atomic_clock_realtime().get_time_ns();
418
419 let mut instruments = Vec::new();
420 let mut skipped_inactive = 0;
421
422 for (ticker, market) in markets_response.markets {
423 if !super::parse::is_market_active(&market.status) {
424 log::debug!(
425 "Skipping inactive market {ticker} (status: {:?})",
426 market.status
427 );
428 skipped_inactive += 1;
429 continue;
430 }
431
432 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
433 Ok(instrument) => {
434 instruments.push(instrument);
435 }
436 Err(e) => {
437 log::error!("Failed to parse instrument {ticker}: {e}");
438 }
439 }
440 }
441
442 if skipped_inactive > 0 {
443 log::info!(
444 "Parsed {} instruments, skipped {} inactive",
445 instruments.len(),
446 skipped_inactive
447 );
448 } else {
449 log::info!("Parsed {} instruments", instruments.len());
450 }
451
452 Ok(instruments)
453 }
454
455 pub async fn get_orderbook(
461 &self,
462 ticker: &str,
463 ) -> Result<super::models::OrderbookResponse, DydxHttpError> {
464 let endpoint = format!("/v4/orderbooks/perpetualMarket/{ticker}");
465 self.send_request(Method::GET, &endpoint, None).await
466 }
467
468 pub async fn get_trades(
474 &self,
475 ticker: &str,
476 limit: Option<u32>,
477 ) -> Result<super::models::TradesResponse, DydxHttpError> {
478 let endpoint = format!("/v4/trades/perpetualMarket/{ticker}");
479 let query = limit.map(|l| format!("limit={l}"));
480 self.send_request(Method::GET, &endpoint, query.as_deref())
481 .await
482 }
483
484 pub async fn get_candles(
490 &self,
491 ticker: &str,
492 resolution: DydxCandleResolution,
493 limit: Option<u32>,
494 from_iso: Option<DateTime<Utc>>,
495 to_iso: Option<DateTime<Utc>>,
496 ) -> Result<super::models::CandlesResponse, DydxHttpError> {
497 let endpoint = format!("/v4/candles/perpetualMarkets/{ticker}");
498 let mut query_parts = vec![format!("resolution={resolution}")];
499 if let Some(l) = limit {
500 query_parts.push(format!("limit={l}"));
501 }
502 if let Some(from) = from_iso {
503 let from_str = from.to_rfc3339();
504 query_parts.push(format!("fromISO={}", urlencoding::encode(&from_str)));
505 }
506 if let Some(to) = to_iso {
507 let to_str = to.to_rfc3339();
508 query_parts.push(format!("toISO={}", urlencoding::encode(&to_str)));
509 }
510 let query = query_parts.join("&");
511 self.send_request(Method::GET, &endpoint, Some(&query))
512 .await
513 }
514
515 pub async fn get_subaccount(
521 &self,
522 address: &str,
523 subaccount_number: u32,
524 ) -> Result<super::models::SubaccountResponse, DydxHttpError> {
525 let endpoint = format!("/v4/addresses/{address}/subaccountNumber/{subaccount_number}");
526 self.send_request(Method::GET, &endpoint, None).await
527 }
528
529 pub async fn get_fills(
535 &self,
536 address: &str,
537 subaccount_number: u32,
538 market: Option<&str>,
539 limit: Option<u32>,
540 ) -> Result<super::models::FillsResponse, DydxHttpError> {
541 let endpoint = "/v4/fills";
542 let mut query_parts = vec![
543 format!("address={address}"),
544 format!("subaccountNumber={subaccount_number}"),
545 ];
546 if let Some(m) = market {
547 query_parts.push(format!("market={m}"));
548 }
549 if let Some(l) = limit {
550 query_parts.push(format!("limit={l}"));
551 }
552 let query = query_parts.join("&");
553 self.send_request(Method::GET, endpoint, Some(&query)).await
554 }
555
556 pub async fn get_orders(
562 &self,
563 address: &str,
564 subaccount_number: u32,
565 market: Option<&str>,
566 limit: Option<u32>,
567 ) -> Result<super::models::OrdersResponse, DydxHttpError> {
568 let endpoint = "/v4/orders";
569 let mut query_parts = vec![
570 format!("address={address}"),
571 format!("subaccountNumber={subaccount_number}"),
572 ];
573 if let Some(m) = market {
574 query_parts.push(format!("market={m}"));
575 }
576 if let Some(l) = limit {
577 query_parts.push(format!("limit={l}"));
578 }
579 let query = query_parts.join("&");
580 self.send_request(Method::GET, endpoint, Some(&query)).await
581 }
582
583 pub async fn get_transfers(
589 &self,
590 address: &str,
591 subaccount_number: u32,
592 limit: Option<u32>,
593 ) -> Result<super::models::TransfersResponse, DydxHttpError> {
594 let endpoint = "/v4/transfers";
595 let mut query_parts = vec![
596 format!("address={address}"),
597 format!("subaccountNumber={subaccount_number}"),
598 ];
599 if let Some(l) = limit {
600 query_parts.push(format!("limit={l}"));
601 }
602 let query = query_parts.join("&");
603 self.send_request(Method::GET, endpoint, Some(&query)).await
604 }
605
606 pub async fn get_time(&self) -> Result<super::models::TimeResponse, DydxHttpError> {
612 self.send_request(Method::GET, "/v4/time", None).await
613 }
614
615 pub async fn get_height(&self) -> Result<super::models::HeightResponse, DydxHttpError> {
621 self.send_request(Method::GET, "/v4/height", None).await
622 }
623}
624
/// Domain-level HTTP client for dYdX.
///
/// Wraps a shared [`DydxRawHttpClient`] and keeps instrument-related caches.
/// All caches are behind `Arc`, so clones of this client share them.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.dydx")
)]
pub struct DydxHttpClient {
    /// Shared raw client used for all HTTP requests.
    pub(crate) inner: Arc<DydxRawHttpClient>,
    /// Instruments keyed by the symbol of their instrument ID.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Maps a market's `clob_pair_id` to the corresponding instrument ID.
    pub(crate) clob_pair_id_to_instrument: Arc<DashMap<u32, InstrumentId>>,
    /// Market data as returned by the markets endpoint, keyed by instrument ID.
    pub(crate) market_params_cache: Arc<DashMap<InstrumentId, super::models::PerpetualMarket>>,
    /// Set once instruments have been cached; each clone carries its own flag.
    cache_initialized: AtomicBool,
}
664
665impl Clone for DydxHttpClient {
666 fn clone(&self) -> Self {
667 let cache_initialized = AtomicBool::new(false);
668 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
669 if is_initialized {
670 cache_initialized.store(true, Ordering::Release);
671 }
672
673 Self {
674 inner: self.inner.clone(),
675 instruments_cache: self.instruments_cache.clone(),
676 clob_pair_id_to_instrument: self.clob_pair_id_to_instrument.clone(),
677 market_params_cache: self.market_params_cache.clone(),
678 cache_initialized,
679 }
680 }
681}
682
683impl Default for DydxHttpClient {
684 fn default() -> Self {
685 Self::new(None, Some(60), None, false, None)
686 .expect("Failed to create default DydxHttpClient")
687 }
688}
689
690impl DydxHttpClient {
691 pub fn new(
701 base_url: Option<String>,
702 timeout_secs: Option<u64>,
703 proxy_url: Option<String>,
704 is_testnet: bool,
705 retry_config: Option<RetryConfig>,
706 ) -> anyhow::Result<Self> {
707 Ok(Self {
708 inner: Arc::new(DydxRawHttpClient::new(
709 base_url,
710 timeout_secs,
711 proxy_url,
712 is_testnet,
713 retry_config,
714 )?),
715 instruments_cache: Arc::new(DashMap::new()),
716 clob_pair_id_to_instrument: Arc::new(DashMap::new()),
717 market_params_cache: Arc::new(DashMap::new()),
718 cache_initialized: AtomicBool::new(false),
719 })
720 }
721
722 pub async fn request_instruments(
732 &self,
733 symbol: Option<String>,
734 maker_fee: Option<Decimal>,
735 taker_fee: Option<Decimal>,
736 ) -> anyhow::Result<Vec<InstrumentAny>> {
737 let markets_response = self.inner.get_markets().await?;
738 let ts_init = get_atomic_clock_realtime().get_time_ns();
739
740 let mut instruments = Vec::new();
741 let mut skipped_inactive = 0;
742
743 for (ticker, market) in markets_response.markets {
744 if let Some(ref sym) = symbol
746 && ticker != *sym
747 {
748 continue;
749 }
750
751 if !super::parse::is_market_active(&market.status) {
752 log::debug!(
753 "Skipping inactive market {ticker} (status: {:?})",
754 market.status
755 );
756 skipped_inactive += 1;
757 continue;
758 }
759
760 match super::parse::parse_instrument_any(&market, maker_fee, taker_fee, ts_init) {
761 Ok(instrument) => {
762 instruments.push(instrument);
763 }
764 Err(e) => {
765 log::error!("Failed to parse instrument {ticker}: {e}");
766 }
767 }
768 }
769
770 if skipped_inactive > 0 {
771 log::info!(
772 "Parsed {} instruments, skipped {} inactive",
773 instruments.len(),
774 skipped_inactive
775 );
776 } else {
777 log::debug!("Parsed {} instruments", instruments.len());
778 }
779
780 Ok(instruments)
781 }
782
783 pub async fn fetch_and_cache_instruments(&self) -> anyhow::Result<()> {
795 let markets_response = self.inner.get_markets().await?;
797 let ts_init = get_atomic_clock_realtime().get_time_ns();
798
799 let mut parsed_instruments = Vec::new();
800 let mut parsed_markets = Vec::new();
801 let mut skipped_inactive = 0;
802
803 for (ticker, market) in markets_response.markets {
804 if !super::parse::is_market_active(&market.status) {
805 log::debug!(
806 "Skipping inactive market {ticker} (status: {:?})",
807 market.status
808 );
809 skipped_inactive += 1;
810 continue;
811 }
812
813 match super::parse::parse_instrument_any(&market, None, None, ts_init) {
814 Ok(instrument) => {
815 parsed_instruments.push(instrument);
816 parsed_markets.push(market);
817 }
818 Err(e) => {
819 log::error!("Failed to parse instrument {ticker}: {e}");
820 }
821 }
822 }
823
824 self.instruments_cache.clear();
826 self.clob_pair_id_to_instrument.clear();
827 self.market_params_cache.clear();
828
829 for (instrument, market) in parsed_instruments.iter().zip(parsed_markets.into_iter()) {
830 let instrument_id = instrument.id();
831 let symbol = instrument_id.symbol.inner();
832 self.instruments_cache.insert(symbol, instrument.clone());
833 self.clob_pair_id_to_instrument
834 .insert(market.clob_pair_id, instrument_id);
835 self.market_params_cache.insert(instrument_id, market);
836 }
837
838 if !parsed_instruments.is_empty() {
839 self.cache_initialized.store(true, Ordering::Release);
840 }
841
842 if skipped_inactive > 0 {
843 log::info!(
844 "Cached {} instruments, skipped {} inactive",
845 parsed_instruments.len(),
846 skipped_inactive
847 );
848 } else {
849 log::info!("Cached {} instruments", parsed_instruments.len());
850 }
851
852 Ok(())
853 }
854
855 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
859 for inst in instruments {
860 let symbol = inst.id().symbol.inner();
861 self.instruments_cache.insert(symbol, inst);
862 }
863 self.cache_initialized.store(true, Ordering::Release);
864 }
865
866 pub fn cache_instrument(&self, instrument: InstrumentAny) {
870 let symbol = instrument.id().symbol.inner();
871 self.instruments_cache.insert(symbol, instrument);
872 self.cache_initialized.store(true, Ordering::Release);
873 }
874
875 #[must_use]
877 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
878 self.instruments_cache
879 .get(symbol)
880 .map(|entry| entry.clone())
881 }
882
883 #[must_use]
887 pub fn get_instrument_by_clob_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
888 let instrument_id = self
890 .clob_pair_id_to_instrument
891 .get(&clob_pair_id)
892 .map(|entry| *entry)?;
893
894 self.get_instrument(&instrument_id.symbol.inner())
896 }
897
898 #[must_use]
907 pub fn get_market_params(
908 &self,
909 instrument_id: &InstrumentId,
910 ) -> Option<super::models::PerpetualMarket> {
911 self.market_params_cache
912 .get(instrument_id)
913 .map(|entry| entry.clone())
914 }
915
916 pub async fn request_trades(
925 &self,
926 symbol: &str,
927 limit: Option<u32>,
928 ) -> anyhow::Result<super::models::TradesResponse> {
929 self.inner
930 .get_trades(symbol, limit)
931 .await
932 .map_err(Into::into)
933 }
934
935 pub async fn request_candles(
944 &self,
945 symbol: &str,
946 resolution: DydxCandleResolution,
947 limit: Option<u32>,
948 from_iso: Option<DateTime<Utc>>,
949 to_iso: Option<DateTime<Utc>>,
950 ) -> anyhow::Result<super::models::CandlesResponse> {
951 self.inner
952 .get_candles(symbol, resolution, limit, from_iso, to_iso)
953 .await
954 .map_err(Into::into)
955 }
956
957 pub async fn request_bars(
967 &self,
968 bar_type: BarType,
969 resolution: DydxCandleResolution,
970 limit: Option<u32>,
971 from_iso: Option<DateTime<Utc>>,
972 to_iso: Option<DateTime<Utc>>,
973 ) -> anyhow::Result<Vec<Bar>> {
974 let instrument_id = bar_type.instrument_id();
975 let symbol = instrument_id.symbol;
976
977 let instrument = self
979 .get_instrument(&symbol.inner())
980 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
981
982 let ticker = extract_raw_symbol(symbol.as_str());
984 let response = self
985 .request_candles(ticker, resolution, limit, from_iso, to_iso)
986 .await?;
987
988 let ts_init = get_atomic_clock_realtime().get_time_ns();
989 let interval_ns = get_bar_interval_ns(&bar_type);
990
991 let mut bars = Vec::with_capacity(response.candles.len());
992
993 for candle in response.candles {
994 let started_at_nanos = candle.started_at.timestamp_nanos_opt().ok_or_else(|| {
996 anyhow::anyhow!("Timestamp out of range for candle at {}", candle.started_at)
997 })?;
998 let ts_event = UnixNanos::from(started_at_nanos as u64) + interval_ns;
999
1000 let bar = Bar::new(
1001 bar_type,
1002 Price::from_decimal_dp(candle.open, instrument.price_precision())?,
1003 Price::from_decimal_dp(candle.high, instrument.price_precision())?,
1004 Price::from_decimal_dp(candle.low, instrument.price_precision())?,
1005 Price::from_decimal_dp(candle.close, instrument.price_precision())?,
1006 Quantity::from_decimal_dp(candle.base_token_volume, instrument.size_precision())?,
1007 ts_event,
1008 ts_init,
1009 );
1010
1011 bars.push(bar);
1012 }
1013
1014 Ok(bars)
1015 }
1016
1017 pub async fn request_trade_ticks(
1027 &self,
1028 instrument_id: InstrumentId,
1029 limit: Option<u32>,
1030 ) -> anyhow::Result<Vec<TradeTick>> {
1031 let symbol = instrument_id.symbol;
1032
1033 let instrument = self
1034 .get_instrument(&symbol.inner())
1035 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1036
1037 let ticker = extract_raw_symbol(symbol.as_str());
1038 let response = self.request_trades(ticker, limit).await?;
1039
1040 let ts_init = get_atomic_clock_realtime().get_time_ns();
1041
1042 let mut trades = Vec::with_capacity(response.trades.len());
1043
1044 for trade in response.trades {
1045 let ts_event_nanos = trade.created_at.timestamp_nanos_opt().ok_or_else(|| {
1046 anyhow::anyhow!("Timestamp out of range for trade at {}", trade.created_at)
1047 })?;
1048 let ts_event = UnixNanos::from(ts_event_nanos as u64);
1049
1050 let aggressor_side = match trade.side {
1051 NautilusOrderSide::Buy => AggressorSide::Buyer,
1052 NautilusOrderSide::Sell => AggressorSide::Seller,
1053 NautilusOrderSide::NoOrderSide => AggressorSide::NoAggressor,
1054 };
1055
1056 let trade_tick = TradeTick::new(
1057 instrument_id,
1058 Price::from_decimal_dp(trade.price, instrument.price_precision())?,
1059 Quantity::from_decimal_dp(trade.size, instrument.size_precision())?,
1060 aggressor_side,
1061 TradeId::new(&trade.id),
1062 ts_event,
1063 ts_init,
1064 );
1065
1066 trades.push(trade_tick);
1067 }
1068
1069 Ok(trades)
1070 }
1071
1072 pub async fn request_orderbook_snapshot(
1083 &self,
1084 instrument_id: InstrumentId,
1085 ) -> anyhow::Result<OrderBookDeltas> {
1086 let symbol = instrument_id.symbol;
1087
1088 let instrument = self
1089 .get_instrument(&symbol.inner())
1090 .ok_or_else(|| anyhow::anyhow!("Instrument not found in cache: {symbol}"))?;
1091
1092 let ticker = extract_raw_symbol(symbol.as_str());
1093 let response = self.inner.get_orderbook(ticker).await?;
1094
1095 let ts_init = get_atomic_clock_realtime().get_time_ns();
1096
1097 let mut deltas = Vec::with_capacity(1 + response.bids.len() + response.asks.len());
1098
1099 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_init, ts_init));
1100
1101 for (i, level) in response.bids.iter().enumerate() {
1102 let is_last = i == response.bids.len() - 1 && response.asks.is_empty();
1103 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1104
1105 let order = BookOrder::new(
1106 NautilusOrderSide::Buy,
1107 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1108 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1109 0,
1110 );
1111
1112 deltas.push(OrderBookDelta::new(
1113 instrument_id,
1114 BookAction::Add,
1115 order,
1116 flags,
1117 0,
1118 ts_init,
1119 ts_init,
1120 ));
1121 }
1122
1123 for (i, level) in response.asks.iter().enumerate() {
1124 let is_last = i == response.asks.len() - 1;
1125 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
1126
1127 let order = BookOrder::new(
1128 NautilusOrderSide::Sell,
1129 Price::from_decimal_dp(level.price, instrument.price_precision())?,
1130 Quantity::from_decimal_dp(level.size, instrument.size_precision())?,
1131 0,
1132 );
1133
1134 deltas.push(OrderBookDelta::new(
1135 instrument_id,
1136 BookAction::Add,
1137 order,
1138 flags,
1139 0,
1140 ts_init,
1141 ts_init,
1142 ));
1143 }
1144
1145 Ok(OrderBookDeltas::new(instrument_id, deltas))
1146 }
1147
    /// Returns a reference to the shared raw HTTP client.
    #[must_use]
    pub fn raw_client(&self) -> &Arc<DydxRawHttpClient> {
        &self.inner
    }
1157
    /// Returns `true` if the underlying raw client targets the testnet.
    #[must_use]
    pub fn is_testnet(&self) -> bool {
        self.inner.is_testnet()
    }
1163
    /// Returns the base URL of the underlying raw client.
    #[must_use]
    pub fn base_url(&self) -> &str {
        self.inner.base_url()
    }
1169
    /// Returns the current value of this instance's `cache_initialized` flag.
    #[must_use]
    pub fn is_cache_initialized(&self) -> bool {
        self.cache_initialized.load(Ordering::Acquire)
    }
1175
    /// Returns the number of entries in the instrument cache.
    #[must_use]
    pub fn cached_instruments_count(&self) -> usize {
        self.instruments_cache.len()
    }
1181
    /// Returns a reference to the shared instrument cache (symbol to instrument).
    #[must_use]
    pub fn instruments(&self) -> &Arc<DashMap<Ustr, InstrumentAny>> {
        &self.instruments_cache
    }
1187
    /// Returns a reference to the shared `clob_pair_id` to instrument ID mapping.
    #[must_use]
    pub fn clob_pair_id_mapping(&self) -> &Arc<DashMap<u32, InstrumentId>> {
        &self.clob_pair_id_to_instrument
    }
1196
1197 pub async fn request_order_status_reports(
1206 &self,
1207 address: &str,
1208 subaccount_number: u32,
1209 account_id: AccountId,
1210 instrument_id: Option<InstrumentId>,
1211 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1212 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1213
1214 let market = instrument_id.map(|id| {
1216 let symbol = id.symbol.to_string();
1217 symbol.trim_end_matches("-PERP").to_string()
1219 });
1220
1221 let orders = self
1222 .inner
1223 .get_orders(address, subaccount_number, market.as_deref(), None)
1224 .await?;
1225
1226 let mut reports = Vec::new();
1227
1228 for order in orders {
1229 let instrument = match self.get_instrument_by_clob_id(order.clob_pair_id) {
1231 Some(inst) => inst,
1232 None => {
1233 log::warn!(
1234 "Skipping order {}: no cached instrument for clob_pair_id {}",
1235 order.id,
1236 order.clob_pair_id
1237 );
1238 continue;
1239 }
1240 };
1241
1242 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1244 continue;
1245 }
1246
1247 match super::parse::parse_order_status_report(&order, &instrument, account_id, ts_init)
1248 {
1249 Ok(report) => reports.push(report),
1250 Err(e) => {
1251 log::warn!("Failed to parse order {}: {e}", order.id);
1252 }
1253 }
1254 }
1255
1256 Ok(reports)
1257 }
1258
1259 pub async fn request_fill_reports(
1268 &self,
1269 address: &str,
1270 subaccount_number: u32,
1271 account_id: AccountId,
1272 instrument_id: Option<InstrumentId>,
1273 ) -> anyhow::Result<Vec<FillReport>> {
1274 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1275
1276 let market = instrument_id.map(|id| {
1278 let symbol = id.symbol.to_string();
1279 symbol.trim_end_matches("-PERP").to_string()
1280 });
1281
1282 let fills_response = self
1283 .inner
1284 .get_fills(address, subaccount_number, market.as_deref(), None)
1285 .await?;
1286
1287 let mut reports = Vec::new();
1288
1289 for fill in fills_response.fills {
1290 let market = &fill.market;
1292 let symbol = Ustr::from(&format!("{market}-PERP"));
1293 let instrument = match self.get_instrument(&symbol) {
1294 Some(inst) => inst,
1295 None => {
1296 log::warn!(
1297 "Skipping fill {}: no cached instrument for market {}",
1298 fill.id,
1299 fill.market
1300 );
1301 continue;
1302 }
1303 };
1304
1305 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1307 continue;
1308 }
1309
1310 match super::parse::parse_fill_report(&fill, &instrument, account_id, ts_init) {
1311 Ok(report) => reports.push(report),
1312 Err(e) => {
1313 log::warn!("Failed to parse fill {}: {e}", fill.id);
1314 }
1315 }
1316 }
1317
1318 Ok(reports)
1319 }
1320
1321 pub async fn request_position_status_reports(
1330 &self,
1331 address: &str,
1332 subaccount_number: u32,
1333 account_id: AccountId,
1334 instrument_id: Option<InstrumentId>,
1335 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1336 let ts_init = nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1337
1338 let subaccount_response = self
1339 .inner
1340 .get_subaccount(address, subaccount_number)
1341 .await?;
1342
1343 let mut reports = Vec::new();
1344
1345 for (market, position) in subaccount_response.subaccount.open_perpetual_positions {
1346 let symbol = Ustr::from(&format!("{market}-PERP"));
1348 let instrument = match self.get_instrument(&symbol) {
1349 Some(inst) => inst,
1350 None => {
1351 log::warn!("Skipping position: no cached instrument for market {market}");
1352 continue;
1353 }
1354 };
1355
1356 if instrument_id.is_some_and(|filter_id| instrument.id() != filter_id) {
1358 continue;
1359 }
1360
1361 match super::parse::parse_position_status_report(
1362 &position,
1363 &instrument,
1364 account_id,
1365 ts_init,
1366 ) {
1367 Ok(report) => reports.push(report),
1368 Err(e) => {
1369 log::warn!("Failed to parse position for {market}: {e}");
1370 }
1371 }
1372 }
1373
1374 Ok(reports)
1375 }
1376}
1377
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use rstest::rstest;

    use super::*;
    use crate::http::error;

    /// A raw client without overrides targets the mainnet URL.
    #[tokio::test]
    async fn test_raw_client_creation() {
        let client = DydxRawHttpClient::new(None, Some(30), None, false, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
    }

    /// A raw testnet client without a URL override targets the testnet URL.
    #[tokio::test]
    async fn test_raw_client_testnet() {
        let client = DydxRawHttpClient::new(None, Some(30), None, true, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(client.is_testnet());
        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// A fresh domain client starts with an empty, uninitialized cache.
    #[tokio::test]
    async fn test_domain_client_creation() {
        let client = DydxHttpClient::new(None, Some(30), None, false, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
        assert!(!client.is_cache_initialized());
        assert_eq!(client.cached_instruments_count(), 0);
    }

    /// A domain testnet client without a URL override targets the testnet URL.
    #[tokio::test]
    async fn test_domain_client_testnet() {
        let client = DydxHttpClient::new(None, Some(30), None, true, None);
        assert!(client.is_ok());

        let client = client.unwrap();
        assert!(client.is_testnet());
        assert_eq!(client.base_url(), DYDX_TESTNET_HTTP_URL);
    }

    /// The default domain client targets mainnet with an uninitialized cache.
    #[tokio::test]
    async fn test_domain_client_default() {
        let client = DydxHttpClient::default();
        assert!(!client.is_testnet());
        assert_eq!(client.base_url(), DYDX_HTTP_URL);
        assert!(!client.is_cache_initialized());
    }

    /// Cloning copies the `cache_initialized` flag as it is at clone time.
    #[tokio::test]
    async fn test_domain_client_clone() {
        let client = DydxHttpClient::new(None, Some(30), None, false, None).unwrap();

        let cloned = client.clone();
        assert!(!cloned.is_cache_initialized());

        client.cache_initialized.store(true, Ordering::Release);

        // The clone is the value under test, so it is not redundant here.
        #[allow(clippy::redundant_clone)]
        let cloned_after = client.clone();
        assert!(cloned_after.is_cache_initialized());
    }

    /// Caching one instrument makes it retrievable and marks the cache initialized.
    #[rstest]
    fn test_domain_client_cache_instrument() {
        use nautilus_model::{
            identifiers::{InstrumentId, Symbol},
            instruments::CryptoPerpetual,
            types::{Currency, Price, Quantity},
        };

        let client = DydxHttpClient::default();
        assert_eq!(client.cached_instruments_count(), 0);

        let instrument_id =
            InstrumentId::new(Symbol::from("BTC-USD"), *crate::common::consts::DYDX_VENUE);
        let price = Price::from("1.0");
        let size = Quantity::from("0.001");
        let instrument = CryptoPerpetual::new(
            instrument_id,
            Symbol::from("BTC-USD"),
            Currency::BTC(),
            Currency::USD(),
            Currency::USD(),
            false,
            price.precision,
            size.precision,
            price,
            size,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );

        client.cache_instrument(InstrumentAny::CryptoPerpetual(instrument));
        assert_eq!(client.cached_instruments_count(), 1);
        assert!(client.is_cache_initialized());

        let btc_usd = Ustr::from("BTC-USD");
        let cached = client.get_instrument(&btc_usd);
        assert!(cached.is_some());
    }

    /// Looking up a symbol that was never cached yields `None`.
    #[rstest]
    fn test_domain_client_get_instrument_not_found() {
        let client = DydxHttpClient::default();
        let eth_usd = Ustr::from("ETH-USD");
        let result = client.get_instrument(&eth_usd);
        assert!(result.is_none());
    }

    /// A slow endpoint must fail within the retry configuration's time budget
    /// rather than blocking for the full HTTP timeout.
    #[tokio::test]
    async fn test_http_timeout_respects_configuration_and_does_not_block() {
        use axum::{Router, routing::get};
        use tokio::net::TcpListener;

        // Responds only after 5 seconds, longer than every timeout below.
        async fn slow_handler() -> &'static str {
            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
            "ok"
        }

        let router = Router::new().route("/v4/slow", get(slow_handler));

        // Port 0 lets the OS pick a free port for the local test server.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();

        tokio::spawn(async move {
            axum::serve(listener, router.into_make_service())
                .await
                .unwrap();
        });

        let base_url = format!("http://{addr}");

        // No retries, a 500 ms per-operation timeout and a 1 s overall budget.
        let retry_config = RetryConfig {
            max_retries: 0,
            initial_delay_ms: 0,
            max_delay_ms: 0,
            backoff_factor: 1.0,
            jitter_ms: 0,
            operation_timeout_ms: Some(500),
            immediate_first: true,
            max_elapsed_ms: Some(1_000),
        };

        // The 60 s HTTP timeout is deliberately far larger than the retry budget.
        let client =
            DydxRawHttpClient::new(Some(base_url), Some(60), None, false, Some(retry_config))
                .unwrap();

        let start = std::time::Instant::now();
        let result: Result<serde_json::Value, error::DydxHttpError> =
            client.send_request(Method::GET, "/v4/slow", None).await;
        let elapsed = start.elapsed();

        assert!(result.is_err());
        assert!(elapsed < std::time::Duration::from_secs(3));
    }
}