nautilus_architect_ax/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the Ax REST API.
17
18use std::{
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroU32,
22    sync::{
23        Arc, LazyLock, RwLock,
24        atomic::{AtomicBool, Ordering},
25    },
26};
27
28use dashmap::DashMap;
29use nautilus_core::{
30    consts::NAUTILUS_USER_AGENT, nanos::UnixNanos, time::get_atomic_clock_realtime,
31};
32use nautilus_model::{
33    data::Bar,
34    events::AccountState,
35    identifiers::AccountId,
36    instruments::{Instrument, any::InstrumentAny},
37    reports::{FillReport, OrderStatusReport, PositionStatusReport},
38};
39use nautilus_network::{
40    http::HttpClient,
41    ratelimiter::quota::Quota,
42    retry::{RetryConfig, RetryManager},
43};
44use reqwest::{Method, header::USER_AGENT};
45use rust_decimal::Decimal;
46use serde::{Serialize, de::DeserializeOwned};
47use tokio_util::sync::CancellationToken;
48use ustr::Ustr;
49
50use super::{
51    error::AxHttpError,
52    models::{
53        AuthenticateApiKeyRequest, AxAuthenticateResponse, AxBalancesResponse,
54        AxCancelOrderResponse, AxCandle, AxCandleResponse, AxCandlesResponse, AxFillsResponse,
55        AxFundingRatesResponse, AxInstrument, AxInstrumentsResponse, AxOpenOrdersResponse,
56        AxPlaceOrderResponse, AxPositionsResponse, AxRiskSnapshotResponse, AxTicker,
57        AxTickersResponse, AxTransactionsResponse, AxWhoAmI, CancelOrderRequest, PlaceOrderRequest,
58    },
59    parse::{
60        parse_account_state, parse_bar, parse_fill_report, parse_order_status_report,
61        parse_perp_instrument, parse_position_status_report,
62    },
63    query::{
64        GetCandleParams, GetCandlesParams, GetFundingRatesParams, GetInstrumentParams,
65        GetTickerParams, GetTransactionsParams,
66    },
67};
68use crate::common::{
69    consts::{AX_HTTP_URL, AX_ORDERS_URL},
70    credential::Credential,
71    enums::{AxCandleWidth, AxInstrumentState},
72};
73
74/// Default Ax REST API rate limit.
75///
76/// Conservative default of 10 requests per second.
77pub static AX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
78    Quota::per_second(NonZeroU32::new(10).expect("Should be a valid non-zero u32"))
79});
80
/// Rate-limiter key shared by every request; registered with [`AX_REST_QUOTA`] in
/// `rate_limiter_quotas` and attached to each request by `rate_limit_keys`.
const AX_GLOBAL_RATE_KEY: &str = "architect:global";
82
/// Raw HTTP client for low-level AX Exchange API operations.
///
/// This client handles request/response operations with the AX Exchange API,
/// returning venue-specific response types. It does not parse to Nautilus domain types.
pub struct AxRawHttpClient {
    /// Base URL prepended to endpoints sent through `send_request`.
    base_url: String,
    /// Base URL used for the order endpoints (`/place_order`, `/cancel_order`, `/open_orders`).
    orders_base_url: String,
    /// Underlying HTTP client, built with the default headers and rate limit quotas.
    client: HttpClient,
    /// API credential; only set when constructed through `with_credentials`.
    credential: Option<Credential>,
    /// Session token written by `set_session_token` and read when building auth headers.
    session_token: RwLock<Option<String>>,
    /// Retry manager through which every request is executed.
    retry_manager: RetryManager<AxHttpError>,
    /// Token cancelled by `cancel_all_requests` and handed to the retry manager on each request.
    cancellation_token: CancellationToken,
}
96
97impl Default for AxRawHttpClient {
98    fn default() -> Self {
99        Self::new(None, None, Some(60), None, None, None, None)
100            .expect("Failed to create default AxRawHttpClient")
101    }
102}
103
104impl Debug for AxRawHttpClient {
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        let has_session_token = self
107            .session_token
108            .read()
109            .map(|guard| guard.is_some())
110            .unwrap_or(false);
111        f.debug_struct(stringify!(AxRawHttpClient))
112            .field("base_url", &self.base_url)
113            .field("orders_base_url", &self.orders_base_url)
114            .field("has_credentials", &self.credential.is_some())
115            .field("has_session_token", &has_session_token)
116            .finish()
117    }
118}
119
120impl AxRawHttpClient {
121    /// Returns the base URL for this client.
122    #[must_use]
123    pub fn base_url(&self) -> &str {
124        &self.base_url
125    }
126
127    /// Cancel all pending HTTP requests.
128    pub fn cancel_all_requests(&self) {
129        self.cancellation_token.cancel();
130    }
131
132    /// Get the cancellation token for this client.
133    pub fn cancellation_token(&self) -> &CancellationToken {
134        &self.cancellation_token
135    }
136
137    /// Creates a new [`AxRawHttpClient`] using the default Ax HTTP URL.
138    ///
139    /// # Errors
140    ///
141    /// Returns an error if the retry manager cannot be created.
142    #[allow(clippy::too_many_arguments)]
143    pub fn new(
144        base_url: Option<String>,
145        orders_base_url: Option<String>,
146        timeout_secs: Option<u64>,
147        max_retries: Option<u32>,
148        retry_delay_ms: Option<u64>,
149        retry_delay_max_ms: Option<u64>,
150        proxy_url: Option<String>,
151    ) -> Result<Self, AxHttpError> {
152        let retry_config = RetryConfig {
153            max_retries: max_retries.unwrap_or(3),
154            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
155            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
156            backoff_factor: 2.0,
157            jitter_ms: 1000,
158            operation_timeout_ms: Some(60_000),
159            immediate_first: false,
160            max_elapsed_ms: Some(180_000),
161        };
162
163        let retry_manager = RetryManager::new(retry_config);
164
165        Ok(Self {
166            base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
167            orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
168            client: HttpClient::new(
169                Self::default_headers(),
170                vec![],
171                Self::rate_limiter_quotas(),
172                Some(*AX_REST_QUOTA),
173                timeout_secs,
174                proxy_url,
175            )
176            .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
177            credential: None,
178            session_token: RwLock::new(None),
179            retry_manager,
180            cancellation_token: CancellationToken::new(),
181        })
182    }
183
184    /// Creates a new [`AxRawHttpClient`] configured with credentials.
185    ///
186    /// # Errors
187    ///
188    /// Returns an error if the HTTP client cannot be created.
189    #[allow(clippy::too_many_arguments)]
190    pub fn with_credentials(
191        api_key: String,
192        api_secret: String,
193        base_url: Option<String>,
194        orders_base_url: Option<String>,
195        timeout_secs: Option<u64>,
196        max_retries: Option<u32>,
197        retry_delay_ms: Option<u64>,
198        retry_delay_max_ms: Option<u64>,
199        proxy_url: Option<String>,
200    ) -> Result<Self, AxHttpError> {
201        let retry_config = RetryConfig {
202            max_retries: max_retries.unwrap_or(3),
203            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
204            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
205            backoff_factor: 2.0,
206            jitter_ms: 1000,
207            operation_timeout_ms: Some(60_000),
208            immediate_first: false,
209            max_elapsed_ms: Some(180_000),
210        };
211
212        let retry_manager = RetryManager::new(retry_config);
213
214        Ok(Self {
215            base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
216            orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
217            client: HttpClient::new(
218                Self::default_headers(),
219                vec![],
220                Self::rate_limiter_quotas(),
221                Some(*AX_REST_QUOTA),
222                timeout_secs,
223                proxy_url,
224            )
225            .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
226            credential: Some(Credential::new(api_key, api_secret)),
227            session_token: RwLock::new(None),
228            retry_manager,
229            cancellation_token: CancellationToken::new(),
230        })
231    }
232
233    /// Sets the session token for authenticated requests.
234    ///
235    /// The session token is obtained through the login flow and used for bearer token authentication.
236    ///
237    /// # Panics
238    ///
239    /// Panics if the internal lock is poisoned (indicates a panic in another thread).
240    pub fn set_session_token(&self, token: String) {
241        // SAFETY: Lock poisoning indicates a panic in another thread, which is fatal
242        *self.session_token.write().expect("Lock poisoned") = Some(token);
243    }
244
245    fn default_headers() -> HashMap<String, String> {
246        HashMap::from([
247            (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
248            ("Accept".to_string(), "application/json".to_string()),
249        ])
250    }
251
252    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
253        vec![(AX_GLOBAL_RATE_KEY.to_string(), *AX_REST_QUOTA)]
254    }
255
256    fn rate_limit_keys(endpoint: &str) -> Vec<String> {
257        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
258        let route = format!("architect:{normalized}");
259
260        vec![AX_GLOBAL_RATE_KEY.to_string(), route]
261    }
262
263    fn auth_headers(&self) -> Result<HashMap<String, String>, AxHttpError> {
264        let credential = self
265            .credential
266            .as_ref()
267            .ok_or(AxHttpError::MissingCredentials)?;
268
269        // SAFETY: Lock poisoning indicates a panic in another thread, which is fatal
270        let guard = self.session_token.read().expect("Lock poisoned");
271        let session_token = guard
272            .as_ref()
273            .ok_or_else(|| AxHttpError::ValidationError("Session token not set".to_string()))?;
274
275        let mut headers = HashMap::new();
276        headers.insert(
277            "Authorization".to_string(),
278            credential.bearer_token(session_token),
279        );
280
281        Ok(headers)
282    }
283
284    async fn send_request<T: DeserializeOwned, P: Serialize>(
285        &self,
286        method: Method,
287        endpoint: &str,
288        params: Option<&P>,
289        body: Option<Vec<u8>>,
290        authenticate: bool,
291    ) -> Result<T, AxHttpError> {
292        self.send_request_to_url(&self.base_url, method, endpoint, params, body, authenticate)
293            .await
294    }
295
296    async fn send_request_to_url<T: DeserializeOwned, P: Serialize>(
297        &self,
298        base_url: &str,
299        method: Method,
300        endpoint: &str,
301        params: Option<&P>,
302        body: Option<Vec<u8>>,
303        authenticate: bool,
304    ) -> Result<T, AxHttpError> {
305        let endpoint = endpoint.to_string();
306        let url = format!("{base_url}{endpoint}");
307
308        let params_str = if method == Method::GET || method == Method::DELETE {
309            params
310                .map(serde_urlencoded::to_string)
311                .transpose()
312                .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize params: {e}")))?
313        } else {
314            None
315        };
316
317        let operation = || {
318            let url = url.clone();
319            let method = method.clone();
320            let endpoint = endpoint.clone();
321            let params_str = params_str.clone();
322            let body = body.clone();
323
324            async move {
325                let mut headers = Self::default_headers();
326
327                if authenticate {
328                    let auth_headers = self.auth_headers()?;
329                    headers.extend(auth_headers);
330                }
331
332                if body.is_some() {
333                    headers.insert("Content-Type".to_string(), "application/json".to_string());
334                }
335
336                let full_url = if let Some(ref query) = params_str {
337                    if query.is_empty() {
338                        url
339                    } else {
340                        format!("{url}?{query}")
341                    }
342                } else {
343                    url
344                };
345
346                let rate_limit_keys = Self::rate_limit_keys(&endpoint);
347
348                let response = self
349                    .client
350                    .request(
351                        method,
352                        full_url,
353                        None,
354                        Some(headers),
355                        body,
356                        None,
357                        Some(rate_limit_keys),
358                    )
359                    .await?;
360
361                let status = response.status;
362                let response_body = String::from_utf8_lossy(&response.body).to_string();
363
364                if !status.is_success() {
365                    return Err(AxHttpError::UnexpectedStatus {
366                        status: status.as_u16(),
367                        body: response_body,
368                    });
369                }
370
371                serde_json::from_str(&response_body).map_err(|e| {
372                    AxHttpError::JsonError(format!(
373                        "Failed to deserialize response: {e}\nBody: {response_body}"
374                    ))
375                })
376            }
377        };
378
379        let should_retry = |_error: &AxHttpError| -> bool {
380            // For now, don't retry any errors
381            // TODO: Implement proper retry logic based on error type
382            false
383        };
384
385        let create_error = |msg: String| -> AxHttpError {
386            if msg == "canceled" {
387                AxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
388            } else {
389                AxHttpError::NetworkError(msg)
390            }
391        };
392
393        self.retry_manager
394            .execute_with_retry_with_cancel(
395                endpoint.as_str(),
396                operation,
397                should_retry,
398                create_error,
399                &self.cancellation_token,
400            )
401            .await
402    }
403
404    /// Fetches the current authenticated user information.
405    ///
406    /// # Endpoint
407    /// `GET /whoami`
408    ///
409    /// # Errors
410    ///
411    /// Returns an error if the request fails or the response cannot be parsed.
412    pub async fn get_whoami(&self) -> Result<AxWhoAmI, AxHttpError> {
413        self.send_request::<AxWhoAmI, ()>(Method::GET, "/whoami", None, None, true)
414            .await
415    }
416
417    /// Fetches all available instruments.
418    ///
419    /// # Endpoint
420    /// `GET /instruments`
421    ///
422    /// # Errors
423    ///
424    /// Returns an error if the request fails or the response cannot be parsed.
425    pub async fn get_instruments(&self) -> Result<AxInstrumentsResponse, AxHttpError> {
426        self.send_request::<AxInstrumentsResponse, ()>(
427            Method::GET,
428            "/instruments",
429            None,
430            None,
431            false,
432        )
433        .await
434    }
435
436    /// Fetches all account balances for the authenticated user.
437    ///
438    /// # Endpoint
439    /// `GET /balances`
440    ///
441    /// # Errors
442    ///
443    /// Returns an error if the request fails or the response cannot be parsed.
444    pub async fn get_balances(&self) -> Result<AxBalancesResponse, AxHttpError> {
445        self.send_request::<AxBalancesResponse, ()>(Method::GET, "/balances", None, None, true)
446            .await
447    }
448
449    /// Fetches all open positions for the authenticated user.
450    ///
451    /// # Endpoint
452    /// `GET /positions`
453    ///
454    /// # Errors
455    ///
456    /// Returns an error if the request fails or the response cannot be parsed.
457    pub async fn get_positions(&self) -> Result<AxPositionsResponse, AxHttpError> {
458        self.send_request::<AxPositionsResponse, ()>(Method::GET, "/positions", None, None, true)
459            .await
460    }
461
462    /// Fetches all tickers.
463    ///
464    /// # Endpoint
465    /// `GET /tickers`
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the request fails or the response cannot be parsed.
470    pub async fn get_tickers(&self) -> Result<AxTickersResponse, AxHttpError> {
471        self.send_request::<AxTickersResponse, ()>(Method::GET, "/tickers", None, None, true)
472            .await
473    }
474
475    /// Fetches a single ticker by symbol.
476    ///
477    /// # Endpoint
478    /// `GET /ticker?symbol=<symbol>`
479    ///
480    /// # Errors
481    ///
482    /// Returns an error if the request fails or the response cannot be parsed.
483    pub async fn get_ticker(&self, symbol: &str) -> Result<AxTicker, AxHttpError> {
484        let params = GetTickerParams::new(symbol);
485        self.send_request::<AxTicker, _>(Method::GET, "/ticker", Some(&params), None, true)
486            .await
487    }
488
489    /// Fetches a single instrument by symbol.
490    ///
491    /// # Endpoint
492    /// `GET /instrument?symbol=<symbol>`
493    ///
494    /// # Errors
495    ///
496    /// Returns an error if the request fails or the response cannot be parsed.
497    pub async fn get_instrument(&self, symbol: &str) -> Result<AxInstrument, AxHttpError> {
498        let params = GetInstrumentParams::new(symbol);
499        self.send_request::<AxInstrument, _>(Method::GET, "/instrument", Some(&params), None, false)
500            .await
501    }
502
503    /// Authenticates using API key and secret to obtain a session token.
504    ///
505    /// # Endpoint
506    /// `POST /authenticate`
507    ///
508    /// # Errors
509    ///
510    /// Returns an error if the request fails or the response cannot be parsed.
511    pub async fn authenticate(
512        &self,
513        api_key: &str,
514        api_secret: &str,
515        expiration_seconds: i32,
516    ) -> Result<AxAuthenticateResponse, AxHttpError> {
517        self.authenticate_with_totp(api_key, api_secret, expiration_seconds, None)
518            .await
519    }
520
521    /// Authenticates with the AX Exchange API using API key credentials and optional 2FA.
522    ///
523    /// # Endpoint
524    /// `POST /authenticate`
525    ///
526    /// # Errors
527    ///
528    /// Returns an error if:
529    /// - 400: 2FA is required but `totp` was not provided.
530    /// - 401: Invalid credentials.
531    pub async fn authenticate_with_totp(
532        &self,
533        api_key: &str,
534        api_secret: &str,
535        expiration_seconds: i32,
536        totp: Option<&str>,
537    ) -> Result<AxAuthenticateResponse, AxHttpError> {
538        let mut request = AuthenticateApiKeyRequest::new(api_key, api_secret, expiration_seconds);
539        if let Some(code) = totp {
540            request = request.with_totp(code);
541        }
542
543        let body = serde_json::to_vec(&request)
544            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
545
546        self.send_request::<AxAuthenticateResponse, ()>(
547            Method::POST,
548            "/authenticate",
549            None,
550            Some(body),
551            false,
552        )
553        .await
554    }
555
556    /// Places a new order.
557    ///
558    /// # Endpoint
559    /// `POST /place_order` (orders base URL)
560    ///
561    /// # Errors
562    ///
563    /// Returns an error if the request fails or the response cannot be parsed.
564    pub async fn place_order(
565        &self,
566        request: &PlaceOrderRequest,
567    ) -> Result<AxPlaceOrderResponse, AxHttpError> {
568        let body = serde_json::to_vec(request)
569            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
570        self.send_request_to_url::<AxPlaceOrderResponse, ()>(
571            &self.orders_base_url,
572            Method::POST,
573            "/place_order",
574            None,
575            Some(body),
576            true,
577        )
578        .await
579    }
580
581    /// Cancels an existing order.
582    ///
583    /// # Endpoint
584    /// `POST /cancel_order` (orders base URL)
585    ///
586    /// # Errors
587    ///
588    /// Returns an error if the request fails or the response cannot be parsed.
589    pub async fn cancel_order(&self, order_id: &str) -> Result<AxCancelOrderResponse, AxHttpError> {
590        let request = CancelOrderRequest::new(order_id);
591        let body = serde_json::to_vec(&request)
592            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
593        self.send_request_to_url::<AxCancelOrderResponse, ()>(
594            &self.orders_base_url,
595            Method::POST,
596            "/cancel_order",
597            None,
598            Some(body),
599            true,
600        )
601        .await
602    }
603
604    /// Fetches all open orders.
605    ///
606    /// # Endpoint
607    /// `GET /open_orders` (orders base URL)
608    ///
609    /// # Errors
610    ///
611    /// Returns an error if the request fails or the response cannot be parsed.
612    pub async fn get_open_orders(&self) -> Result<AxOpenOrdersResponse, AxHttpError> {
613        self.send_request_to_url::<AxOpenOrdersResponse, ()>(
614            &self.orders_base_url,
615            Method::GET,
616            "/open_orders",
617            None,
618            None,
619            true,
620        )
621        .await
622    }
623
624    /// Fetches all fills/trades.
625    ///
626    /// # Endpoint
627    /// `GET /fills`
628    ///
629    /// # Errors
630    ///
631    /// Returns an error if the request fails or the response cannot be parsed.
632    pub async fn get_fills(&self) -> Result<AxFillsResponse, AxHttpError> {
633        self.send_request::<AxFillsResponse, ()>(Method::GET, "/fills", None, None, true)
634            .await
635    }
636
637    /// Fetches historical candles.
638    ///
639    /// # Endpoint
640    /// `GET /candles`
641    ///
642    /// # Errors
643    ///
644    /// Returns an error if the request fails or the response cannot be parsed.
645    pub async fn get_candles(
646        &self,
647        symbol: &str,
648        start_timestamp_ns: i64,
649        end_timestamp_ns: i64,
650        candle_width: AxCandleWidth,
651    ) -> Result<AxCandlesResponse, AxHttpError> {
652        let params =
653            GetCandlesParams::new(symbol, start_timestamp_ns, end_timestamp_ns, candle_width);
654        self.send_request::<AxCandlesResponse, _>(
655            Method::GET,
656            "/candles",
657            Some(&params),
658            None,
659            true,
660        )
661        .await
662    }
663
664    /// Fetches the current (incomplete) candle.
665    ///
666    /// # Endpoint
667    /// `GET /candles/current`
668    ///
669    /// # Errors
670    ///
671    /// Returns an error if the request fails or the response cannot be parsed.
672    pub async fn get_current_candle(
673        &self,
674        symbol: &str,
675        candle_width: AxCandleWidth,
676    ) -> Result<AxCandle, AxHttpError> {
677        let params = GetCandleParams::new(symbol, candle_width);
678        let response = self
679            .send_request::<AxCandleResponse, _>(
680                Method::GET,
681                "/candles/current",
682                Some(&params),
683                None,
684                true,
685            )
686            .await?;
687        Ok(response.candle)
688    }
689
690    /// Fetches the last completed candle.
691    ///
692    /// # Endpoint
693    /// `GET /candles/last`
694    ///
695    /// # Errors
696    ///
697    /// Returns an error if the request fails or the response cannot be parsed.
698    pub async fn get_last_candle(
699        &self,
700        symbol: &str,
701        candle_width: AxCandleWidth,
702    ) -> Result<AxCandle, AxHttpError> {
703        let params = GetCandleParams::new(symbol, candle_width);
704        let response = self
705            .send_request::<AxCandleResponse, _>(
706                Method::GET,
707                "/candles/last",
708                Some(&params),
709                None,
710                true,
711            )
712            .await?;
713        Ok(response.candle)
714    }
715
716    /// Fetches funding rates for a symbol.
717    ///
718    /// # Endpoint
719    /// `GET /funding-rates`
720    ///
721    /// # Errors
722    ///
723    /// Returns an error if the request fails or the response cannot be parsed.
724    pub async fn get_funding_rates(
725        &self,
726        symbol: &str,
727        start_timestamp_ns: i64,
728        end_timestamp_ns: i64,
729    ) -> Result<AxFundingRatesResponse, AxHttpError> {
730        let params = GetFundingRatesParams::new(symbol, start_timestamp_ns, end_timestamp_ns);
731        self.send_request::<AxFundingRatesResponse, _>(
732            Method::GET,
733            "/funding-rates",
734            Some(&params),
735            None,
736            true,
737        )
738        .await
739    }
740
741    /// Fetches the current risk snapshot.
742    ///
743    /// # Endpoint
744    /// `GET /risk-snapshot`
745    ///
746    /// # Errors
747    ///
748    /// Returns an error if the request fails or the response cannot be parsed.
749    pub async fn get_risk_snapshot(&self) -> Result<AxRiskSnapshotResponse, AxHttpError> {
750        self.send_request::<AxRiskSnapshotResponse, ()>(
751            Method::GET,
752            "/risk-snapshot",
753            None,
754            None,
755            true,
756        )
757        .await
758    }
759
760    /// Fetches transactions filtered by type.
761    ///
762    /// # Endpoint
763    /// `GET /transactions`
764    ///
765    /// # Errors
766    ///
767    /// Returns an error if the request fails or the response cannot be parsed.
768    pub async fn get_transactions(
769        &self,
770        transaction_types: Vec<String>,
771    ) -> Result<AxTransactionsResponse, AxHttpError> {
772        let params = GetTransactionsParams::new(transaction_types);
773        self.send_request::<AxTransactionsResponse, _>(
774            Method::GET,
775            "/transactions",
776            Some(&params),
777            None,
778            true,
779        )
780        .await
781    }
782}
783
784// ------------------------------------------------------------------------------------------------
785
/// High-level HTTP client for the Ax REST API.
///
/// This client wraps the underlying [`AxRawHttpClient`] to provide a convenient
/// interface for Python bindings and instrument caching.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.architect")
)]
pub struct AxHttpClient {
    /// Shared raw client; clones of this client point at the same instance.
    pub(crate) inner: Arc<AxRawHttpClient>,
    /// Instrument cache keyed by raw symbol; shared between clones.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set to `true` once `cache_instrument` or `cache_instruments` has been called.
    cache_initialized: AtomicBool,
}
800
801impl Clone for AxHttpClient {
802    fn clone(&self) -> Self {
803        let cache_initialized = AtomicBool::new(false);
804
805        let is_initialized = self.cache_initialized.load(Ordering::Acquire);
806        if is_initialized {
807            cache_initialized.store(true, Ordering::Release);
808        }
809
810        Self {
811            inner: self.inner.clone(),
812            instruments_cache: self.instruments_cache.clone(),
813            cache_initialized,
814        }
815    }
816}
817
818impl Default for AxHttpClient {
819    fn default() -> Self {
820        Self::new(None, None, None, None, None, None, None)
821            .expect("Failed to create default AxHttpClient")
822    }
823}
824
825impl AxHttpClient {
826    /// Creates a new [`AxHttpClient`] using the default Ax HTTP URL.
827    ///
828    /// # Errors
829    ///
830    /// Returns an error if the retry manager cannot be created.
831    #[allow(clippy::too_many_arguments)]
832    pub fn new(
833        base_url: Option<String>,
834        orders_base_url: Option<String>,
835        timeout_secs: Option<u64>,
836        max_retries: Option<u32>,
837        retry_delay_ms: Option<u64>,
838        retry_delay_max_ms: Option<u64>,
839        proxy_url: Option<String>,
840    ) -> Result<Self, AxHttpError> {
841        Ok(Self {
842            inner: Arc::new(AxRawHttpClient::new(
843                base_url,
844                orders_base_url,
845                timeout_secs,
846                max_retries,
847                retry_delay_ms,
848                retry_delay_max_ms,
849                proxy_url,
850            )?),
851            instruments_cache: Arc::new(DashMap::new()),
852            cache_initialized: AtomicBool::new(false),
853        })
854    }
855
856    /// Creates a new [`AxHttpClient`] configured with credentials.
857    ///
858    /// # Errors
859    ///
860    /// Returns an error if the HTTP client cannot be created.
861    #[allow(clippy::too_many_arguments)]
862    pub fn with_credentials(
863        api_key: String,
864        api_secret: String,
865        base_url: Option<String>,
866        orders_base_url: Option<String>,
867        timeout_secs: Option<u64>,
868        max_retries: Option<u32>,
869        retry_delay_ms: Option<u64>,
870        retry_delay_max_ms: Option<u64>,
871        proxy_url: Option<String>,
872    ) -> Result<Self, AxHttpError> {
873        Ok(Self {
874            inner: Arc::new(AxRawHttpClient::with_credentials(
875                api_key,
876                api_secret,
877                base_url,
878                orders_base_url,
879                timeout_secs,
880                max_retries,
881                retry_delay_ms,
882                retry_delay_max_ms,
883                proxy_url,
884            )?),
885            instruments_cache: Arc::new(DashMap::new()),
886            cache_initialized: AtomicBool::new(false),
887        })
888    }
889
890    /// Returns the base URL for this client.
891    #[must_use]
892    pub fn base_url(&self) -> &str {
893        self.inner.base_url()
894    }
895
896    /// Cancel all pending HTTP requests.
897    pub fn cancel_all_requests(&self) {
898        self.inner.cancel_all_requests();
899    }
900
901    /// Sets the session token for authenticated requests.
902    ///
903    /// The session token is obtained through the login flow and used for bearer token authentication.
904    pub fn set_session_token(&self, token: String) {
905        self.inner.set_session_token(token);
906    }
907
908    /// Generates a timestamp for initialization.
909    fn generate_ts_init(&self) -> UnixNanos {
910        get_atomic_clock_realtime().get_time_ns()
911    }
912
913    /// Checks if the client is initialized.
914    ///
915    /// The client is considered initialized if any instruments have been cached from the venue.
916    #[must_use]
917    pub fn is_initialized(&self) -> bool {
918        self.cache_initialized.load(Ordering::Acquire)
919    }
920
921    /// Returns a snapshot of all instrument symbols currently held in the internal cache.
922    #[must_use]
923    pub fn get_cached_symbols(&self) -> Vec<String> {
924        self.instruments_cache
925            .iter()
926            .map(|entry| entry.key().to_string())
927            .collect()
928    }
929
930    /// Caches multiple instruments.
931    ///
932    /// Any existing instruments with the same symbols will be replaced.
933    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
934        for inst in instruments {
935            self.instruments_cache
936                .insert(inst.raw_symbol().inner(), inst);
937        }
938        self.cache_initialized.store(true, Ordering::Release);
939    }
940
941    /// Caches a single instrument.
942    ///
943    /// Any existing instrument with the same symbol will be replaced.
944    pub fn cache_instrument(&self, instrument: InstrumentAny) {
945        self.instruments_cache
946            .insert(instrument.raw_symbol().inner(), instrument);
947        self.cache_initialized.store(true, Ordering::Release);
948    }
949
950    /// Authenticates with Ax using API credentials.
951    ///
952    /// On success, the session token is automatically stored for subsequent authenticated requests.
953    ///
954    /// # Errors
955    ///
956    /// Returns an error if the HTTP request fails or credentials are invalid.
957    pub async fn authenticate(
958        &self,
959        api_key: &str,
960        api_secret: &str,
961        expiration_seconds: i32,
962    ) -> Result<String, AxHttpError> {
963        let resp = self
964            .inner
965            .authenticate(api_key, api_secret, expiration_seconds)
966            .await?;
967        self.inner.set_session_token(resp.token.clone());
968        Ok(resp.token)
969    }
970
971    /// Authenticates with Ax using API credentials and TOTP.
972    ///
973    /// On success, the session token is automatically stored for subsequent authenticated requests.
974    ///
975    /// # Errors
976    ///
977    /// Returns an error if the HTTP request fails or credentials are invalid.
978    pub async fn authenticate_with_totp(
979        &self,
980        api_key: &str,
981        api_secret: &str,
982        expiration_seconds: i32,
983        totp_code: Option<&str>,
984    ) -> Result<String, AxHttpError> {
985        let resp = self
986            .inner
987            .authenticate_with_totp(api_key, api_secret, expiration_seconds, totp_code)
988            .await?;
989        self.inner.set_session_token(resp.token.clone());
990        Ok(resp.token)
991    }
992
993    /// Gets an instrument from the cache by symbol.
994    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
995        self.instruments_cache
996            .get(symbol)
997            .map(|entry| entry.value().clone())
998    }
999
1000    /// Requests all instruments from Ax.
1001    ///
1002    /// # Errors
1003    ///
1004    /// Returns an error if the HTTP request fails or instrument parsing fails.
1005    pub async fn request_instruments(
1006        &self,
1007        maker_fee: Option<Decimal>,
1008        taker_fee: Option<Decimal>,
1009    ) -> anyhow::Result<Vec<InstrumentAny>> {
1010        let resp = self
1011            .inner
1012            .get_instruments()
1013            .await
1014            .map_err(|e| anyhow::anyhow!(e))?;
1015
1016        let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1017        let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1018        let ts_init = self.generate_ts_init();
1019
1020        let mut instruments: Vec<InstrumentAny> = Vec::new();
1021        for inst in &resp.instruments {
1022            if inst.state == AxInstrumentState::Suspended {
1023                log::debug!("Skipping suspended instrument: {}", inst.symbol);
1024                continue;
1025            }
1026
1027            match parse_perp_instrument(inst, maker_fee, taker_fee, ts_init, ts_init) {
1028                Ok(instrument) => instruments.push(instrument),
1029                Err(e) => {
1030                    log::warn!("Failed to parse instrument {}: {e}", inst.symbol);
1031                }
1032            }
1033        }
1034
1035        Ok(instruments)
1036    }
1037
1038    /// Requests a single instrument from Ax by symbol.
1039    ///
1040    /// # Errors
1041    ///
1042    /// Returns an error if the HTTP request fails or instrument parsing fails.
1043    pub async fn request_instrument(
1044        &self,
1045        symbol: &str,
1046        maker_fee: Option<Decimal>,
1047        taker_fee: Option<Decimal>,
1048    ) -> anyhow::Result<InstrumentAny> {
1049        let resp = self
1050            .inner
1051            .get_instrument(symbol)
1052            .await
1053            .map_err(|e| anyhow::anyhow!(e))?;
1054
1055        let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1056        let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1057        let ts_init = self.generate_ts_init();
1058
1059        parse_perp_instrument(&resp, maker_fee, taker_fee, ts_init, ts_init)
1060    }
1061
1062    /// Requests account state from Ax and parses to a Nautilus [`AccountState`].
1063    ///
1064    /// # Errors
1065    ///
1066    /// Returns an error if the HTTP request fails or parsing fails.
1067    pub async fn request_account_state(
1068        &self,
1069        account_id: AccountId,
1070    ) -> anyhow::Result<AccountState> {
1071        let response = self
1072            .inner
1073            .get_balances()
1074            .await
1075            .map_err(|e| anyhow::anyhow!(e))?;
1076
1077        let ts_init = self.generate_ts_init();
1078        parse_account_state(&response, account_id, ts_init, ts_init)
1079    }
1080
1081    /// Requests funding rates from Ax.
1082    ///
1083    /// # Errors
1084    ///
1085    /// Returns an error if the HTTP request fails.
1086    pub async fn request_funding_rates(
1087        &self,
1088        symbol: &str,
1089        start_timestamp_ns: i64,
1090        end_timestamp_ns: i64,
1091    ) -> Result<AxFundingRatesResponse, AxHttpError> {
1092        self.inner
1093            .get_funding_rates(symbol, start_timestamp_ns, end_timestamp_ns)
1094            .await
1095    }
1096
1097    /// Requests historical bars from Ax and parses them to Nautilus Bar types.
1098    ///
1099    /// Requires the instrument to be cached (call `request_instruments` first).
1100    ///
1101    /// # Errors
1102    ///
1103    /// Returns an error if:
1104    /// - The instrument is not found in the cache.
1105    /// - The HTTP request fails.
1106    /// - Bar parsing fails.
1107    pub async fn request_bars(
1108        &self,
1109        symbol: &str,
1110        start_timestamp_ns: i64,
1111        end_timestamp_ns: i64,
1112        width: AxCandleWidth,
1113    ) -> anyhow::Result<Vec<Bar>> {
1114        let symbol_ustr = ustr::Ustr::from(symbol);
1115        let instrument = self
1116            .get_instrument(&symbol_ustr)
1117            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1118
1119        let resp = self
1120            .inner
1121            .get_candles(symbol, start_timestamp_ns, end_timestamp_ns, width)
1122            .await
1123            .map_err(|e| anyhow::anyhow!(e))?;
1124
1125        let ts_init = self.generate_ts_init();
1126        let mut bars = Vec::with_capacity(resp.candles.len());
1127
1128        for candle in &resp.candles {
1129            match parse_bar(candle, &instrument, ts_init) {
1130                Ok(bar) => bars.push(bar),
1131                Err(e) => {
1132                    log::warn!("Failed to parse bar for {symbol}: {e}");
1133                }
1134            }
1135        }
1136
1137        Ok(bars)
1138    }
1139
1140    /// Requests open orders from Ax and parses them to Nautilus [`OrderStatusReport`].
1141    ///
1142    /// Requires instruments to be cached for parsing order details.
1143    ///
1144    /// # Errors
1145    ///
1146    /// Returns an error if:
1147    /// - The HTTP request fails.
1148    /// - An order's instrument is not found in the cache.
1149    /// - Order parsing fails.
1150    pub async fn request_order_status_reports(
1151        &self,
1152        account_id: AccountId,
1153    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1154        let orders = self
1155            .inner
1156            .get_open_orders()
1157            .await
1158            .map_err(|e| anyhow::anyhow!(e))?;
1159
1160        let ts_init = self.generate_ts_init();
1161        let mut reports = Vec::with_capacity(orders.len());
1162
1163        for order in &orders {
1164            let instrument = self
1165                .get_instrument(&order.s)
1166                .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", order.s))?;
1167
1168            match parse_order_status_report(order, account_id, &instrument, ts_init) {
1169                Ok(report) => reports.push(report),
1170                Err(e) => {
1171                    log::warn!("Failed to parse order {}: {e}", order.oid);
1172                }
1173            }
1174        }
1175
1176        Ok(reports)
1177    }
1178
1179    /// Requests fills from Ax and parses them to Nautilus [`FillReport`].
1180    ///
1181    /// Requires instruments to be cached for parsing fill details.
1182    ///
1183    /// # Errors
1184    ///
1185    /// Returns an error if:
1186    /// - The HTTP request fails.
1187    /// - A fill's instrument is not found in the cache.
1188    /// - Fill parsing fails.
1189    pub async fn request_fill_reports(
1190        &self,
1191        account_id: AccountId,
1192    ) -> anyhow::Result<Vec<FillReport>> {
1193        let response = self
1194            .inner
1195            .get_fills()
1196            .await
1197            .map_err(|e| anyhow::anyhow!(e))?;
1198
1199        let ts_init = self.generate_ts_init();
1200        let mut reports = Vec::with_capacity(response.fills.len());
1201
1202        for fill in &response.fills {
1203            let instrument = self
1204                .get_instrument(&fill.symbol)
1205                .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", fill.symbol))?;
1206
1207            match parse_fill_report(fill, account_id, &instrument, ts_init) {
1208                Ok(report) => reports.push(report),
1209                Err(e) => {
1210                    log::warn!("Failed to parse fill {}: {e}", fill.execution_id);
1211                }
1212            }
1213        }
1214
1215        Ok(reports)
1216    }
1217
1218    /// Requests positions from Ax and parses them to Nautilus [`PositionStatusReport`].
1219    ///
1220    /// Requires instruments to be cached for parsing position details.
1221    ///
1222    /// # Errors
1223    ///
1224    /// Returns an error if:
1225    /// - The HTTP request fails.
1226    /// - A position's instrument is not found in the cache.
1227    /// - Position parsing fails.
1228    pub async fn request_position_reports(
1229        &self,
1230        account_id: AccountId,
1231    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1232        let response = self
1233            .inner
1234            .get_positions()
1235            .await
1236            .map_err(|e| anyhow::anyhow!(e))?;
1237
1238        let ts_init = self.generate_ts_init();
1239        let mut reports = Vec::with_capacity(response.positions.len());
1240
1241        for position in &response.positions {
1242            let instrument = self.get_instrument(&position.symbol).ok_or_else(|| {
1243                anyhow::anyhow!("Instrument {} not found in cache", position.symbol)
1244            })?;
1245
1246            match parse_position_status_report(position, account_id, &instrument, ts_init) {
1247                Ok(report) => reports.push(report),
1248                Err(e) => {
1249                    log::warn!("Failed to parse position for {}: {e}", position.symbol);
1250                }
1251            }
1252        }
1253
1254        Ok(reports)
1255    }
1256}