1use std::{
19 collections::HashMap,
20 fmt::Debug,
21 num::NonZeroU32,
22 sync::{
23 Arc, LazyLock, RwLock,
24 atomic::{AtomicBool, Ordering},
25 },
26};
27
28use chrono::{DateTime, Utc};
29use dashmap::DashMap;
30use nautilus_core::{
31 UUID4, consts::NAUTILUS_USER_AGENT, nanos::UnixNanos, time::get_atomic_clock_realtime,
32};
33use nautilus_model::{
34 data::{Bar, BookOrder, FundingRateUpdate, TradeTick},
35 enums::{BookType, OrderSide, OrderType, TimeInForce},
36 events::AccountState,
37 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
38 instruments::{Instrument, any::InstrumentAny},
39 orderbook::OrderBook,
40 reports::{FillReport, OrderStatusReport, PositionStatusReport},
41 types::{Price, Quantity},
42};
43use nautilus_network::{
44 http::HttpClient,
45 ratelimiter::quota::Quota,
46 retry::{RetryConfig, RetryManager},
47};
48use reqwest::{Method, header::USER_AGENT};
49use rust_decimal::Decimal;
50use serde::{Serialize, de::DeserializeOwned};
51use tokio_util::sync::CancellationToken;
52use ustr::Ustr;
53
54use super::{
55 error::AxHttpError,
56 models::{
57 AuthenticateApiKeyRequest, AxAuthenticateResponse, AxBalancesResponse,
58 AxBatchCancelOrdersResponse, AxBookResponse, AxCancelAllOrdersResponse,
59 AxCancelOrderResponse, AxCandle, AxCandleResponse, AxCandlesResponse, AxFillsResponse,
60 AxFundingRatesResponse, AxInitialMarginRequirementResponse, AxInstrument,
61 AxInstrumentsResponse, AxOpenOrdersResponse, AxOrderStatusQueryResponse, AxOrdersResponse,
62 AxPlaceOrderResponse, AxPositionsResponse, AxPreviewAggressiveLimitOrderResponse,
63 AxRiskSnapshotResponse, AxTicker, AxTickersResponse, AxTradesResponse,
64 AxTransactionsResponse, AxWhoAmI, BatchCancelOrdersRequest, CancelAllOrdersRequest,
65 CancelOrderRequest, PlaceOrderRequest, PreviewAggressiveLimitOrderRequest,
66 },
67 parse::{
68 parse_account_state, parse_bar, parse_fill_report, parse_funding_rate,
69 parse_order_status_report, parse_perp_instrument, parse_position_status_report,
70 parse_trade_tick,
71 },
72 query::{
73 GetBookParams, GetCandleParams, GetCandlesParams, GetFundingRatesParams,
74 GetInstrumentParams, GetOrderStatusParams, GetOrdersParams, GetTickerParams,
75 GetTradesParams, GetTransactionsParams,
76 },
77};
78use crate::common::{
79 consts::{AX_HTTP_URL, AX_ORDERS_URL},
80 credential::Credential,
81 enums::{AxCandleWidth, AxInstrumentState},
82 parse::client_order_id_to_cid,
83};
84
85pub static AX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
89 Quota::per_second(NonZeroU32::new(10).expect("Should be a valid non-zero u32"))
90});
91
92const AX_GLOBAL_RATE_KEY: &str = "architect:global";
93
94pub struct AxRawHttpClient {
99 base_url: String,
100 orders_base_url: String,
101 client: HttpClient,
102 credential: Option<Credential>,
103 session_token: RwLock<Option<String>>,
104 retry_manager: RetryManager<AxHttpError>,
105 cancellation_token: RwLock<CancellationToken>,
106}
107
108impl Default for AxRawHttpClient {
109 fn default() -> Self {
110 Self::new(None, None, Some(60), None, None, None, None)
111 .expect("Failed to create default AxRawHttpClient")
112 }
113}
114
115impl Debug for AxRawHttpClient {
116 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117 let has_session_token = self.session_token.read().is_ok_and(|guard| guard.is_some());
118 f.debug_struct(stringify!(AxRawHttpClient))
119 .field("base_url", &self.base_url)
120 .field("orders_base_url", &self.orders_base_url)
121 .field("has_credentials", &self.credential.is_some())
122 .field("has_session_token", &has_session_token)
123 .finish()
124 }
125}
126
127impl AxRawHttpClient {
128 #[must_use]
130 pub fn base_url(&self) -> &str {
131 &self.base_url
132 }
133
134 #[must_use]
136 pub fn api_key_masked(&self) -> String {
137 self.credential
138 .as_ref()
139 .map_or_else(|| "None".to_string(), |c| c.masked_api_key())
140 }
141
142 pub fn cancel_all_requests(&self) {
148 self.cancellation_token
149 .read()
150 .expect("Lock poisoned")
151 .cancel();
152 }
153
154 pub fn reset_cancellation_token(&self) {
160 *self.cancellation_token.write().expect("Lock poisoned") = CancellationToken::new();
161 }
162
163 pub fn cancellation_token(&self) -> CancellationToken {
169 self.cancellation_token
170 .read()
171 .expect("Lock poisoned")
172 .clone()
173 }
174
175 #[allow(clippy::too_many_arguments)]
181 pub fn new(
182 base_url: Option<String>,
183 orders_base_url: Option<String>,
184 timeout_secs: Option<u64>,
185 max_retries: Option<u32>,
186 retry_delay_ms: Option<u64>,
187 retry_delay_max_ms: Option<u64>,
188 proxy_url: Option<String>,
189 ) -> Result<Self, AxHttpError> {
190 let retry_config = RetryConfig {
191 max_retries: max_retries.unwrap_or(3),
192 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
193 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
194 backoff_factor: 2.0,
195 jitter_ms: 1000,
196 operation_timeout_ms: Some(60_000),
197 immediate_first: false,
198 max_elapsed_ms: Some(180_000),
199 };
200
201 let retry_manager = RetryManager::new(retry_config);
202
203 Ok(Self {
204 base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
205 orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
206 client: HttpClient::new(
207 Self::default_headers(),
208 vec![],
209 Self::rate_limiter_quotas(),
210 Some(*AX_REST_QUOTA),
211 timeout_secs,
212 proxy_url,
213 )
214 .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
215 credential: None,
216 session_token: RwLock::new(None),
217 retry_manager,
218 cancellation_token: RwLock::new(CancellationToken::new()),
219 })
220 }
221
222 #[allow(clippy::too_many_arguments)]
228 pub fn with_credentials(
229 api_key: String,
230 api_secret: String,
231 base_url: Option<String>,
232 orders_base_url: Option<String>,
233 timeout_secs: Option<u64>,
234 max_retries: Option<u32>,
235 retry_delay_ms: Option<u64>,
236 retry_delay_max_ms: Option<u64>,
237 proxy_url: Option<String>,
238 ) -> Result<Self, AxHttpError> {
239 let retry_config = RetryConfig {
240 max_retries: max_retries.unwrap_or(3),
241 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
242 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
243 backoff_factor: 2.0,
244 jitter_ms: 1000,
245 operation_timeout_ms: Some(60_000),
246 immediate_first: false,
247 max_elapsed_ms: Some(180_000),
248 };
249
250 let retry_manager = RetryManager::new(retry_config);
251
252 Ok(Self {
253 base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
254 orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
255 client: HttpClient::new(
256 Self::default_headers(),
257 vec![],
258 Self::rate_limiter_quotas(),
259 Some(*AX_REST_QUOTA),
260 timeout_secs,
261 proxy_url,
262 )
263 .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
264 credential: Some(Credential::new(api_key, api_secret)),
265 session_token: RwLock::new(None),
266 retry_manager,
267 cancellation_token: RwLock::new(CancellationToken::new()),
268 })
269 }
270
271 pub fn set_session_token(&self, token: String) {
279 *self.session_token.write().expect("Lock poisoned") = Some(token);
281 }
282
283 fn default_headers() -> HashMap<String, String> {
284 HashMap::from([
285 (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
286 ("Accept".to_string(), "application/json".to_string()),
287 ])
288 }
289
290 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
291 vec![(AX_GLOBAL_RATE_KEY.to_string(), *AX_REST_QUOTA)]
292 }
293
294 fn rate_limit_keys(endpoint: &str) -> Vec<String> {
295 let normalized = endpoint.split('?').next().unwrap_or(endpoint);
296 let route = format!("architect:{normalized}");
297
298 vec![AX_GLOBAL_RATE_KEY.to_string(), route]
299 }
300
301 fn auth_headers(&self) -> Result<HashMap<String, String>, AxHttpError> {
302 let guard = self.session_token.read().expect("Lock poisoned");
304 let session_token = guard.as_ref().ok_or(AxHttpError::MissingSessionToken)?;
305
306 let mut headers = HashMap::new();
307 headers.insert(
308 "Authorization".to_string(),
309 format!("Bearer {session_token}"),
310 );
311
312 Ok(headers)
313 }
314
315 async fn send_request<T: DeserializeOwned, P: Serialize>(
316 &self,
317 method: Method,
318 endpoint: &str,
319 params: Option<&P>,
320 body: Option<Vec<u8>>,
321 authenticate: bool,
322 ) -> Result<T, AxHttpError> {
323 self.send_request_to_url(&self.base_url, method, endpoint, params, body, authenticate)
324 .await
325 }
326
327 async fn send_request_to_url<T: DeserializeOwned, P: Serialize>(
328 &self,
329 base_url: &str,
330 method: Method,
331 endpoint: &str,
332 params: Option<&P>,
333 body: Option<Vec<u8>>,
334 authenticate: bool,
335 ) -> Result<T, AxHttpError> {
336 let endpoint = endpoint.to_string();
337 let url = format!("{base_url}{endpoint}");
338
339 let params_str = if method == Method::GET || method == Method::DELETE {
340 params
341 .map(serde_urlencoded::to_string)
342 .transpose()
343 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize params: {e}")))?
344 } else {
345 None
346 };
347
348 let operation = || {
349 let url = url.clone();
350 let method = method.clone();
351 let endpoint = endpoint.clone();
352 let params_str = params_str.clone();
353 let body = body.clone();
354
355 async move {
356 let mut headers = Self::default_headers();
357
358 if authenticate {
359 let auth_headers = self.auth_headers()?;
360 headers.extend(auth_headers);
361 }
362
363 if body.is_some() {
364 headers.insert("Content-Type".to_string(), "application/json".to_string());
365 }
366
367 let full_url = if let Some(ref query) = params_str {
368 if query.is_empty() {
369 url
370 } else {
371 format!("{url}?{query}")
372 }
373 } else {
374 url
375 };
376
377 let rate_limit_keys = Self::rate_limit_keys(&endpoint);
378
379 let response = self
380 .client
381 .request(
382 method,
383 full_url,
384 None,
385 Some(headers),
386 body,
387 None,
388 Some(rate_limit_keys),
389 )
390 .await?;
391
392 let status = response.status;
393 let response_body = String::from_utf8_lossy(&response.body).to_string();
394
395 if !status.is_success() {
396 return Err(AxHttpError::UnexpectedStatus {
397 status: status.as_u16(),
398 body: response_body,
399 });
400 }
401
402 serde_json::from_str(&response_body).map_err(|e| {
403 AxHttpError::JsonError(format!(
404 "Failed to deserialize response: {e}\nBody: {response_body}"
405 ))
406 })
407 }
408 };
409
410 let is_idempotent = matches!(method, Method::GET | Method::HEAD | Method::OPTIONS);
412 let should_retry = |error: &AxHttpError| -> bool { is_idempotent && error.is_retryable() };
413
414 let create_error = |msg: String| -> AxHttpError {
415 if msg == "canceled" {
416 AxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
417 } else {
418 AxHttpError::NetworkError(msg)
419 }
420 };
421
422 let cancel_token = self
423 .cancellation_token
424 .read()
425 .expect("Lock poisoned")
426 .clone();
427
428 self.retry_manager
429 .execute_with_retry_with_cancel(
430 endpoint.as_str(),
431 operation,
432 should_retry,
433 create_error,
434 &cancel_token,
435 )
436 .await
437 }
438
439 pub async fn get_whoami(&self) -> Result<AxWhoAmI, AxHttpError> {
448 self.send_request::<AxWhoAmI, ()>(Method::GET, "/whoami", None, None, true)
449 .await
450 }
451
452 pub async fn get_instruments(&self) -> Result<AxInstrumentsResponse, AxHttpError> {
461 self.send_request::<AxInstrumentsResponse, ()>(
462 Method::GET,
463 "/instruments",
464 None,
465 None,
466 false,
467 )
468 .await
469 }
470
471 pub async fn get_balances(&self) -> Result<AxBalancesResponse, AxHttpError> {
480 self.send_request::<AxBalancesResponse, ()>(Method::GET, "/balances", None, None, true)
481 .await
482 }
483
484 pub async fn get_positions(&self) -> Result<AxPositionsResponse, AxHttpError> {
493 self.send_request::<AxPositionsResponse, ()>(Method::GET, "/positions", None, None, true)
494 .await
495 }
496
497 pub async fn get_tickers(&self) -> Result<AxTickersResponse, AxHttpError> {
506 self.send_request::<AxTickersResponse, ()>(Method::GET, "/tickers", None, None, true)
507 .await
508 }
509
510 pub async fn get_ticker(&self, symbol: Ustr) -> Result<AxTicker, AxHttpError> {
519 let params = GetTickerParams::new(symbol);
520 self.send_request::<AxTicker, _>(Method::GET, "/ticker", Some(¶ms), None, true)
521 .await
522 }
523
524 pub async fn get_instrument(&self, symbol: Ustr) -> Result<AxInstrument, AxHttpError> {
533 let params = GetInstrumentParams::new(symbol);
534 self.send_request::<AxInstrument, _>(Method::GET, "/instrument", Some(¶ms), None, false)
535 .await
536 }
537
538 pub async fn authenticate(
547 &self,
548 api_key: &str,
549 api_secret: &str,
550 expiration_seconds: i32,
551 ) -> Result<AxAuthenticateResponse, AxHttpError> {
552 let request = AuthenticateApiKeyRequest::new(api_key, api_secret, expiration_seconds);
553
554 let body = serde_json::to_vec(&request)
555 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
556
557 self.send_request::<AxAuthenticateResponse, ()>(
558 Method::POST,
559 "/authenticate",
560 None,
561 Some(body),
562 false,
563 )
564 .await
565 }
566
567 pub async fn authenticate_auto(
582 &self,
583 expiration_seconds: i32,
584 ) -> Result<AxAuthenticateResponse, AxHttpError> {
585 let (api_key, api_secret) = self
586 .resolve_credentials()
587 .ok_or(AxHttpError::MissingCredentials)?;
588
589 self.authenticate(&api_key, &api_secret, expiration_seconds)
590 .await
591 }
592
593 fn resolve_credentials(&self) -> Option<(String, String)> {
594 if let Some(cred) = &self.credential {
595 return Some((cred.api_key().to_string(), cred.api_secret().to_string()));
596 }
597
598 let api_key = std::env::var("AX_API_KEY").ok()?;
599 let api_secret = std::env::var("AX_API_SECRET").ok()?;
600 Some((api_key, api_secret))
601 }
602
603 pub async fn place_order(
612 &self,
613 request: &PlaceOrderRequest,
614 ) -> Result<AxPlaceOrderResponse, AxHttpError> {
615 let body = serde_json::to_vec(request)
616 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
617 self.send_request_to_url::<AxPlaceOrderResponse, ()>(
618 &self.orders_base_url,
619 Method::POST,
620 "/place_order",
621 None,
622 Some(body),
623 true,
624 )
625 .await
626 }
627
628 pub async fn cancel_order(&self, order_id: &str) -> Result<AxCancelOrderResponse, AxHttpError> {
637 let request = CancelOrderRequest::new(order_id);
638 let body = serde_json::to_vec(&request)
639 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
640 self.send_request_to_url::<AxCancelOrderResponse, ()>(
641 &self.orders_base_url,
642 Method::POST,
643 "/cancel_order",
644 None,
645 Some(body),
646 true,
647 )
648 .await
649 }
650
651 pub async fn cancel_all_orders(
660 &self,
661 request: &CancelAllOrdersRequest,
662 ) -> Result<AxCancelAllOrdersResponse, AxHttpError> {
663 let body = serde_json::to_vec(request)
664 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
665 self.send_request_to_url::<AxCancelAllOrdersResponse, ()>(
666 &self.orders_base_url,
667 Method::POST,
668 "/cancel_all_orders",
669 None,
670 Some(body),
671 true,
672 )
673 .await
674 }
675
676 pub async fn batch_cancel_orders(
685 &self,
686 request: &BatchCancelOrdersRequest,
687 ) -> Result<AxBatchCancelOrdersResponse, AxHttpError> {
688 let body = serde_json::to_vec(request)
689 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
690 self.send_request_to_url::<AxBatchCancelOrdersResponse, ()>(
691 &self.orders_base_url,
692 Method::POST,
693 "/batch_cancel_orders",
694 None,
695 Some(body),
696 true,
697 )
698 .await
699 }
700
701 pub async fn get_open_orders(&self) -> Result<AxOpenOrdersResponse, AxHttpError> {
710 self.send_request_to_url::<AxOpenOrdersResponse, ()>(
711 &self.orders_base_url,
712 Method::GET,
713 "/open_orders",
714 None,
715 None,
716 true,
717 )
718 .await
719 }
720
721 pub async fn get_fills(&self) -> Result<AxFillsResponse, AxHttpError> {
730 self.send_request::<AxFillsResponse, ()>(Method::GET, "/fills", None, None, true)
731 .await
732 }
733
734 pub async fn get_candles(
743 &self,
744 symbol: Ustr,
745 start_timestamp_ns: i64,
746 end_timestamp_ns: i64,
747 candle_width: AxCandleWidth,
748 ) -> Result<AxCandlesResponse, AxHttpError> {
749 let params =
750 GetCandlesParams::new(symbol, start_timestamp_ns, end_timestamp_ns, candle_width);
751 self.send_request::<AxCandlesResponse, _>(
752 Method::GET,
753 "/candles",
754 Some(¶ms),
755 None,
756 true,
757 )
758 .await
759 }
760
761 pub async fn get_current_candle(
770 &self,
771 symbol: Ustr,
772 candle_width: AxCandleWidth,
773 ) -> Result<AxCandle, AxHttpError> {
774 let params = GetCandleParams::new(symbol, candle_width);
775 let response = self
776 .send_request::<AxCandleResponse, _>(
777 Method::GET,
778 "/candles/current",
779 Some(¶ms),
780 None,
781 true,
782 )
783 .await?;
784 Ok(response.candle)
785 }
786
787 pub async fn get_last_candle(
796 &self,
797 symbol: Ustr,
798 candle_width: AxCandleWidth,
799 ) -> Result<AxCandle, AxHttpError> {
800 let params = GetCandleParams::new(symbol, candle_width);
801 let response = self
802 .send_request::<AxCandleResponse, _>(
803 Method::GET,
804 "/candles/last",
805 Some(¶ms),
806 None,
807 true,
808 )
809 .await?;
810 Ok(response.candle)
811 }
812
813 pub async fn get_funding_rates(
822 &self,
823 symbol: Ustr,
824 start_timestamp_ns: i64,
825 end_timestamp_ns: i64,
826 ) -> Result<AxFundingRatesResponse, AxHttpError> {
827 let params = GetFundingRatesParams::new(symbol, start_timestamp_ns, end_timestamp_ns);
828 self.send_request::<AxFundingRatesResponse, _>(
829 Method::GET,
830 "/funding-rates",
831 Some(¶ms),
832 None,
833 true,
834 )
835 .await
836 }
837
838 pub async fn get_risk_snapshot(&self) -> Result<AxRiskSnapshotResponse, AxHttpError> {
847 self.send_request::<AxRiskSnapshotResponse, ()>(
848 Method::GET,
849 "/risk-snapshot",
850 None,
851 None,
852 true,
853 )
854 .await
855 }
856
857 pub async fn preview_aggressive_limit_order(
870 &self,
871 request: &PreviewAggressiveLimitOrderRequest,
872 ) -> Result<AxPreviewAggressiveLimitOrderResponse, AxHttpError> {
873 let body = serde_json::to_vec(request)
874 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
875 self.send_request::<AxPreviewAggressiveLimitOrderResponse, ()>(
876 Method::POST,
877 "/preview-aggressive-limit-order",
878 None,
879 Some(body),
880 true,
881 )
882 .await
883 }
884
885 pub async fn get_transactions(
894 &self,
895 transaction_types: Vec<String>,
896 ) -> Result<AxTransactionsResponse, AxHttpError> {
897 let params = GetTransactionsParams::new(transaction_types);
898 self.send_request::<AxTransactionsResponse, _>(
899 Method::GET,
900 "/transactions",
901 Some(¶ms),
902 None,
903 true,
904 )
905 .await
906 }
907
908 pub async fn get_trades(
917 &self,
918 symbol: Ustr,
919 limit: Option<i32>,
920 ) -> Result<AxTradesResponse, AxHttpError> {
921 let params = GetTradesParams::new(symbol, limit);
922 self.send_request::<AxTradesResponse, _>(Method::GET, "/trades", Some(¶ms), None, true)
923 .await
924 }
925
926 pub async fn get_book(
935 &self,
936 symbol: Ustr,
937 level: Option<i32>,
938 ) -> Result<AxBookResponse, AxHttpError> {
939 let params = GetBookParams::new(symbol, level);
940 self.send_request::<AxBookResponse, _>(Method::GET, "/book", Some(¶ms), None, false)
941 .await
942 }
943
944 pub async fn get_order_status_by_id(
953 &self,
954 order_id: &str,
955 ) -> Result<AxOrderStatusQueryResponse, AxHttpError> {
956 let params = GetOrderStatusParams::by_order_id(order_id);
957 self.send_request_to_url::<AxOrderStatusQueryResponse, _>(
958 &self.orders_base_url,
959 Method::GET,
960 "/order-status",
961 Some(¶ms),
962 None,
963 true,
964 )
965 .await
966 }
967
968 pub async fn get_order_status_by_cid(
977 &self,
978 client_order_id: u64,
979 ) -> Result<AxOrderStatusQueryResponse, AxHttpError> {
980 let params = GetOrderStatusParams::by_client_order_id(client_order_id);
981 self.send_request_to_url::<AxOrderStatusQueryResponse, _>(
982 &self.orders_base_url,
983 Method::GET,
984 "/order-status",
985 Some(¶ms),
986 None,
987 true,
988 )
989 .await
990 }
991
992 pub async fn get_orders(
1001 &self,
1002 params: &GetOrdersParams,
1003 ) -> Result<AxOrdersResponse, AxHttpError> {
1004 self.send_request_to_url::<AxOrdersResponse, _>(
1005 &self.orders_base_url,
1006 Method::GET,
1007 "/orders",
1008 Some(params),
1009 None,
1010 true,
1011 )
1012 .await
1013 }
1014
1015 pub async fn check_initial_margin(
1024 &self,
1025 request: &PlaceOrderRequest,
1026 ) -> Result<AxInitialMarginRequirementResponse, AxHttpError> {
1027 let body = serde_json::to_vec(request)
1028 .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
1029 self.send_request_to_url::<AxInitialMarginRequirementResponse, ()>(
1030 &self.orders_base_url,
1031 Method::POST,
1032 "/initial-margin-requirement",
1033 None,
1034 Some(body),
1035 true,
1036 )
1037 .await
1038 }
1039}
1040
1041#[derive(Debug)]
1046#[cfg_attr(
1047 feature = "python",
1048 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.architect")
1049)]
1050pub struct AxHttpClient {
1051 pub(crate) inner: Arc<AxRawHttpClient>,
1052 pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
1053 cache_initialized: Arc<AtomicBool>,
1054}
1055
1056impl Clone for AxHttpClient {
1057 fn clone(&self) -> Self {
1058 Self {
1059 inner: self.inner.clone(),
1060 instruments_cache: self.instruments_cache.clone(),
1061 cache_initialized: self.cache_initialized.clone(),
1062 }
1063 }
1064}
1065
1066impl Default for AxHttpClient {
1067 fn default() -> Self {
1068 Self::new(None, None, None, None, None, None, None)
1069 .expect("Failed to create default AxHttpClient")
1070 }
1071}
1072
1073impl AxHttpClient {
1074 #[allow(clippy::too_many_arguments)]
1080 pub fn new(
1081 base_url: Option<String>,
1082 orders_base_url: Option<String>,
1083 timeout_secs: Option<u64>,
1084 max_retries: Option<u32>,
1085 retry_delay_ms: Option<u64>,
1086 retry_delay_max_ms: Option<u64>,
1087 proxy_url: Option<String>,
1088 ) -> Result<Self, AxHttpError> {
1089 Ok(Self {
1090 inner: Arc::new(AxRawHttpClient::new(
1091 base_url,
1092 orders_base_url,
1093 timeout_secs,
1094 max_retries,
1095 retry_delay_ms,
1096 retry_delay_max_ms,
1097 proxy_url,
1098 )?),
1099 instruments_cache: Arc::new(DashMap::new()),
1100 cache_initialized: Arc::new(AtomicBool::new(false)),
1101 })
1102 }
1103
1104 #[allow(clippy::too_many_arguments)]
1110 pub fn with_credentials(
1111 api_key: String,
1112 api_secret: String,
1113 base_url: Option<String>,
1114 orders_base_url: Option<String>,
1115 timeout_secs: Option<u64>,
1116 max_retries: Option<u32>,
1117 retry_delay_ms: Option<u64>,
1118 retry_delay_max_ms: Option<u64>,
1119 proxy_url: Option<String>,
1120 ) -> Result<Self, AxHttpError> {
1121 Ok(Self {
1122 inner: Arc::new(AxRawHttpClient::with_credentials(
1123 api_key,
1124 api_secret,
1125 base_url,
1126 orders_base_url,
1127 timeout_secs,
1128 max_retries,
1129 retry_delay_ms,
1130 retry_delay_max_ms,
1131 proxy_url,
1132 )?),
1133 instruments_cache: Arc::new(DashMap::new()),
1134 cache_initialized: Arc::new(AtomicBool::new(false)),
1135 })
1136 }
1137
1138 #[must_use]
1140 pub fn base_url(&self) -> &str {
1141 self.inner.base_url()
1142 }
1143
1144 #[must_use]
1146 pub fn api_key_masked(&self) -> String {
1147 self.inner.api_key_masked()
1148 }
1149
1150 pub fn cancel_all_requests(&self) {
1152 self.inner.cancel_all_requests();
1153 }
1154
1155 pub fn reset_cancellation_token(&self) {
1157 self.inner.reset_cancellation_token();
1158 }
1159
1160 pub fn set_session_token(&self, token: String) {
1164 self.inner.set_session_token(token);
1165 }
1166
1167 fn generate_ts_init(&self) -> UnixNanos {
1169 get_atomic_clock_realtime().get_time_ns()
1170 }
1171
1172 #[must_use]
1176 pub fn is_initialized(&self) -> bool {
1177 self.cache_initialized.load(Ordering::Acquire)
1178 }
1179
1180 #[must_use]
1182 pub fn get_cached_symbols(&self) -> Vec<String> {
1183 self.instruments_cache
1184 .iter()
1185 .map(|entry| entry.key().to_string())
1186 .collect()
1187 }
1188
1189 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1193 for inst in instruments {
1194 self.instruments_cache
1195 .insert(inst.raw_symbol().inner(), inst);
1196 }
1197 self.cache_initialized.store(true, Ordering::Release);
1198 }
1199
1200 pub fn cache_instrument(&self, instrument: InstrumentAny) {
1204 self.instruments_cache
1205 .insert(instrument.raw_symbol().inner(), instrument);
1206 self.cache_initialized.store(true, Ordering::Release);
1207 }
1208
1209 pub async fn authenticate(
1217 &self,
1218 api_key: &str,
1219 api_secret: &str,
1220 expiration_seconds: i32,
1221 ) -> Result<String, AxHttpError> {
1222 let resp = self
1223 .inner
1224 .authenticate(api_key, api_secret, expiration_seconds)
1225 .await?;
1226 self.inner.set_session_token(resp.token.clone());
1227 Ok(resp.token)
1228 }
1229
1230 pub async fn authenticate_auto(&self, expiration_seconds: i32) -> Result<String, AxHttpError> {
1247 let resp = self.inner.authenticate_auto(expiration_seconds).await?;
1248 self.inner.set_session_token(resp.token.clone());
1249 Ok(resp.token)
1250 }
1251
1252 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1254 self.instruments_cache
1255 .get(symbol)
1256 .map(|entry| entry.value().clone())
1257 }
1258
1259 pub async fn request_instruments(
1265 &self,
1266 maker_fee: Option<Decimal>,
1267 taker_fee: Option<Decimal>,
1268 ) -> anyhow::Result<Vec<InstrumentAny>> {
1269 let resp = self
1270 .inner
1271 .get_instruments()
1272 .await
1273 .map_err(|e| anyhow::anyhow!(e))?;
1274
1275 let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1276 let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1277 let ts_init = self.generate_ts_init();
1278
1279 let mut instruments: Vec<InstrumentAny> = Vec::new();
1280 for inst in &resp.instruments {
1281 if inst.state == AxInstrumentState::Suspended {
1282 log::debug!("Skipping suspended instrument: {}", inst.symbol);
1283 continue;
1284 }
1285
1286 if inst.symbol.as_str().starts_with("TEST") {
1288 log::debug!("Skipping test instrument: {}", inst.symbol);
1289 continue;
1290 }
1291
1292 match parse_perp_instrument(inst, maker_fee, taker_fee, ts_init, ts_init) {
1293 Ok(instrument) => instruments.push(instrument),
1294 Err(e) => {
1295 log::warn!("Failed to parse instrument {}: {e}", inst.symbol);
1296 }
1297 }
1298 }
1299
1300 Ok(instruments)
1301 }
1302
1303 pub async fn request_instrument(
1309 &self,
1310 symbol: Ustr,
1311 maker_fee: Option<Decimal>,
1312 taker_fee: Option<Decimal>,
1313 ) -> anyhow::Result<InstrumentAny> {
1314 let resp = self
1315 .inner
1316 .get_instrument(symbol)
1317 .await
1318 .map_err(|e| anyhow::anyhow!(e))?;
1319
1320 let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1321 let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1322 let ts_init = self.generate_ts_init();
1323
1324 parse_perp_instrument(&resp, maker_fee, taker_fee, ts_init, ts_init)
1325 }
1326
1327 pub async fn request_book_snapshot(
1337 &self,
1338 symbol: Ustr,
1339 depth: Option<usize>,
1340 ) -> anyhow::Result<OrderBook> {
1341 let instrument = self
1342 .get_instrument(&symbol)
1343 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1344
1345 let resp = self
1346 .inner
1347 .get_book(symbol, Some(2))
1348 .await
1349 .map_err(|e| anyhow::anyhow!(e))?;
1350
1351 let instrument_id = instrument.id();
1352 let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1353
1354 let price_precision = instrument.price_precision();
1355 let size_precision = instrument.size_precision();
1356 let ts_event = UnixNanos::from(resp.book.ts as u64 * 1_000_000_000 + resp.book.tn as u64);
1357
1358 for (i, level) in resp.book.b.iter().enumerate() {
1359 if depth.is_some_and(|d| i >= d) {
1360 break;
1361 }
1362 let price = Price::from_decimal_dp(level.p, price_precision)
1363 .unwrap_or_else(|_| Price::from(level.p.to_string().as_str()));
1364 let size = Quantity::new(level.q as f64, size_precision);
1365 let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1366 book.add(order, 0, i as u64, ts_event);
1367 }
1368
1369 let bids_len = resp.book.b.len();
1370 for (i, level) in resp.book.a.iter().enumerate() {
1371 if depth.is_some_and(|d| i >= d) {
1372 break;
1373 }
1374 let price = Price::from_decimal_dp(level.p, price_precision)
1375 .unwrap_or_else(|_| Price::from(level.p.to_string().as_str()));
1376 let size = Quantity::new(level.q as f64, size_precision);
1377 let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1378 book.add(order, 0, (bids_len + i) as u64, ts_event);
1379 }
1380
1381 Ok(book)
1382 }
1383
1384 pub async fn request_trade_ticks(
1398 &self,
1399 symbol: Ustr,
1400 limit: Option<i32>,
1401 start: Option<UnixNanos>,
1402 end: Option<UnixNanos>,
1403 ) -> anyhow::Result<Vec<TradeTick>> {
1404 let instrument = self
1405 .get_instrument(&symbol)
1406 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1407
1408 let resp = self
1409 .inner
1410 .get_trades(symbol, limit)
1411 .await
1412 .map_err(|e| anyhow::anyhow!(e))?;
1413
1414 let ts_init = self.generate_ts_init();
1415 let mut ticks = Vec::with_capacity(resp.trades.len());
1416
1417 for trade in &resp.trades {
1418 match parse_trade_tick(trade, &instrument, ts_init) {
1419 Ok(tick) => {
1420 if start.is_some_and(|s| tick.ts_event < s) {
1421 continue;
1422 }
1423 if end.is_some_and(|e| tick.ts_event > e) {
1424 continue;
1425 }
1426 ticks.push(tick);
1427 }
1428 Err(e) => {
1429 log::warn!("Failed to parse trade for {symbol}: {e}");
1430 }
1431 }
1432 }
1433
1434 Ok(ticks)
1435 }
1436
1437 pub async fn request_bars(
1448 &self,
1449 symbol: Ustr,
1450 start: Option<DateTime<Utc>>,
1451 end: Option<DateTime<Utc>>,
1452 width: AxCandleWidth,
1453 ) -> anyhow::Result<Vec<Bar>> {
1454 let instrument = self
1455 .get_instrument(&symbol)
1456 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1457
1458 let start_ns = start.and_then(|dt| dt.timestamp_nanos_opt()).unwrap_or(0);
1459 let end_ns = end
1460 .and_then(|dt| dt.timestamp_nanos_opt())
1461 .unwrap_or_else(|| self.generate_ts_init().as_i64());
1462 let resp = self
1463 .inner
1464 .get_candles(symbol, start_ns, end_ns, width)
1465 .await
1466 .map_err(|e| anyhow::anyhow!(e))?;
1467
1468 let ts_init = self.generate_ts_init();
1469 let mut bars = Vec::with_capacity(resp.candles.len());
1470
1471 for candle in &resp.candles {
1472 match parse_bar(candle, &instrument, ts_init) {
1473 Ok(bar) => bars.push(bar),
1474 Err(e) => {
1475 log::warn!("Failed to parse bar for {symbol}: {e}");
1476 }
1477 }
1478 }
1479
1480 Ok(bars)
1481 }
1482
1483 pub async fn request_funding_rates(
1489 &self,
1490 instrument_id: InstrumentId,
1491 start: Option<DateTime<Utc>>,
1492 end: Option<DateTime<Utc>>,
1493 ) -> Result<Vec<FundingRateUpdate>, AxHttpError> {
1494 let symbol = instrument_id.symbol.inner();
1495 let start_ns = start.and_then(|dt| dt.timestamp_nanos_opt()).unwrap_or(0);
1496 let end_ns = end
1497 .and_then(|dt| dt.timestamp_nanos_opt())
1498 .unwrap_or_else(|| self.generate_ts_init().as_i64());
1499 let response = self
1500 .inner
1501 .get_funding_rates(symbol, start_ns, end_ns)
1502 .await?;
1503
1504 let ts_init = self.generate_ts_init();
1505 let funding_rates = response
1506 .funding_rates
1507 .iter()
1508 .map(|r| parse_funding_rate(r, instrument_id, ts_init))
1509 .collect();
1510
1511 Ok(funding_rates)
1512 }
1513
1514 pub async fn request_account_state(
1520 &self,
1521 account_id: AccountId,
1522 ) -> anyhow::Result<AccountState> {
1523 let response = self
1524 .inner
1525 .get_balances()
1526 .await
1527 .map_err(|e| anyhow::anyhow!(e))?;
1528
1529 let ts_init = self.generate_ts_init();
1530 parse_account_state(&response, account_id, ts_init, ts_init)
1531 }
1532
1533 pub async fn check_initial_margin(
1539 &self,
1540 request: &PlaceOrderRequest,
1541 ) -> anyhow::Result<Decimal> {
1542 let resp = self
1543 .inner
1544 .check_initial_margin(request)
1545 .await
1546 .map_err(|e| anyhow::anyhow!(e))?;
1547 Ok(resp.im)
1548 }
1549
1550 #[allow(clippy::too_many_arguments)]
1562 pub async fn request_order_status(
1563 &self,
1564 account_id: AccountId,
1565 instrument_id: InstrumentId,
1566 client_order_id: Option<ClientOrderId>,
1567 venue_order_id: Option<VenueOrderId>,
1568 order_side: OrderSide,
1569 order_type: OrderType,
1570 time_in_force: TimeInForce,
1571 ) -> anyhow::Result<OrderStatusReport> {
1572 let resp = if let Some(ref voi) = venue_order_id {
1573 self.inner.get_order_status_by_id(voi.as_str()).await
1574 } else if let Some(ref coid) = client_order_id {
1575 let cid = client_order_id_to_cid(coid);
1576 self.inner.get_order_status_by_cid(cid).await
1577 } else {
1578 anyhow::bail!("Either venue_order_id or client_order_id must be provided")
1579 }
1580 .map_err(|e| anyhow::anyhow!(e))?;
1581
1582 let detail = resp.status;
1583 let size_precision = self
1584 .get_instrument(&detail.symbol)
1585 .map_or(0, |i| i.size_precision());
1586
1587 let voi = VenueOrderId::new(&detail.order_id);
1588 let order_status = detail.state.into();
1589 let filled = detail.filled_quantity.unwrap_or(0);
1590 let remaining = detail.remaining_quantity.unwrap_or(0);
1591 let quantity = Quantity::new((filled + remaining) as f64, size_precision);
1592 let filled_qty = Quantity::new(filled as f64, size_precision);
1593 let ts_init = self.generate_ts_init();
1594
1595 let resolved_coid = client_order_id
1596 .unwrap_or_else(|| ClientOrderId::new(format!("CID-{}", detail.clord_id.unwrap_or(0))));
1597
1598 Ok(OrderStatusReport::new(
1599 account_id,
1600 instrument_id,
1601 Some(resolved_coid),
1602 voi,
1603 order_side,
1604 order_type,
1605 time_in_force,
1606 order_status,
1607 quantity,
1608 filled_qty,
1609 ts_init,
1610 ts_init,
1611 ts_init,
1612 Some(UUID4::new()),
1613 ))
1614 }
1615
1616 pub async fn request_order_status_reports<F>(
1630 &self,
1631 account_id: AccountId,
1632 cid_resolver: Option<F>,
1633 ) -> anyhow::Result<Vec<OrderStatusReport>>
1634 where
1635 F: Fn(u64) -> Option<ClientOrderId>,
1636 {
1637 let response = self
1638 .inner
1639 .get_open_orders()
1640 .await
1641 .map_err(|e| anyhow::anyhow!(e))?;
1642
1643 let ts_init = self.generate_ts_init();
1644 let mut reports = Vec::with_capacity(response.orders.len());
1645
1646 for order in &response.orders {
1647 let instrument = self
1648 .get_instrument(&order.s)
1649 .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", order.s))?;
1650
1651 match parse_order_status_report(
1652 order,
1653 account_id,
1654 &instrument,
1655 ts_init,
1656 cid_resolver.as_ref(),
1657 ) {
1658 Ok(report) => reports.push(report),
1659 Err(e) => {
1660 log::warn!("Failed to parse order {}: {e}", order.oid);
1661 }
1662 }
1663 }
1664
1665 Ok(reports)
1666 }
1667
1668 pub async fn request_fill_reports(
1679 &self,
1680 account_id: AccountId,
1681 ) -> anyhow::Result<Vec<FillReport>> {
1682 let response = self
1683 .inner
1684 .get_fills()
1685 .await
1686 .map_err(|e| anyhow::anyhow!(e))?;
1687
1688 let ts_init = self.generate_ts_init();
1689 let mut reports = Vec::with_capacity(response.fills.len());
1690
1691 for fill in &response.fills {
1692 let instrument = self
1693 .get_instrument(&fill.symbol)
1694 .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", fill.symbol))?;
1695
1696 match parse_fill_report(fill, account_id, &instrument, ts_init) {
1697 Ok(report) => reports.push(report),
1698 Err(e) => {
1699 log::warn!("Failed to parse fill {}: {e}", fill.trade_id);
1700 }
1701 }
1702 }
1703
1704 Ok(reports)
1705 }
1706
1707 pub async fn request_position_reports(
1718 &self,
1719 account_id: AccountId,
1720 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1721 let response = self
1722 .inner
1723 .get_positions()
1724 .await
1725 .map_err(|e| anyhow::anyhow!(e))?;
1726
1727 let ts_init = self.generate_ts_init();
1728 let mut reports = Vec::with_capacity(response.positions.len());
1729
1730 for position in &response.positions {
1731 if position.signed_quantity == 0 {
1733 continue;
1734 }
1735
1736 let instrument = self.get_instrument(&position.symbol).ok_or_else(|| {
1737 anyhow::anyhow!("Instrument {} not found in cache", position.symbol)
1738 })?;
1739
1740 match parse_position_status_report(position, account_id, &instrument, ts_init) {
1741 Ok(report) => reports.push(report),
1742 Err(e) => {
1743 log::warn!("Failed to parse position for {}: {e}", position.symbol);
1744 }
1745 }
1746 }
1747
1748 Ok(reports)
1749 }
1750}