1use std::{
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroU32,
22 sync::{
23 Arc, LazyLock, RwLock,
24 atomic::{AtomicBool, Ordering},
25 },
26};
27
28use dashmap::DashMap;
29use nautilus_core::{
30 consts::NAUTILUS_USER_AGENT, nanos::UnixNanos, time::get_atomic_clock_realtime,
31};
32use nautilus_model::{
33 data::Bar,
34 events::AccountState,
35 identifiers::AccountId,
36 instruments::{Instrument, any::InstrumentAny},
37 reports::{FillReport, OrderStatusReport, PositionStatusReport},
38};
39use nautilus_network::{
40 http::HttpClient,
41 ratelimiter::quota::Quota,
42 retry::{RetryConfig, RetryManager},
43};
44use reqwest::{Method, header::USER_AGENT};
45use rust_decimal::Decimal;
46use serde::{Serialize, de::DeserializeOwned};
47use tokio_util::sync::CancellationToken;
48use ustr::Ustr;
49
50use super::{
51 error::AxHttpError,
52 models::{
53 AuthenticateApiKeyRequest, AxAuthenticateResponse, AxBalancesResponse,
54 AxCancelOrderResponse, AxCandle, AxCandleResponse, AxCandlesResponse, AxFillsResponse,
55 AxFundingRatesResponse, AxInstrument, AxInstrumentsResponse, AxOpenOrdersResponse,
56 AxPlaceOrderResponse, AxPositionsResponse, AxRiskSnapshotResponse, AxTicker,
57 AxTickersResponse, AxTransactionsResponse, AxWhoAmI, CancelOrderRequest, PlaceOrderRequest,
58 },
59 parse::{
60 parse_account_state, parse_bar, parse_fill_report, parse_order_status_report,
61 parse_perp_instrument, parse_position_status_report,
62 },
63 query::{
64 GetCandleParams, GetCandlesParams, GetFundingRatesParams, GetInstrumentParams,
65 GetTickerParams, GetTransactionsParams,
66 },
67};
68use crate::common::{
69 consts::{AX_HTTP_URL, AX_ORDERS_URL},
70 credential::Credential,
71 enums::{AxCandleWidth, AxInstrumentState},
72};
73
74pub static AX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
78 Quota::per_second(NonZeroU32::new(10).expect("Should be a valid non-zero u32"))
79});
80
81const AX_GLOBAL_RATE_KEY: &str = "architect:global";
82
83pub struct AxRawHttpClient {
88 base_url: String,
89 orders_base_url: String,
90 client: HttpClient,
91 credential: Option<Credential>,
92 session_token: RwLock<Option<String>>,
93 retry_manager: RetryManager<AxHttpError>,
94 cancellation_token: CancellationToken,
95}
96
97impl Default for AxRawHttpClient {
98 fn default() -> Self {
99 Self::new(None, None, Some(60), None, None, None, None)
100 .expect("Failed to create default AxRawHttpClient")
101 }
102}
103
104impl Debug for AxRawHttpClient {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 let has_session_token = self
107 .session_token
108 .read()
109 .map(|guard| guard.is_some())
110 .unwrap_or(false);
111 f.debug_struct(stringify!(AxRawHttpClient))
112 .field("base_url", &self.base_url)
113 .field("orders_base_url", &self.orders_base_url)
114 .field("has_credentials", &self.credential.is_some())
115 .field("has_session_token", &has_session_token)
116 .finish()
117 }
118}
119
120impl AxRawHttpClient {
121 #[must_use]
123 pub fn base_url(&self) -> &str {
124 &self.base_url
125 }
126
127 pub fn cancel_all_requests(&self) {
129 self.cancellation_token.cancel();
130 }
131
132 pub fn cancellation_token(&self) -> &CancellationToken {
134 &self.cancellation_token
135 }
136
137 #[allow(clippy::too_many_arguments)]
143 pub fn new(
144 base_url: Option<String>,
145 orders_base_url: Option<String>,
146 timeout_secs: Option<u64>,
147 max_retries: Option<u32>,
148 retry_delay_ms: Option<u64>,
149 retry_delay_max_ms: Option<u64>,
150 proxy_url: Option<String>,
151 ) -> Result<Self, AxHttpError> {
152 let retry_config = RetryConfig {
153 max_retries: max_retries.unwrap_or(3),
154 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
155 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
156 backoff_factor: 2.0,
157 jitter_ms: 1000,
158 operation_timeout_ms: Some(60_000),
159 immediate_first: false,
160 max_elapsed_ms: Some(180_000),
161 };
162
163 let retry_manager = RetryManager::new(retry_config);
164
165 Ok(Self {
166 base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
167 orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
168 client: HttpClient::new(
169 Self::default_headers(),
170 vec![],
171 Self::rate_limiter_quotas(),
172 Some(*AX_REST_QUOTA),
173 timeout_secs,
174 proxy_url,
175 )
176 .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
177 credential: None,
178 session_token: RwLock::new(None),
179 retry_manager,
180 cancellation_token: CancellationToken::new(),
181 })
182 }
183
184 #[allow(clippy::too_many_arguments)]
190 pub fn with_credentials(
191 api_key: String,
192 api_secret: String,
193 base_url: Option<String>,
194 orders_base_url: Option<String>,
195 timeout_secs: Option<u64>,
196 max_retries: Option<u32>,
197 retry_delay_ms: Option<u64>,
198 retry_delay_max_ms: Option<u64>,
199 proxy_url: Option<String>,
200 ) -> Result<Self, AxHttpError> {
201 let retry_config = RetryConfig {
202 max_retries: max_retries.unwrap_or(3),
203 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
204 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
205 backoff_factor: 2.0,
206 jitter_ms: 1000,
207 operation_timeout_ms: Some(60_000),
208 immediate_first: false,
209 max_elapsed_ms: Some(180_000),
210 };
211
212 let retry_manager = RetryManager::new(retry_config);
213
214 Ok(Self {
215 base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
216 orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
217 client: HttpClient::new(
218 Self::default_headers(),
219 vec![],
220 Self::rate_limiter_quotas(),
221 Some(*AX_REST_QUOTA),
222 timeout_secs,
223 proxy_url,
224 )
225 .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
226 credential: Some(Credential::new(api_key, api_secret)),
227 session_token: RwLock::new(None),
228 retry_manager,
229 cancellation_token: CancellationToken::new(),
230 })
231 }
232
233 pub fn set_session_token(&self, token: String) {
241 *self.session_token.write().expect("Lock poisoned") = Some(token);
243 }
244
245 fn default_headers() -> HashMap<String, String> {
246 HashMap::from([
247 (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
248 ("Accept".to_string(), "application/json".to_string()),
249 ])
250 }
251
252 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
253 vec![(AX_GLOBAL_RATE_KEY.to_string(), *AX_REST_QUOTA)]
254 }
255
256 fn rate_limit_keys(endpoint: &str) -> Vec<String> {
257 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
258 let route = format!("architect:{normalized}");
259
260 vec![AX_GLOBAL_RATE_KEY.to_string(), route]
261 }
262
263 fn auth_headers(&self) -> Result<HashMap<String, String>, AxHttpError> {
264 let credential = self
265 .credential
266 .as_ref()
267 .ok_or(AxHttpError::MissingCredentials)?;
268
269 let guard = self.session_token.read().expect("Lock poisoned");
271 let session_token = guard
272 .as_ref()
273 .ok_or_else(|| AxHttpError::ValidationError("Session token not set".to_string()))?;
274
275 let mut headers = HashMap::new();
276 headers.insert(
277 "Authorization".to_string(),
278 credential.bearer_token(session_token),
279 );
280
281 Ok(headers)
282 }
283
284 async fn send_request<T: DeserializeOwned, P: Serialize>(
285 &self,
286 method: Method,
287 endpoint: &str,
288 params: Option<&P>,
289 body: Option<Vec<u8>>,
290 authenticate: bool,
291 ) -> Result<T, AxHttpError> {
292 self.send_request_to_url(&self.base_url, method, endpoint, params, body, authenticate)
293 .await
294 }
295
296 async fn send_request_to_url<T: DeserializeOwned, P: Serialize>(
297 &self,
298 base_url: &str,
299 method: Method,
300 endpoint: &str,
301 params: Option<&P>,
302 body: Option<Vec<u8>>,
303 authenticate: bool,
304 ) -> Result<T, AxHttpError> {
305 let endpoint = endpoint.to_string();
306 let url = format!("{base_url}{endpoint}");
307
308 let params_str = if method == Method::GET || method == Method::DELETE {
309 params
310 .map(serde_urlencoded::to_string)
311 .transpose()
312 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize params: {e}")))?
313 } else {
314 None
315 };
316
317 let operation = || {
318 let url = url.clone();
319 let method = method.clone();
320 let endpoint = endpoint.clone();
321 let params_str = params_str.clone();
322 let body = body.clone();
323
324 async move {
325 let mut headers = Self::default_headers();
326
327 if authenticate {
328 let auth_headers = self.auth_headers()?;
329 headers.extend(auth_headers);
330 }
331
332 if body.is_some() {
333 headers.insert("Content-Type".to_string(), "application/json".to_string());
334 }
335
336 let full_url = if let Some(ref query) = params_str {
337 if query.is_empty() {
338 url
339 } else {
340 format!("{url}?{query}")
341 }
342 } else {
343 url
344 };
345
346 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
347
348 let response = self
349 .client
350 .request(
351 method,
352 full_url,
353 None,
354 Some(headers),
355 body,
356 None,
357 Some(rate_limit_keys),
358 )
359 .await?;
360
361 let status = response.status;
362 let response_body = String::from_utf8_lossy(&response.body).to_string();
363
364 if !status.is_success() {
365 return Err(AxHttpError::UnexpectedStatus {
366 status: status.as_u16(),
367 body: response_body,
368 });
369 }
370
371 serde_json::from_str(&response_body).map_err(|e| {
372 AxHttpError::JsonError(format!(
373 "Failed to deserialize response: {e}\nBody: {response_body}"
374 ))
375 })
376 }
377 };
378
379 let should_retry = |_error: &AxHttpError| -> bool {
380 false
383 };
384
385 let create_error = |msg: String| -> AxHttpError {
386 if msg == "canceled" {
387 AxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
388 } else {
389 AxHttpError::NetworkError(msg)
390 }
391 };
392
393 self.retry_manager
394 .execute_with_retry_with_cancel(
395 endpoint.as_str(),
396 operation,
397 should_retry,
398 create_error,
399 &self.cancellation_token,
400 )
401 .await
402 }
403
404 pub async fn get_whoami(&self) -> Result<AxWhoAmI, AxHttpError> {
413 self.send_request::<AxWhoAmI, ()>(Method::GET, "/whoami", None, None, true)
414 .await
415 }
416
417 pub async fn get_instruments(&self) -> Result<AxInstrumentsResponse, AxHttpError> {
426 self.send_request::<AxInstrumentsResponse, ()>(
427 Method::GET,
428 "/instruments",
429 None,
430 None,
431 false,
432 )
433 .await
434 }
435
436 pub async fn get_balances(&self) -> Result<AxBalancesResponse, AxHttpError> {
445 self.send_request::<AxBalancesResponse, ()>(Method::GET, "/balances", None, None, true)
446 .await
447 }
448
449 pub async fn get_positions(&self) -> Result<AxPositionsResponse, AxHttpError> {
458 self.send_request::<AxPositionsResponse, ()>(Method::GET, "/positions", None, None, true)
459 .await
460 }
461
462 pub async fn get_tickers(&self) -> Result<AxTickersResponse, AxHttpError> {
471 self.send_request::<AxTickersResponse, ()>(Method::GET, "/tickers", None, None, true)
472 .await
473 }
474
475 pub async fn get_ticker(&self, symbol: &str) -> Result<AxTicker, AxHttpError> {
484 let params = GetTickerParams::new(symbol);
485 self.send_request::<AxTicker, _>(Method::GET, "/ticker", Some(¶ms), None, true)
486 .await
487 }
488
489 pub async fn get_instrument(&self, symbol: &str) -> Result<AxInstrument, AxHttpError> {
498 let params = GetInstrumentParams::new(symbol);
499 self.send_request::<AxInstrument, _>(Method::GET, "/instrument", Some(¶ms), None, false)
500 .await
501 }
502
503 pub async fn authenticate(
512 &self,
513 api_key: &str,
514 api_secret: &str,
515 expiration_seconds: i32,
516 ) -> Result<AxAuthenticateResponse, AxHttpError> {
517 self.authenticate_with_totp(api_key, api_secret, expiration_seconds, None)
518 .await
519 }
520
521 pub async fn authenticate_with_totp(
532 &self,
533 api_key: &str,
534 api_secret: &str,
535 expiration_seconds: i32,
536 totp: Option<&str>,
537 ) -> Result<AxAuthenticateResponse, AxHttpError> {
538 let mut request = AuthenticateApiKeyRequest::new(api_key, api_secret, expiration_seconds);
539 if let Some(code) = totp {
540 request = request.with_totp(code);
541 }
542
543 let body = serde_json::to_vec(&request)
544 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
545
546 self.send_request::<AxAuthenticateResponse, ()>(
547 Method::POST,
548 "/authenticate",
549 None,
550 Some(body),
551 false,
552 )
553 .await
554 }
555
556 pub async fn place_order(
565 &self,
566 request: &PlaceOrderRequest,
567 ) -> Result<AxPlaceOrderResponse, AxHttpError> {
568 let body = serde_json::to_vec(request)
569 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
570 self.send_request_to_url::<AxPlaceOrderResponse, ()>(
571 &self.orders_base_url,
572 Method::POST,
573 "/place_order",
574 None,
575 Some(body),
576 true,
577 )
578 .await
579 }
580
581 pub async fn cancel_order(&self, order_id: &str) -> Result<AxCancelOrderResponse, AxHttpError> {
590 let request = CancelOrderRequest::new(order_id);
591 let body = serde_json::to_vec(&request)
592 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
593 self.send_request_to_url::<AxCancelOrderResponse, ()>(
594 &self.orders_base_url,
595 Method::POST,
596 "/cancel_order",
597 None,
598 Some(body),
599 true,
600 )
601 .await
602 }
603
604 pub async fn get_open_orders(&self) -> Result<AxOpenOrdersResponse, AxHttpError> {
613 self.send_request_to_url::<AxOpenOrdersResponse, ()>(
614 &self.orders_base_url,
615 Method::GET,
616 "/open_orders",
617 None,
618 None,
619 true,
620 )
621 .await
622 }
623
624 pub async fn get_fills(&self) -> Result<AxFillsResponse, AxHttpError> {
633 self.send_request::<AxFillsResponse, ()>(Method::GET, "/fills", None, None, true)
634 .await
635 }
636
637 pub async fn get_candles(
646 &self,
647 symbol: &str,
648 start_timestamp_ns: i64,
649 end_timestamp_ns: i64,
650 candle_width: AxCandleWidth,
651 ) -> Result<AxCandlesResponse, AxHttpError> {
652 let params =
653 GetCandlesParams::new(symbol, start_timestamp_ns, end_timestamp_ns, candle_width);
654 self.send_request::<AxCandlesResponse, _>(
655 Method::GET,
656 "/candles",
657 Some(¶ms),
658 None,
659 true,
660 )
661 .await
662 }
663
664 pub async fn get_current_candle(
673 &self,
674 symbol: &str,
675 candle_width: AxCandleWidth,
676 ) -> Result<AxCandle, AxHttpError> {
677 let params = GetCandleParams::new(symbol, candle_width);
678 let response = self
679 .send_request::<AxCandleResponse, _>(
680 Method::GET,
681 "/candles/current",
682 Some(¶ms),
683 None,
684 true,
685 )
686 .await?;
687 Ok(response.candle)
688 }
689
690 pub async fn get_last_candle(
699 &self,
700 symbol: &str,
701 candle_width: AxCandleWidth,
702 ) -> Result<AxCandle, AxHttpError> {
703 let params = GetCandleParams::new(symbol, candle_width);
704 let response = self
705 .send_request::<AxCandleResponse, _>(
706 Method::GET,
707 "/candles/last",
708 Some(¶ms),
709 None,
710 true,
711 )
712 .await?;
713 Ok(response.candle)
714 }
715
716 pub async fn get_funding_rates(
725 &self,
726 symbol: &str,
727 start_timestamp_ns: i64,
728 end_timestamp_ns: i64,
729 ) -> Result<AxFundingRatesResponse, AxHttpError> {
730 let params = GetFundingRatesParams::new(symbol, start_timestamp_ns, end_timestamp_ns);
731 self.send_request::<AxFundingRatesResponse, _>(
732 Method::GET,
733 "/funding-rates",
734 Some(¶ms),
735 None,
736 true,
737 )
738 .await
739 }
740
741 pub async fn get_risk_snapshot(&self) -> Result<AxRiskSnapshotResponse, AxHttpError> {
750 self.send_request::<AxRiskSnapshotResponse, ()>(
751 Method::GET,
752 "/risk-snapshot",
753 None,
754 None,
755 true,
756 )
757 .await
758 }
759
760 pub async fn get_transactions(
769 &self,
770 transaction_types: Vec<String>,
771 ) -> Result<AxTransactionsResponse, AxHttpError> {
772 let params = GetTransactionsParams::new(transaction_types);
773 self.send_request::<AxTransactionsResponse, _>(
774 Method::GET,
775 "/transactions",
776 Some(¶ms),
777 None,
778 true,
779 )
780 .await
781 }
782}
783
784#[derive(Debug)]
791#[cfg_attr(
792 feature = "python",
793 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.architect")
794)]
795pub struct AxHttpClient {
796 pub(crate) inner: Arc<AxRawHttpClient>,
797 pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
798 cache_initialized: AtomicBool,
799}
800
801impl Clone for AxHttpClient {
802 fn clone(&self) -> Self {
803 let cache_initialized = AtomicBool::new(false);
804
805 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
806 if is_initialized {
807 cache_initialized.store(true, Ordering::Release);
808 }
809
810 Self {
811 inner: self.inner.clone(),
812 instruments_cache: self.instruments_cache.clone(),
813 cache_initialized,
814 }
815 }
816}
817
818impl Default for AxHttpClient {
819 fn default() -> Self {
820 Self::new(None, None, None, None, None, None, None)
821 .expect("Failed to create default AxHttpClient")
822 }
823}
824
825impl AxHttpClient {
826 #[allow(clippy::too_many_arguments)]
832 pub fn new(
833 base_url: Option<String>,
834 orders_base_url: Option<String>,
835 timeout_secs: Option<u64>,
836 max_retries: Option<u32>,
837 retry_delay_ms: Option<u64>,
838 retry_delay_max_ms: Option<u64>,
839 proxy_url: Option<String>,
840 ) -> Result<Self, AxHttpError> {
841 Ok(Self {
842 inner: Arc::new(AxRawHttpClient::new(
843 base_url,
844 orders_base_url,
845 timeout_secs,
846 max_retries,
847 retry_delay_ms,
848 retry_delay_max_ms,
849 proxy_url,
850 )?),
851 instruments_cache: Arc::new(DashMap::new()),
852 cache_initialized: AtomicBool::new(false),
853 })
854 }
855
856 #[allow(clippy::too_many_arguments)]
862 pub fn with_credentials(
863 api_key: String,
864 api_secret: String,
865 base_url: Option<String>,
866 orders_base_url: Option<String>,
867 timeout_secs: Option<u64>,
868 max_retries: Option<u32>,
869 retry_delay_ms: Option<u64>,
870 retry_delay_max_ms: Option<u64>,
871 proxy_url: Option<String>,
872 ) -> Result<Self, AxHttpError> {
873 Ok(Self {
874 inner: Arc::new(AxRawHttpClient::with_credentials(
875 api_key,
876 api_secret,
877 base_url,
878 orders_base_url,
879 timeout_secs,
880 max_retries,
881 retry_delay_ms,
882 retry_delay_max_ms,
883 proxy_url,
884 )?),
885 instruments_cache: Arc::new(DashMap::new()),
886 cache_initialized: AtomicBool::new(false),
887 })
888 }
889
890 #[must_use]
892 pub fn base_url(&self) -> &str {
893 self.inner.base_url()
894 }
895
896 pub fn cancel_all_requests(&self) {
898 self.inner.cancel_all_requests();
899 }
900
901 pub fn set_session_token(&self, token: String) {
905 self.inner.set_session_token(token);
906 }
907
908 fn generate_ts_init(&self) -> UnixNanos {
910 get_atomic_clock_realtime().get_time_ns()
911 }
912
913 #[must_use]
917 pub fn is_initialized(&self) -> bool {
918 self.cache_initialized.load(Ordering::Acquire)
919 }
920
921 #[must_use]
923 pub fn get_cached_symbols(&self) -> Vec<String> {
924 self.instruments_cache
925 .iter()
926 .map(|entry| entry.key().to_string())
927 .collect()
928 }
929
930 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
934 for inst in instruments {
935 self.instruments_cache
936 .insert(inst.raw_symbol().inner(), inst);
937 }
938 self.cache_initialized.store(true, Ordering::Release);
939 }
940
941 pub fn cache_instrument(&self, instrument: InstrumentAny) {
945 self.instruments_cache
946 .insert(instrument.raw_symbol().inner(), instrument);
947 self.cache_initialized.store(true, Ordering::Release);
948 }
949
950 pub async fn authenticate(
958 &self,
959 api_key: &str,
960 api_secret: &str,
961 expiration_seconds: i32,
962 ) -> Result<String, AxHttpError> {
963 let resp = self
964 .inner
965 .authenticate(api_key, api_secret, expiration_seconds)
966 .await?;
967 self.inner.set_session_token(resp.token.clone());
968 Ok(resp.token)
969 }
970
971 pub async fn authenticate_with_totp(
979 &self,
980 api_key: &str,
981 api_secret: &str,
982 expiration_seconds: i32,
983 totp_code: Option<&str>,
984 ) -> Result<String, AxHttpError> {
985 let resp = self
986 .inner
987 .authenticate_with_totp(api_key, api_secret, expiration_seconds, totp_code)
988 .await?;
989 self.inner.set_session_token(resp.token.clone());
990 Ok(resp.token)
991 }
992
993 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
995 self.instruments_cache
996 .get(symbol)
997 .map(|entry| entry.value().clone())
998 }
999
1000 pub async fn request_instruments(
1006 &self,
1007 maker_fee: Option<Decimal>,
1008 taker_fee: Option<Decimal>,
1009 ) -> anyhow::Result<Vec<InstrumentAny>> {
1010 let resp = self
1011 .inner
1012 .get_instruments()
1013 .await
1014 .map_err(|e| anyhow::anyhow!(e))?;
1015
1016 let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1017 let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1018 let ts_init = self.generate_ts_init();
1019
1020 let mut instruments: Vec<InstrumentAny> = Vec::new();
1021 for inst in &resp.instruments {
1022 if inst.state == AxInstrumentState::Suspended {
1023 log::debug!("Skipping suspended instrument: {}", inst.symbol);
1024 continue;
1025 }
1026
1027 match parse_perp_instrument(inst, maker_fee, taker_fee, ts_init, ts_init) {
1028 Ok(instrument) => instruments.push(instrument),
1029 Err(e) => {
1030 log::warn!("Failed to parse instrument {}: {e}", inst.symbol);
1031 }
1032 }
1033 }
1034
1035 Ok(instruments)
1036 }
1037
1038 pub async fn request_instrument(
1044 &self,
1045 symbol: &str,
1046 maker_fee: Option<Decimal>,
1047 taker_fee: Option<Decimal>,
1048 ) -> anyhow::Result<InstrumentAny> {
1049 let resp = self
1050 .inner
1051 .get_instrument(symbol)
1052 .await
1053 .map_err(|e| anyhow::anyhow!(e))?;
1054
1055 let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1056 let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1057 let ts_init = self.generate_ts_init();
1058
1059 parse_perp_instrument(&resp, maker_fee, taker_fee, ts_init, ts_init)
1060 }
1061
1062 pub async fn request_account_state(
1068 &self,
1069 account_id: AccountId,
1070 ) -> anyhow::Result<AccountState> {
1071 let response = self
1072 .inner
1073 .get_balances()
1074 .await
1075 .map_err(|e| anyhow::anyhow!(e))?;
1076
1077 let ts_init = self.generate_ts_init();
1078 parse_account_state(&response, account_id, ts_init, ts_init)
1079 }
1080
1081 pub async fn request_funding_rates(
1087 &self,
1088 symbol: &str,
1089 start_timestamp_ns: i64,
1090 end_timestamp_ns: i64,
1091 ) -> Result<AxFundingRatesResponse, AxHttpError> {
1092 self.inner
1093 .get_funding_rates(symbol, start_timestamp_ns, end_timestamp_ns)
1094 .await
1095 }
1096
1097 pub async fn request_bars(
1108 &self,
1109 symbol: &str,
1110 start_timestamp_ns: i64,
1111 end_timestamp_ns: i64,
1112 width: AxCandleWidth,
1113 ) -> anyhow::Result<Vec<Bar>> {
1114 let symbol_ustr = ustr::Ustr::from(symbol);
1115 let instrument = self
1116 .get_instrument(&symbol_ustr)
1117 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1118
1119 let resp = self
1120 .inner
1121 .get_candles(symbol, start_timestamp_ns, end_timestamp_ns, width)
1122 .await
1123 .map_err(|e| anyhow::anyhow!(e))?;
1124
1125 let ts_init = self.generate_ts_init();
1126 let mut bars = Vec::with_capacity(resp.candles.len());
1127
1128 for candle in &resp.candles {
1129 match parse_bar(candle, &instrument, ts_init) {
1130 Ok(bar) => bars.push(bar),
1131 Err(e) => {
1132 log::warn!("Failed to parse bar for {symbol}: {e}");
1133 }
1134 }
1135 }
1136
1137 Ok(bars)
1138 }
1139
1140 pub async fn request_order_status_reports(
1151 &self,
1152 account_id: AccountId,
1153 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1154 let orders = self
1155 .inner
1156 .get_open_orders()
1157 .await
1158 .map_err(|e| anyhow::anyhow!(e))?;
1159
1160 let ts_init = self.generate_ts_init();
1161 let mut reports = Vec::with_capacity(orders.len());
1162
1163 for order in &orders {
1164 let instrument = self
1165 .get_instrument(&order.s)
1166 .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", order.s))?;
1167
1168 match parse_order_status_report(order, account_id, &instrument, ts_init) {
1169 Ok(report) => reports.push(report),
1170 Err(e) => {
1171 log::warn!("Failed to parse order {}: {e}", order.oid);
1172 }
1173 }
1174 }
1175
1176 Ok(reports)
1177 }
1178
1179 pub async fn request_fill_reports(
1190 &self,
1191 account_id: AccountId,
1192 ) -> anyhow::Result<Vec<FillReport>> {
1193 let response = self
1194 .inner
1195 .get_fills()
1196 .await
1197 .map_err(|e| anyhow::anyhow!(e))?;
1198
1199 let ts_init = self.generate_ts_init();
1200 let mut reports = Vec::with_capacity(response.fills.len());
1201
1202 for fill in &response.fills {
1203 let instrument = self
1204 .get_instrument(&fill.symbol)
1205 .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", fill.symbol))?;
1206
1207 match parse_fill_report(fill, account_id, &instrument, ts_init) {
1208 Ok(report) => reports.push(report),
1209 Err(e) => {
1210 log::warn!("Failed to parse fill {}: {e}", fill.execution_id);
1211 }
1212 }
1213 }
1214
1215 Ok(reports)
1216 }
1217
1218 pub async fn request_position_reports(
1229 &self,
1230 account_id: AccountId,
1231 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1232 let response = self
1233 .inner
1234 .get_positions()
1235 .await
1236 .map_err(|e| anyhow::anyhow!(e))?;
1237
1238 let ts_init = self.generate_ts_init();
1239 let mut reports = Vec::with_capacity(response.positions.len());
1240
1241 for position in &response.positions {
1242 let instrument = self.get_instrument(&position.symbol).ok_or_else(|| {
1243 anyhow::anyhow!("Instrument {} not found in cache", position.symbol)
1244 })?;
1245
1246 match parse_position_status_report(position, account_id, &instrument, ts_init) {
1247 Ok(report) => reports.push(report),
1248 Err(e) => {
1249 log::warn!("Failed to parse position for {}: {e}", position.symbol);
1250 }
1251 }
1252 }
1253
1254 Ok(reports)
1255 }
1256}