Skip to main content

nautilus_architect_ax/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the Ax REST API.
17
18use std::{
19    collections::HashMap,
20    fmt::Debug,
21    num::NonZeroU32,
22    sync::{
23        Arc, LazyLock, RwLock,
24        atomic::{AtomicBool, Ordering},
25    },
26};
27
28use chrono::{DateTime, Utc};
29use dashmap::DashMap;
30use nautilus_core::{
31    UUID4, consts::NAUTILUS_USER_AGENT, nanos::UnixNanos, time::get_atomic_clock_realtime,
32};
33use nautilus_model::{
34    data::{Bar, BookOrder, FundingRateUpdate, TradeTick},
35    enums::{BookType, OrderSide, OrderType, TimeInForce},
36    events::AccountState,
37    identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
38    instruments::{Instrument, any::InstrumentAny},
39    orderbook::OrderBook,
40    reports::{FillReport, OrderStatusReport, PositionStatusReport},
41    types::{Price, Quantity},
42};
43use nautilus_network::{
44    http::HttpClient,
45    ratelimiter::quota::Quota,
46    retry::{RetryConfig, RetryManager},
47};
48use reqwest::{Method, header::USER_AGENT};
49use rust_decimal::Decimal;
50use serde::{Serialize, de::DeserializeOwned};
51use tokio_util::sync::CancellationToken;
52use ustr::Ustr;
53
54use super::{
55    error::AxHttpError,
56    models::{
57        AuthenticateApiKeyRequest, AxAuthenticateResponse, AxBalancesResponse,
58        AxBatchCancelOrdersResponse, AxBookResponse, AxCancelAllOrdersResponse,
59        AxCancelOrderResponse, AxCandle, AxCandleResponse, AxCandlesResponse, AxFillsResponse,
60        AxFundingRatesResponse, AxInitialMarginRequirementResponse, AxInstrument,
61        AxInstrumentsResponse, AxOpenOrdersResponse, AxOrderStatusQueryResponse, AxOrdersResponse,
62        AxPlaceOrderResponse, AxPositionsResponse, AxPreviewAggressiveLimitOrderResponse,
63        AxRiskSnapshotResponse, AxTicker, AxTickersResponse, AxTradesResponse,
64        AxTransactionsResponse, AxWhoAmI, BatchCancelOrdersRequest, CancelAllOrdersRequest,
65        CancelOrderRequest, PlaceOrderRequest, PreviewAggressiveLimitOrderRequest,
66    },
67    parse::{
68        parse_account_state, parse_bar, parse_fill_report, parse_funding_rate,
69        parse_order_status_report, parse_perp_instrument, parse_position_status_report,
70        parse_trade_tick,
71    },
72    query::{
73        GetBookParams, GetCandleParams, GetCandlesParams, GetFundingRatesParams,
74        GetInstrumentParams, GetOrderStatusParams, GetOrdersParams, GetTickerParams,
75        GetTradesParams, GetTransactionsParams,
76    },
77};
78use crate::common::{
79    consts::{AX_HTTP_URL, AX_ORDERS_URL},
80    credential::Credential,
81    enums::{AxCandleWidth, AxInstrumentState},
82    parse::client_order_id_to_cid,
83};
84
85/// Default Ax REST API rate limit.
86///
87/// Conservative default of 10 requests per second.
88pub static AX_REST_QUOTA: LazyLock<Quota> = LazyLock::new(|| {
89    Quota::per_second(NonZeroU32::new(10).expect("Should be a valid non-zero u32"))
90});
91
92const AX_GLOBAL_RATE_KEY: &str = "architect:global";
93
94/// Raw HTTP client for low-level AX Exchange API operations.
95///
96/// This client handles request/response operations with the AX Exchange API,
97/// returning venue-specific response types. It does not parse to Nautilus domain types.
98pub struct AxRawHttpClient {
99    base_url: String,
100    orders_base_url: String,
101    client: HttpClient,
102    credential: Option<Credential>,
103    session_token: RwLock<Option<String>>,
104    retry_manager: RetryManager<AxHttpError>,
105    cancellation_token: RwLock<CancellationToken>,
106}
107
108impl Default for AxRawHttpClient {
109    fn default() -> Self {
110        Self::new(None, None, Some(60), None, None, None, None)
111            .expect("Failed to create default AxRawHttpClient")
112    }
113}
114
115impl Debug for AxRawHttpClient {
116    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
117        let has_session_token = self.session_token.read().is_ok_and(|guard| guard.is_some());
118        f.debug_struct(stringify!(AxRawHttpClient))
119            .field("base_url", &self.base_url)
120            .field("orders_base_url", &self.orders_base_url)
121            .field("has_credentials", &self.credential.is_some())
122            .field("has_session_token", &has_session_token)
123            .finish()
124    }
125}
126
127impl AxRawHttpClient {
128    /// Returns the base URL for this client.
129    #[must_use]
130    pub fn base_url(&self) -> &str {
131        &self.base_url
132    }
133
134    /// Returns a masked version of the API key for logging purposes.
135    #[must_use]
136    pub fn api_key_masked(&self) -> String {
137        self.credential
138            .as_ref()
139            .map_or_else(|| "None".to_string(), |c| c.masked_api_key())
140    }
141
142    /// Cancel all pending HTTP requests.
143    ///
144    /// # Panics
145    ///
146    /// Panics if the cancellation token lock is poisoned.
147    pub fn cancel_all_requests(&self) {
148        self.cancellation_token
149            .read()
150            .expect("Lock poisoned")
151            .cancel();
152    }
153
154    /// Replaces the cancelled token so new requests can proceed after reconnect.
155    ///
156    /// # Panics
157    ///
158    /// Panics if the cancellation token lock is poisoned.
159    pub fn reset_cancellation_token(&self) {
160        *self.cancellation_token.write().expect("Lock poisoned") = CancellationToken::new();
161    }
162
163    /// Get a clone of the current cancellation token.
164    ///
165    /// # Panics
166    ///
167    /// Panics if the cancellation token lock is poisoned.
168    pub fn cancellation_token(&self) -> CancellationToken {
169        self.cancellation_token
170            .read()
171            .expect("Lock poisoned")
172            .clone()
173    }
174
175    /// Creates a new [`AxRawHttpClient`] using the default Ax HTTP URL.
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if the retry manager cannot be created.
180    #[allow(clippy::too_many_arguments)]
181    pub fn new(
182        base_url: Option<String>,
183        orders_base_url: Option<String>,
184        timeout_secs: Option<u64>,
185        max_retries: Option<u32>,
186        retry_delay_ms: Option<u64>,
187        retry_delay_max_ms: Option<u64>,
188        proxy_url: Option<String>,
189    ) -> Result<Self, AxHttpError> {
190        let retry_config = RetryConfig {
191            max_retries: max_retries.unwrap_or(3),
192            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
193            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
194            backoff_factor: 2.0,
195            jitter_ms: 1000,
196            operation_timeout_ms: Some(60_000),
197            immediate_first: false,
198            max_elapsed_ms: Some(180_000),
199        };
200
201        let retry_manager = RetryManager::new(retry_config);
202
203        Ok(Self {
204            base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
205            orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
206            client: HttpClient::new(
207                Self::default_headers(),
208                vec![],
209                Self::rate_limiter_quotas(),
210                Some(*AX_REST_QUOTA),
211                timeout_secs,
212                proxy_url,
213            )
214            .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
215            credential: None,
216            session_token: RwLock::new(None),
217            retry_manager,
218            cancellation_token: RwLock::new(CancellationToken::new()),
219        })
220    }
221
222    /// Creates a new [`AxRawHttpClient`] configured with credentials.
223    ///
224    /// # Errors
225    ///
226    /// Returns an error if the HTTP client cannot be created.
227    #[allow(clippy::too_many_arguments)]
228    pub fn with_credentials(
229        api_key: String,
230        api_secret: String,
231        base_url: Option<String>,
232        orders_base_url: Option<String>,
233        timeout_secs: Option<u64>,
234        max_retries: Option<u32>,
235        retry_delay_ms: Option<u64>,
236        retry_delay_max_ms: Option<u64>,
237        proxy_url: Option<String>,
238    ) -> Result<Self, AxHttpError> {
239        let retry_config = RetryConfig {
240            max_retries: max_retries.unwrap_or(3),
241            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
242            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
243            backoff_factor: 2.0,
244            jitter_ms: 1000,
245            operation_timeout_ms: Some(60_000),
246            immediate_first: false,
247            max_elapsed_ms: Some(180_000),
248        };
249
250        let retry_manager = RetryManager::new(retry_config);
251
252        Ok(Self {
253            base_url: base_url.unwrap_or_else(|| AX_HTTP_URL.to_string()),
254            orders_base_url: orders_base_url.unwrap_or_else(|| AX_ORDERS_URL.to_string()),
255            client: HttpClient::new(
256                Self::default_headers(),
257                vec![],
258                Self::rate_limiter_quotas(),
259                Some(*AX_REST_QUOTA),
260                timeout_secs,
261                proxy_url,
262            )
263            .map_err(|e| AxHttpError::NetworkError(format!("Failed to create HTTP client: {e}")))?,
264            credential: Some(Credential::new(api_key, api_secret)),
265            session_token: RwLock::new(None),
266            retry_manager,
267            cancellation_token: RwLock::new(CancellationToken::new()),
268        })
269    }
270
271    /// Sets the session token for authenticated requests.
272    ///
273    /// The session token is obtained through the login flow and used for bearer token authentication.
274    ///
275    /// # Panics
276    ///
277    /// Panics if the internal lock is poisoned (indicates a panic in another thread).
278    pub fn set_session_token(&self, token: String) {
279        // SAFETY: Lock poisoning indicates a panic in another thread, which is fatal
280        *self.session_token.write().expect("Lock poisoned") = Some(token);
281    }
282
283    fn default_headers() -> HashMap<String, String> {
284        HashMap::from([
285            (USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string()),
286            ("Accept".to_string(), "application/json".to_string()),
287        ])
288    }
289
290    fn rate_limiter_quotas() -> Vec<(String, Quota)> {
291        vec![(AX_GLOBAL_RATE_KEY.to_string(), *AX_REST_QUOTA)]
292    }
293
294    fn rate_limit_keys(endpoint: &str) -> Vec<String> {
295        let normalized = endpoint.split('?').next().unwrap_or(endpoint);
296        let route = format!("architect:{normalized}");
297
298        vec![AX_GLOBAL_RATE_KEY.to_string(), route]
299    }
300
301    fn auth_headers(&self) -> Result<HashMap<String, String>, AxHttpError> {
302        // SAFETY: Lock poisoning indicates a panic in another thread, which is fatal
303        let guard = self.session_token.read().expect("Lock poisoned");
304        let session_token = guard.as_ref().ok_or(AxHttpError::MissingSessionToken)?;
305
306        let mut headers = HashMap::new();
307        headers.insert(
308            "Authorization".to_string(),
309            format!("Bearer {session_token}"),
310        );
311
312        Ok(headers)
313    }
314
315    async fn send_request<T: DeserializeOwned, P: Serialize>(
316        &self,
317        method: Method,
318        endpoint: &str,
319        params: Option<&P>,
320        body: Option<Vec<u8>>,
321        authenticate: bool,
322    ) -> Result<T, AxHttpError> {
323        self.send_request_to_url(&self.base_url, method, endpoint, params, body, authenticate)
324            .await
325    }
326
327    async fn send_request_to_url<T: DeserializeOwned, P: Serialize>(
328        &self,
329        base_url: &str,
330        method: Method,
331        endpoint: &str,
332        params: Option<&P>,
333        body: Option<Vec<u8>>,
334        authenticate: bool,
335    ) -> Result<T, AxHttpError> {
336        let endpoint = endpoint.to_string();
337        let url = format!("{base_url}{endpoint}");
338
339        let params_str = if method == Method::GET || method == Method::DELETE {
340            params
341                .map(serde_urlencoded::to_string)
342                .transpose()
343                .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize params: {e}")))?
344        } else {
345            None
346        };
347
348        let operation = || {
349            let url = url.clone();
350            let method = method.clone();
351            let endpoint = endpoint.clone();
352            let params_str = params_str.clone();
353            let body = body.clone();
354
355            async move {
356                let mut headers = Self::default_headers();
357
358                if authenticate {
359                    let auth_headers = self.auth_headers()?;
360                    headers.extend(auth_headers);
361                }
362
363                if body.is_some() {
364                    headers.insert("Content-Type".to_string(), "application/json".to_string());
365                }
366
367                let full_url = if let Some(ref query) = params_str {
368                    if query.is_empty() {
369                        url
370                    } else {
371                        format!("{url}?{query}")
372                    }
373                } else {
374                    url
375                };
376
377                let rate_limit_keys = Self::rate_limit_keys(&endpoint);
378
379                let response = self
380                    .client
381                    .request(
382                        method,
383                        full_url,
384                        None,
385                        Some(headers),
386                        body,
387                        None,
388                        Some(rate_limit_keys),
389                    )
390                    .await?;
391
392                let status = response.status;
393                let response_body = String::from_utf8_lossy(&response.body).to_string();
394
395                if !status.is_success() {
396                    return Err(AxHttpError::UnexpectedStatus {
397                        status: status.as_u16(),
398                        body: response_body,
399                    });
400                }
401
402                serde_json::from_str(&response_body).map_err(|e| {
403                    AxHttpError::JsonError(format!(
404                        "Failed to deserialize response: {e}\nBody: {response_body}"
405                    ))
406                })
407            }
408        };
409
410        // Only retry idempotent methods to avoid duplicate orders/cancels
411        let is_idempotent = matches!(method, Method::GET | Method::HEAD | Method::OPTIONS);
412        let should_retry = |error: &AxHttpError| -> bool { is_idempotent && error.is_retryable() };
413
414        let create_error = |msg: String| -> AxHttpError {
415            if msg == "canceled" {
416                AxHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
417            } else {
418                AxHttpError::NetworkError(msg)
419            }
420        };
421
422        let cancel_token = self
423            .cancellation_token
424            .read()
425            .expect("Lock poisoned")
426            .clone();
427
428        self.retry_manager
429            .execute_with_retry_with_cancel(
430                endpoint.as_str(),
431                operation,
432                should_retry,
433                create_error,
434                &cancel_token,
435            )
436            .await
437    }
438
439    /// Fetches the current authenticated user information.
440    ///
441    /// # Endpoint
442    /// `GET /whoami`
443    ///
444    /// # Errors
445    ///
446    /// Returns an error if the request fails or the response cannot be parsed.
447    pub async fn get_whoami(&self) -> Result<AxWhoAmI, AxHttpError> {
448        self.send_request::<AxWhoAmI, ()>(Method::GET, "/whoami", None, None, true)
449            .await
450    }
451
452    /// Fetches all available instruments.
453    ///
454    /// # Endpoint
455    /// `GET /instruments`
456    ///
457    /// # Errors
458    ///
459    /// Returns an error if the request fails or the response cannot be parsed.
460    pub async fn get_instruments(&self) -> Result<AxInstrumentsResponse, AxHttpError> {
461        self.send_request::<AxInstrumentsResponse, ()>(
462            Method::GET,
463            "/instruments",
464            None,
465            None,
466            false,
467        )
468        .await
469    }
470
471    /// Fetches all account balances for the authenticated user.
472    ///
473    /// # Endpoint
474    /// `GET /balances`
475    ///
476    /// # Errors
477    ///
478    /// Returns an error if the request fails or the response cannot be parsed.
479    pub async fn get_balances(&self) -> Result<AxBalancesResponse, AxHttpError> {
480        self.send_request::<AxBalancesResponse, ()>(Method::GET, "/balances", None, None, true)
481            .await
482    }
483
484    /// Fetches all open positions for the authenticated user.
485    ///
486    /// # Endpoint
487    /// `GET /positions`
488    ///
489    /// # Errors
490    ///
491    /// Returns an error if the request fails or the response cannot be parsed.
492    pub async fn get_positions(&self) -> Result<AxPositionsResponse, AxHttpError> {
493        self.send_request::<AxPositionsResponse, ()>(Method::GET, "/positions", None, None, true)
494            .await
495    }
496
497    /// Fetches all tickers.
498    ///
499    /// # Endpoint
500    /// `GET /tickers`
501    ///
502    /// # Errors
503    ///
504    /// Returns an error if the request fails or the response cannot be parsed.
505    pub async fn get_tickers(&self) -> Result<AxTickersResponse, AxHttpError> {
506        self.send_request::<AxTickersResponse, ()>(Method::GET, "/tickers", None, None, true)
507            .await
508    }
509
510    /// Fetches a single ticker by symbol.
511    ///
512    /// # Endpoint
513    /// `GET /ticker?symbol=<symbol>`
514    ///
515    /// # Errors
516    ///
517    /// Returns an error if the request fails or the response cannot be parsed.
518    pub async fn get_ticker(&self, symbol: Ustr) -> Result<AxTicker, AxHttpError> {
519        let params = GetTickerParams::new(symbol);
520        self.send_request::<AxTicker, _>(Method::GET, "/ticker", Some(&params), None, true)
521            .await
522    }
523
524    /// Fetches a single instrument by symbol.
525    ///
526    /// # Endpoint
527    /// `GET /instrument?symbol=<symbol>`
528    ///
529    /// # Errors
530    ///
531    /// Returns an error if the request fails or the response cannot be parsed.
532    pub async fn get_instrument(&self, symbol: Ustr) -> Result<AxInstrument, AxHttpError> {
533        let params = GetInstrumentParams::new(symbol);
534        self.send_request::<AxInstrument, _>(Method::GET, "/instrument", Some(&params), None, false)
535            .await
536    }
537
538    /// Authenticates using API key and secret to obtain a session token.
539    ///
540    /// # Endpoint
541    /// `POST /authenticate`
542    ///
543    /// # Errors
544    ///
545    /// Returns an error if the request fails or the response cannot be parsed.
546    pub async fn authenticate(
547        &self,
548        api_key: &str,
549        api_secret: &str,
550        expiration_seconds: i32,
551    ) -> Result<AxAuthenticateResponse, AxHttpError> {
552        let request = AuthenticateApiKeyRequest::new(api_key, api_secret, expiration_seconds);
553
554        let body = serde_json::to_vec(&request)
555            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
556
557        self.send_request::<AxAuthenticateResponse, ()>(
558            Method::POST,
559            "/authenticate",
560            None,
561            Some(body),
562            false,
563        )
564        .await
565    }
566
567    /// Authenticates using stored credentials or environment variables.
568    ///
569    /// # Credential Resolution
570    ///
571    /// Credentials are resolved in the following order:
572    /// 1. Stored credentials (from `with_credentials` constructor)
573    /// 2. Environment variables (`AX_API_KEY` and `AX_API_SECRET`)
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if:
578    /// - No credentials are available from either source
579    /// - The HTTP request fails
580    /// - The credentials are invalid
581    pub async fn authenticate_auto(
582        &self,
583        expiration_seconds: i32,
584    ) -> Result<AxAuthenticateResponse, AxHttpError> {
585        let (api_key, api_secret) = self
586            .resolve_credentials()
587            .ok_or(AxHttpError::MissingCredentials)?;
588
589        self.authenticate(&api_key, &api_secret, expiration_seconds)
590            .await
591    }
592
593    fn resolve_credentials(&self) -> Option<(String, String)> {
594        if let Some(cred) = &self.credential {
595            return Some((cred.api_key().to_string(), cred.api_secret().to_string()));
596        }
597
598        let api_key = std::env::var("AX_API_KEY").ok()?;
599        let api_secret = std::env::var("AX_API_SECRET").ok()?;
600        Some((api_key, api_secret))
601    }
602
603    /// Places a new order.
604    ///
605    /// # Endpoint
606    /// `POST /place_order` (orders base URL)
607    ///
608    /// # Errors
609    ///
610    /// Returns an error if the request fails or the response cannot be parsed.
611    pub async fn place_order(
612        &self,
613        request: &PlaceOrderRequest,
614    ) -> Result<AxPlaceOrderResponse, AxHttpError> {
615        let body = serde_json::to_vec(request)
616            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
617        self.send_request_to_url::<AxPlaceOrderResponse, ()>(
618            &self.orders_base_url,
619            Method::POST,
620            "/place_order",
621            None,
622            Some(body),
623            true,
624        )
625        .await
626    }
627
628    /// Cancels an existing order.
629    ///
630    /// # Endpoint
631    /// `POST /cancel_order` (orders base URL)
632    ///
633    /// # Errors
634    ///
635    /// Returns an error if the request fails or the response cannot be parsed.
636    pub async fn cancel_order(&self, order_id: &str) -> Result<AxCancelOrderResponse, AxHttpError> {
637        let request = CancelOrderRequest::new(order_id);
638        let body = serde_json::to_vec(&request)
639            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
640        self.send_request_to_url::<AxCancelOrderResponse, ()>(
641            &self.orders_base_url,
642            Method::POST,
643            "/cancel_order",
644            None,
645            Some(body),
646            true,
647        )
648        .await
649    }
650
651    /// Cancels all open orders, optionally filtered by symbol or venue.
652    ///
653    /// # Endpoint
654    /// `POST /cancel_all_orders` (orders base URL)
655    ///
656    /// # Errors
657    ///
658    /// Returns an error if the request fails or the response cannot be parsed.
659    pub async fn cancel_all_orders(
660        &self,
661        request: &CancelAllOrdersRequest,
662    ) -> Result<AxCancelAllOrdersResponse, AxHttpError> {
663        let body = serde_json::to_vec(request)
664            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
665        self.send_request_to_url::<AxCancelAllOrdersResponse, ()>(
666            &self.orders_base_url,
667            Method::POST,
668            "/cancel_all_orders",
669            None,
670            Some(body),
671            true,
672        )
673        .await
674    }
675
676    /// Cancels multiple orders by their IDs in a single batch request.
677    ///
678    /// # Endpoint
679    /// `POST /batch_cancel_orders` (orders base URL)
680    ///
681    /// # Errors
682    ///
683    /// Returns an error if the request fails or the response cannot be parsed.
684    pub async fn batch_cancel_orders(
685        &self,
686        request: &BatchCancelOrdersRequest,
687    ) -> Result<AxBatchCancelOrdersResponse, AxHttpError> {
688        let body = serde_json::to_vec(request)
689            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
690        self.send_request_to_url::<AxBatchCancelOrdersResponse, ()>(
691            &self.orders_base_url,
692            Method::POST,
693            "/batch_cancel_orders",
694            None,
695            Some(body),
696            true,
697        )
698        .await
699    }
700
701    /// Fetches all open orders.
702    ///
703    /// # Endpoint
704    /// `GET /open_orders` (orders base URL)
705    ///
706    /// # Errors
707    ///
708    /// Returns an error if the request fails or the response cannot be parsed.
709    pub async fn get_open_orders(&self) -> Result<AxOpenOrdersResponse, AxHttpError> {
710        self.send_request_to_url::<AxOpenOrdersResponse, ()>(
711            &self.orders_base_url,
712            Method::GET,
713            "/open_orders",
714            None,
715            None,
716            true,
717        )
718        .await
719    }
720
721    /// Fetches all fills/trades.
722    ///
723    /// # Endpoint
724    /// `GET /fills`
725    ///
726    /// # Errors
727    ///
728    /// Returns an error if the request fails or the response cannot be parsed.
729    pub async fn get_fills(&self) -> Result<AxFillsResponse, AxHttpError> {
730        self.send_request::<AxFillsResponse, ()>(Method::GET, "/fills", None, None, true)
731            .await
732    }
733
734    /// Fetches historical candles.
735    ///
736    /// # Endpoint
737    /// `GET /candles`
738    ///
739    /// # Errors
740    ///
741    /// Returns an error if the request fails or the response cannot be parsed.
742    pub async fn get_candles(
743        &self,
744        symbol: Ustr,
745        start_timestamp_ns: i64,
746        end_timestamp_ns: i64,
747        candle_width: AxCandleWidth,
748    ) -> Result<AxCandlesResponse, AxHttpError> {
749        let params =
750            GetCandlesParams::new(symbol, start_timestamp_ns, end_timestamp_ns, candle_width);
751        self.send_request::<AxCandlesResponse, _>(
752            Method::GET,
753            "/candles",
754            Some(&params),
755            None,
756            true,
757        )
758        .await
759    }
760
761    /// Fetches the current (incomplete) candle.
762    ///
763    /// # Endpoint
764    /// `GET /candles/current`
765    ///
766    /// # Errors
767    ///
768    /// Returns an error if the request fails or the response cannot be parsed.
769    pub async fn get_current_candle(
770        &self,
771        symbol: Ustr,
772        candle_width: AxCandleWidth,
773    ) -> Result<AxCandle, AxHttpError> {
774        let params = GetCandleParams::new(symbol, candle_width);
775        let response = self
776            .send_request::<AxCandleResponse, _>(
777                Method::GET,
778                "/candles/current",
779                Some(&params),
780                None,
781                true,
782            )
783            .await?;
784        Ok(response.candle)
785    }
786
787    /// Fetches the last completed candle.
788    ///
789    /// # Endpoint
790    /// `GET /candles/last`
791    ///
792    /// # Errors
793    ///
794    /// Returns an error if the request fails or the response cannot be parsed.
795    pub async fn get_last_candle(
796        &self,
797        symbol: Ustr,
798        candle_width: AxCandleWidth,
799    ) -> Result<AxCandle, AxHttpError> {
800        let params = GetCandleParams::new(symbol, candle_width);
801        let response = self
802            .send_request::<AxCandleResponse, _>(
803                Method::GET,
804                "/candles/last",
805                Some(&params),
806                None,
807                true,
808            )
809            .await?;
810        Ok(response.candle)
811    }
812
813    /// Fetches funding rates for a symbol.
814    ///
815    /// # Endpoint
816    /// `GET /funding-rates`
817    ///
818    /// # Errors
819    ///
820    /// Returns an error if the request fails or the response cannot be parsed.
821    pub async fn get_funding_rates(
822        &self,
823        symbol: Ustr,
824        start_timestamp_ns: i64,
825        end_timestamp_ns: i64,
826    ) -> Result<AxFundingRatesResponse, AxHttpError> {
827        let params = GetFundingRatesParams::new(symbol, start_timestamp_ns, end_timestamp_ns);
828        self.send_request::<AxFundingRatesResponse, _>(
829            Method::GET,
830            "/funding-rates",
831            Some(&params),
832            None,
833            true,
834        )
835        .await
836    }
837
838    /// Fetches the current risk snapshot.
839    ///
840    /// # Endpoint
841    /// `GET /risk-snapshot`
842    ///
843    /// # Errors
844    ///
845    /// Returns an error if the request fails or the response cannot be parsed.
846    pub async fn get_risk_snapshot(&self) -> Result<AxRiskSnapshotResponse, AxHttpError> {
847        self.send_request::<AxRiskSnapshotResponse, ()>(
848            Method::GET,
849            "/risk-snapshot",
850            None,
851            None,
852            true,
853        )
854        .await
855    }
856
857    /// Previews an aggressive limit order to get the "take through" price.
858    ///
859    /// This endpoint calculates the price needed to sweep the order book for a given
860    /// quantity, which is used to simulate market orders on AX (which only supports
861    /// limit orders natively).
862    ///
863    /// # Endpoint
864    /// `POST /preview-aggressive-limit-order`
865    ///
866    /// # Errors
867    ///
868    /// Returns an error if the request fails or the response cannot be parsed.
869    pub async fn preview_aggressive_limit_order(
870        &self,
871        request: &PreviewAggressiveLimitOrderRequest,
872    ) -> Result<AxPreviewAggressiveLimitOrderResponse, AxHttpError> {
873        let body = serde_json::to_vec(request)
874            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
875        self.send_request::<AxPreviewAggressiveLimitOrderResponse, ()>(
876            Method::POST,
877            "/preview-aggressive-limit-order",
878            None,
879            Some(body),
880            true,
881        )
882        .await
883    }
884
885    /// Fetches transactions filtered by type.
886    ///
887    /// # Endpoint
888    /// `GET /transactions`
889    ///
890    /// # Errors
891    ///
892    /// Returns an error if the request fails or the response cannot be parsed.
893    pub async fn get_transactions(
894        &self,
895        transaction_types: Vec<String>,
896    ) -> Result<AxTransactionsResponse, AxHttpError> {
897        let params = GetTransactionsParams::new(transaction_types);
898        self.send_request::<AxTransactionsResponse, _>(
899            Method::GET,
900            "/transactions",
901            Some(&params),
902            None,
903            true,
904        )
905        .await
906    }
907
908    /// Fetches recent trades for a symbol.
909    ///
910    /// # Endpoint
911    /// `GET /trades`
912    ///
913    /// # Errors
914    ///
915    /// Returns an error if the request fails or the response cannot be parsed.
916    pub async fn get_trades(
917        &self,
918        symbol: Ustr,
919        limit: Option<i32>,
920    ) -> Result<AxTradesResponse, AxHttpError> {
921        let params = GetTradesParams::new(symbol, limit);
922        self.send_request::<AxTradesResponse, _>(Method::GET, "/trades", Some(&params), None, true)
923            .await
924    }
925
926    /// Fetches an order book snapshot for a symbol.
927    ///
928    /// # Endpoint
929    /// `GET /book`
930    ///
931    /// # Errors
932    ///
933    /// Returns an error if the request fails or the response cannot be parsed.
934    pub async fn get_book(
935        &self,
936        symbol: Ustr,
937        level: Option<i32>,
938    ) -> Result<AxBookResponse, AxHttpError> {
939        let params = GetBookParams::new(symbol, level);
940        self.send_request::<AxBookResponse, _>(Method::GET, "/book", Some(&params), None, false)
941            .await
942    }
943
944    /// Fetches the status of a single order by order ID.
945    ///
946    /// # Endpoint
947    /// `GET /order-status` (orders base URL)
948    ///
949    /// # Errors
950    ///
951    /// Returns an error if the request fails or the response cannot be parsed.
952    pub async fn get_order_status_by_id(
953        &self,
954        order_id: &str,
955    ) -> Result<AxOrderStatusQueryResponse, AxHttpError> {
956        let params = GetOrderStatusParams::by_order_id(order_id);
957        self.send_request_to_url::<AxOrderStatusQueryResponse, _>(
958            &self.orders_base_url,
959            Method::GET,
960            "/order-status",
961            Some(&params),
962            None,
963            true,
964        )
965        .await
966    }
967
968    /// Fetches the status of a single order by client order ID.
969    ///
970    /// # Endpoint
971    /// `GET /order-status` (orders base URL)
972    ///
973    /// # Errors
974    ///
975    /// Returns an error if the request fails or the response cannot be parsed.
976    pub async fn get_order_status_by_cid(
977        &self,
978        client_order_id: u64,
979    ) -> Result<AxOrderStatusQueryResponse, AxHttpError> {
980        let params = GetOrderStatusParams::by_client_order_id(client_order_id);
981        self.send_request_to_url::<AxOrderStatusQueryResponse, _>(
982            &self.orders_base_url,
983            Method::GET,
984            "/order-status",
985            Some(&params),
986            None,
987            true,
988        )
989        .await
990    }
991
992    /// Fetches historical orders with optional filters.
993    ///
994    /// # Endpoint
995    /// `GET /orders` (orders base URL)
996    ///
997    /// # Errors
998    ///
999    /// Returns an error if the request fails or the response cannot be parsed.
1000    pub async fn get_orders(
1001        &self,
1002        params: &GetOrdersParams,
1003    ) -> Result<AxOrdersResponse, AxHttpError> {
1004        self.send_request_to_url::<AxOrdersResponse, _>(
1005            &self.orders_base_url,
1006            Method::GET,
1007            "/orders",
1008            Some(params),
1009            None,
1010            true,
1011        )
1012        .await
1013    }
1014
1015    /// Checks the initial margin requirement for a proposed order.
1016    ///
1017    /// # Endpoint
1018    /// `POST /initial-margin-requirement` (orders base URL)
1019    ///
1020    /// # Errors
1021    ///
1022    /// Returns an error if the request fails or the response cannot be parsed.
1023    pub async fn check_initial_margin(
1024        &self,
1025        request: &PlaceOrderRequest,
1026    ) -> Result<AxInitialMarginRequirementResponse, AxHttpError> {
1027        let body = serde_json::to_vec(request)
1028            .map_err(|e| AxHttpError::JsonError(format!("Failed to serialize request: {e}")))?;
1029        self.send_request_to_url::<AxInitialMarginRequirementResponse, ()>(
1030            &self.orders_base_url,
1031            Method::POST,
1032            "/initial-margin-requirement",
1033            None,
1034            Some(body),
1035            true,
1036        )
1037        .await
1038    }
1039}
1040
1041/// High-level HTTP client for the Ax REST API.
1042///
1043/// This client wraps the underlying [`AxRawHttpClient`] to provide a convenient
1044/// interface for Python bindings and instrument caching.
1045#[derive(Debug)]
1046#[cfg_attr(
1047    feature = "python",
1048    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.architect")
1049)]
1050pub struct AxHttpClient {
1051    pub(crate) inner: Arc<AxRawHttpClient>,
1052    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
1053    cache_initialized: Arc<AtomicBool>,
1054}
1055
1056impl Clone for AxHttpClient {
1057    fn clone(&self) -> Self {
1058        Self {
1059            inner: self.inner.clone(),
1060            instruments_cache: self.instruments_cache.clone(),
1061            cache_initialized: self.cache_initialized.clone(),
1062        }
1063    }
1064}
1065
1066impl Default for AxHttpClient {
1067    fn default() -> Self {
1068        Self::new(None, None, None, None, None, None, None)
1069            .expect("Failed to create default AxHttpClient")
1070    }
1071}
1072
1073impl AxHttpClient {
1074    /// Creates a new [`AxHttpClient`] using the default Ax HTTP URL.
1075    ///
1076    /// # Errors
1077    ///
1078    /// Returns an error if the retry manager cannot be created.
1079    #[allow(clippy::too_many_arguments)]
1080    pub fn new(
1081        base_url: Option<String>,
1082        orders_base_url: Option<String>,
1083        timeout_secs: Option<u64>,
1084        max_retries: Option<u32>,
1085        retry_delay_ms: Option<u64>,
1086        retry_delay_max_ms: Option<u64>,
1087        proxy_url: Option<String>,
1088    ) -> Result<Self, AxHttpError> {
1089        Ok(Self {
1090            inner: Arc::new(AxRawHttpClient::new(
1091                base_url,
1092                orders_base_url,
1093                timeout_secs,
1094                max_retries,
1095                retry_delay_ms,
1096                retry_delay_max_ms,
1097                proxy_url,
1098            )?),
1099            instruments_cache: Arc::new(DashMap::new()),
1100            cache_initialized: Arc::new(AtomicBool::new(false)),
1101        })
1102    }
1103
1104    /// Creates a new [`AxHttpClient`] configured with credentials.
1105    ///
1106    /// # Errors
1107    ///
1108    /// Returns an error if the HTTP client cannot be created.
1109    #[allow(clippy::too_many_arguments)]
1110    pub fn with_credentials(
1111        api_key: String,
1112        api_secret: String,
1113        base_url: Option<String>,
1114        orders_base_url: Option<String>,
1115        timeout_secs: Option<u64>,
1116        max_retries: Option<u32>,
1117        retry_delay_ms: Option<u64>,
1118        retry_delay_max_ms: Option<u64>,
1119        proxy_url: Option<String>,
1120    ) -> Result<Self, AxHttpError> {
1121        Ok(Self {
1122            inner: Arc::new(AxRawHttpClient::with_credentials(
1123                api_key,
1124                api_secret,
1125                base_url,
1126                orders_base_url,
1127                timeout_secs,
1128                max_retries,
1129                retry_delay_ms,
1130                retry_delay_max_ms,
1131                proxy_url,
1132            )?),
1133            instruments_cache: Arc::new(DashMap::new()),
1134            cache_initialized: Arc::new(AtomicBool::new(false)),
1135        })
1136    }
1137
1138    /// Returns the base URL for this client.
1139    #[must_use]
1140    pub fn base_url(&self) -> &str {
1141        self.inner.base_url()
1142    }
1143
1144    /// Returns a masked version of the API key for logging purposes.
1145    #[must_use]
1146    pub fn api_key_masked(&self) -> String {
1147        self.inner.api_key_masked()
1148    }
1149
1150    /// Cancel all pending HTTP requests.
1151    pub fn cancel_all_requests(&self) {
1152        self.inner.cancel_all_requests();
1153    }
1154
1155    /// Replaces the cancelled token so new requests can proceed after reconnect.
1156    pub fn reset_cancellation_token(&self) {
1157        self.inner.reset_cancellation_token();
1158    }
1159
1160    /// Sets the session token for authenticated requests.
1161    ///
1162    /// The session token is obtained through the login flow and used for bearer token authentication.
1163    pub fn set_session_token(&self, token: String) {
1164        self.inner.set_session_token(token);
1165    }
1166
1167    /// Generates a timestamp for initialization.
1168    fn generate_ts_init(&self) -> UnixNanos {
1169        get_atomic_clock_realtime().get_time_ns()
1170    }
1171
1172    /// Checks if the client is initialized.
1173    ///
1174    /// The client is considered initialized if any instruments have been cached from the venue.
1175    #[must_use]
1176    pub fn is_initialized(&self) -> bool {
1177        self.cache_initialized.load(Ordering::Acquire)
1178    }
1179
1180    /// Returns a snapshot of all instrument symbols currently held in the internal cache.
1181    #[must_use]
1182    pub fn get_cached_symbols(&self) -> Vec<String> {
1183        self.instruments_cache
1184            .iter()
1185            .map(|entry| entry.key().to_string())
1186            .collect()
1187    }
1188
1189    /// Caches multiple instruments.
1190    ///
1191    /// Any existing instruments with the same symbols will be replaced.
1192    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1193        for inst in instruments {
1194            self.instruments_cache
1195                .insert(inst.raw_symbol().inner(), inst);
1196        }
1197        self.cache_initialized.store(true, Ordering::Release);
1198    }
1199
1200    /// Caches a single instrument.
1201    ///
1202    /// Any existing instrument with the same symbol will be replaced.
1203    pub fn cache_instrument(&self, instrument: InstrumentAny) {
1204        self.instruments_cache
1205            .insert(instrument.raw_symbol().inner(), instrument);
1206        self.cache_initialized.store(true, Ordering::Release);
1207    }
1208
1209    /// Authenticates with Ax using API credentials.
1210    ///
1211    /// On success, the session token is automatically stored for subsequent authenticated requests.
1212    ///
1213    /// # Errors
1214    ///
1215    /// Returns an error if the HTTP request fails or credentials are invalid.
1216    pub async fn authenticate(
1217        &self,
1218        api_key: &str,
1219        api_secret: &str,
1220        expiration_seconds: i32,
1221    ) -> Result<String, AxHttpError> {
1222        let resp = self
1223            .inner
1224            .authenticate(api_key, api_secret, expiration_seconds)
1225            .await?;
1226        self.inner.set_session_token(resp.token.clone());
1227        Ok(resp.token)
1228    }
1229
1230    /// Authenticates using stored credentials or environment variables.
1231    ///
1232    /// # Credential Resolution
1233    ///
1234    /// Credentials are resolved in the following order:
1235    /// 1. Stored credentials (from `with_credentials` constructor)
1236    /// 2. Environment variables (`AX_API_KEY` and `AX_API_SECRET`)
1237    ///
1238    /// On success, the session token is automatically stored for subsequent authenticated requests.
1239    ///
1240    /// # Errors
1241    ///
1242    /// Returns an error if:
1243    /// - No credentials are available from either source
1244    /// - The HTTP request fails
1245    /// - The credentials are invalid
1246    pub async fn authenticate_auto(&self, expiration_seconds: i32) -> Result<String, AxHttpError> {
1247        let resp = self.inner.authenticate_auto(expiration_seconds).await?;
1248        self.inner.set_session_token(resp.token.clone());
1249        Ok(resp.token)
1250    }
1251
1252    /// Gets an instrument from the cache by symbol.
1253    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1254        self.instruments_cache
1255            .get(symbol)
1256            .map(|entry| entry.value().clone())
1257    }
1258
1259    /// Requests all instruments from Ax.
1260    ///
1261    /// # Errors
1262    ///
1263    /// Returns an error if the HTTP request fails or instrument parsing fails.
1264    pub async fn request_instruments(
1265        &self,
1266        maker_fee: Option<Decimal>,
1267        taker_fee: Option<Decimal>,
1268    ) -> anyhow::Result<Vec<InstrumentAny>> {
1269        let resp = self
1270            .inner
1271            .get_instruments()
1272            .await
1273            .map_err(|e| anyhow::anyhow!(e))?;
1274
1275        let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1276        let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1277        let ts_init = self.generate_ts_init();
1278
1279        let mut instruments: Vec<InstrumentAny> = Vec::new();
1280        for inst in &resp.instruments {
1281            if inst.state == AxInstrumentState::Suspended {
1282                log::debug!("Skipping suspended instrument: {}", inst.symbol);
1283                continue;
1284            }
1285
1286            // Skip test instruments (not real tradable products)
1287            if inst.symbol.as_str().starts_with("TEST") {
1288                log::debug!("Skipping test instrument: {}", inst.symbol);
1289                continue;
1290            }
1291
1292            match parse_perp_instrument(inst, maker_fee, taker_fee, ts_init, ts_init) {
1293                Ok(instrument) => instruments.push(instrument),
1294                Err(e) => {
1295                    log::warn!("Failed to parse instrument {}: {e}", inst.symbol);
1296                }
1297            }
1298        }
1299
1300        Ok(instruments)
1301    }
1302
1303    /// Requests a single instrument from Ax by symbol.
1304    ///
1305    /// # Errors
1306    ///
1307    /// Returns an error if the HTTP request fails or instrument parsing fails.
1308    pub async fn request_instrument(
1309        &self,
1310        symbol: Ustr,
1311        maker_fee: Option<Decimal>,
1312        taker_fee: Option<Decimal>,
1313    ) -> anyhow::Result<InstrumentAny> {
1314        let resp = self
1315            .inner
1316            .get_instrument(symbol)
1317            .await
1318            .map_err(|e| anyhow::anyhow!(e))?;
1319
1320        let maker_fee = maker_fee.unwrap_or(Decimal::ZERO);
1321        let taker_fee = taker_fee.unwrap_or(Decimal::ZERO);
1322        let ts_init = self.generate_ts_init();
1323
1324        parse_perp_instrument(&resp, maker_fee, taker_fee, ts_init, ts_init)
1325    }
1326
1327    /// Requests an order book snapshot from Ax and builds a Nautilus [`OrderBook`].
1328    ///
1329    /// Requires the instrument to be cached.
1330    ///
1331    /// # Errors
1332    ///
1333    /// Returns an error if:
1334    /// - The instrument is not found in the cache.
1335    /// - The HTTP request fails.
1336    pub async fn request_book_snapshot(
1337        &self,
1338        symbol: Ustr,
1339        depth: Option<usize>,
1340    ) -> anyhow::Result<OrderBook> {
1341        let instrument = self
1342            .get_instrument(&symbol)
1343            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1344
1345        let resp = self
1346            .inner
1347            .get_book(symbol, Some(2))
1348            .await
1349            .map_err(|e| anyhow::anyhow!(e))?;
1350
1351        let instrument_id = instrument.id();
1352        let mut book = OrderBook::new(instrument_id, BookType::L2_MBP);
1353
1354        let price_precision = instrument.price_precision();
1355        let size_precision = instrument.size_precision();
1356        let ts_event = UnixNanos::from(resp.book.ts as u64 * 1_000_000_000 + resp.book.tn as u64);
1357
1358        for (i, level) in resp.book.b.iter().enumerate() {
1359            if depth.is_some_and(|d| i >= d) {
1360                break;
1361            }
1362            let price = Price::from_decimal_dp(level.p, price_precision)
1363                .unwrap_or_else(|_| Price::from(level.p.to_string().as_str()));
1364            let size = Quantity::new(level.q as f64, size_precision);
1365            let order = BookOrder::new(OrderSide::Buy, price, size, i as u64);
1366            book.add(order, 0, i as u64, ts_event);
1367        }
1368
1369        let bids_len = resp.book.b.len();
1370        for (i, level) in resp.book.a.iter().enumerate() {
1371            if depth.is_some_and(|d| i >= d) {
1372                break;
1373            }
1374            let price = Price::from_decimal_dp(level.p, price_precision)
1375                .unwrap_or_else(|_| Price::from(level.p.to_string().as_str()));
1376            let size = Quantity::new(level.q as f64, size_precision);
1377            let order = BookOrder::new(OrderSide::Sell, price, size, (bids_len + i) as u64);
1378            book.add(order, 0, (bids_len + i) as u64, ts_event);
1379        }
1380
1381        Ok(book)
1382    }
1383
1384    /// Requests recent trades from Ax and parses them to Nautilus [`TradeTick`].
1385    ///
1386    /// The AX trades endpoint does not accept time range parameters, so
1387    /// `start` and `end` are applied as client-side filters after fetching.
1388    ///
1389    /// Requires the instrument to be cached.
1390    ///
1391    /// # Errors
1392    ///
1393    /// Returns an error if:
1394    /// - The instrument is not found in the cache.
1395    /// - The HTTP request fails.
1396    /// - Trade parsing fails.
1397    pub async fn request_trade_ticks(
1398        &self,
1399        symbol: Ustr,
1400        limit: Option<i32>,
1401        start: Option<UnixNanos>,
1402        end: Option<UnixNanos>,
1403    ) -> anyhow::Result<Vec<TradeTick>> {
1404        let instrument = self
1405            .get_instrument(&symbol)
1406            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1407
1408        let resp = self
1409            .inner
1410            .get_trades(symbol, limit)
1411            .await
1412            .map_err(|e| anyhow::anyhow!(e))?;
1413
1414        let ts_init = self.generate_ts_init();
1415        let mut ticks = Vec::with_capacity(resp.trades.len());
1416
1417        for trade in &resp.trades {
1418            match parse_trade_tick(trade, &instrument, ts_init) {
1419                Ok(tick) => {
1420                    if start.is_some_and(|s| tick.ts_event < s) {
1421                        continue;
1422                    }
1423                    if end.is_some_and(|e| tick.ts_event > e) {
1424                        continue;
1425                    }
1426                    ticks.push(tick);
1427                }
1428                Err(e) => {
1429                    log::warn!("Failed to parse trade for {symbol}: {e}");
1430                }
1431            }
1432        }
1433
1434        Ok(ticks)
1435    }
1436
1437    /// Requests historical bars from Ax and parses them to Nautilus Bar types.
1438    ///
1439    /// Requires the instrument to be cached (call `request_instruments` first).
1440    ///
1441    /// # Errors
1442    ///
1443    /// Returns an error if:
1444    /// - The instrument is not found in the cache.
1445    /// - The HTTP request fails.
1446    /// - Bar parsing fails.
1447    pub async fn request_bars(
1448        &self,
1449        symbol: Ustr,
1450        start: Option<DateTime<Utc>>,
1451        end: Option<DateTime<Utc>>,
1452        width: AxCandleWidth,
1453    ) -> anyhow::Result<Vec<Bar>> {
1454        let instrument = self
1455            .get_instrument(&symbol)
1456            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not found in cache"))?;
1457
1458        let start_ns = start.and_then(|dt| dt.timestamp_nanos_opt()).unwrap_or(0);
1459        let end_ns = end
1460            .and_then(|dt| dt.timestamp_nanos_opt())
1461            .unwrap_or_else(|| self.generate_ts_init().as_i64());
1462        let resp = self
1463            .inner
1464            .get_candles(symbol, start_ns, end_ns, width)
1465            .await
1466            .map_err(|e| anyhow::anyhow!(e))?;
1467
1468        let ts_init = self.generate_ts_init();
1469        let mut bars = Vec::with_capacity(resp.candles.len());
1470
1471        for candle in &resp.candles {
1472            match parse_bar(candle, &instrument, ts_init) {
1473                Ok(bar) => bars.push(bar),
1474                Err(e) => {
1475                    log::warn!("Failed to parse bar for {symbol}: {e}");
1476                }
1477            }
1478        }
1479
1480        Ok(bars)
1481    }
1482
1483    /// Requests funding rates from Ax and parses them to Nautilus types.
1484    ///
1485    /// # Errors
1486    ///
1487    /// Returns an error if the HTTP request fails.
1488    pub async fn request_funding_rates(
1489        &self,
1490        instrument_id: InstrumentId,
1491        start: Option<DateTime<Utc>>,
1492        end: Option<DateTime<Utc>>,
1493    ) -> Result<Vec<FundingRateUpdate>, AxHttpError> {
1494        let symbol = instrument_id.symbol.inner();
1495        let start_ns = start.and_then(|dt| dt.timestamp_nanos_opt()).unwrap_or(0);
1496        let end_ns = end
1497            .and_then(|dt| dt.timestamp_nanos_opt())
1498            .unwrap_or_else(|| self.generate_ts_init().as_i64());
1499        let response = self
1500            .inner
1501            .get_funding_rates(symbol, start_ns, end_ns)
1502            .await?;
1503
1504        let ts_init = self.generate_ts_init();
1505        let funding_rates = response
1506            .funding_rates
1507            .iter()
1508            .map(|r| parse_funding_rate(r, instrument_id, ts_init))
1509            .collect();
1510
1511        Ok(funding_rates)
1512    }
1513
1514    /// Requests account state from Ax and parses to a Nautilus [`AccountState`].
1515    ///
1516    /// # Errors
1517    ///
1518    /// Returns an error if the HTTP request fails or parsing fails.
1519    pub async fn request_account_state(
1520        &self,
1521        account_id: AccountId,
1522    ) -> anyhow::Result<AccountState> {
1523        let response = self
1524            .inner
1525            .get_balances()
1526            .await
1527            .map_err(|e| anyhow::anyhow!(e))?;
1528
1529        let ts_init = self.generate_ts_init();
1530        parse_account_state(&response, account_id, ts_init, ts_init)
1531    }
1532
1533    /// Checks the initial margin requirement for a proposed order.
1534    ///
1535    /// # Errors
1536    ///
1537    /// Returns an error if the HTTP request fails.
1538    pub async fn check_initial_margin(
1539        &self,
1540        request: &PlaceOrderRequest,
1541    ) -> anyhow::Result<Decimal> {
1542        let resp = self
1543            .inner
1544            .check_initial_margin(request)
1545            .await
1546            .map_err(|e| anyhow::anyhow!(e))?;
1547        Ok(resp.im)
1548    }
1549
1550    /// Queries a single order by venue order ID or client order ID using the
1551    /// dedicated `/order-status` endpoint, which works for any order state.
1552    ///
1553    /// The caller must supply `order_side`, `order_type`, and `time_in_force`
1554    /// because the endpoint does not return these fields.
1555    ///
1556    /// # Errors
1557    ///
1558    /// Returns an error if:
1559    /// - Neither `venue_order_id` nor `client_order_id` is provided.
1560    /// - The HTTP request fails.
1561    #[allow(clippy::too_many_arguments)]
1562    pub async fn request_order_status(
1563        &self,
1564        account_id: AccountId,
1565        instrument_id: InstrumentId,
1566        client_order_id: Option<ClientOrderId>,
1567        venue_order_id: Option<VenueOrderId>,
1568        order_side: OrderSide,
1569        order_type: OrderType,
1570        time_in_force: TimeInForce,
1571    ) -> anyhow::Result<OrderStatusReport> {
1572        let resp = if let Some(ref voi) = venue_order_id {
1573            self.inner.get_order_status_by_id(voi.as_str()).await
1574        } else if let Some(ref coid) = client_order_id {
1575            let cid = client_order_id_to_cid(coid);
1576            self.inner.get_order_status_by_cid(cid).await
1577        } else {
1578            anyhow::bail!("Either venue_order_id or client_order_id must be provided")
1579        }
1580        .map_err(|e| anyhow::anyhow!(e))?;
1581
1582        let detail = resp.status;
1583        let size_precision = self
1584            .get_instrument(&detail.symbol)
1585            .map_or(0, |i| i.size_precision());
1586
1587        let voi = VenueOrderId::new(&detail.order_id);
1588        let order_status = detail.state.into();
1589        let filled = detail.filled_quantity.unwrap_or(0);
1590        let remaining = detail.remaining_quantity.unwrap_or(0);
1591        let quantity = Quantity::new((filled + remaining) as f64, size_precision);
1592        let filled_qty = Quantity::new(filled as f64, size_precision);
1593        let ts_init = self.generate_ts_init();
1594
1595        let resolved_coid = client_order_id
1596            .unwrap_or_else(|| ClientOrderId::new(format!("CID-{}", detail.clord_id.unwrap_or(0))));
1597
1598        Ok(OrderStatusReport::new(
1599            account_id,
1600            instrument_id,
1601            Some(resolved_coid),
1602            voi,
1603            order_side,
1604            order_type,
1605            time_in_force,
1606            order_status,
1607            quantity,
1608            filled_qty,
1609            ts_init,
1610            ts_init,
1611            ts_init,
1612            Some(UUID4::new()),
1613        ))
1614    }
1615
1616    /// Requests open orders from Ax and parses them to Nautilus [`OrderStatusReport`].
1617    ///
1618    /// Requires instruments to be cached for parsing order details.
1619    ///
1620    /// The `cid_resolver` parameter is an optional function that resolves a `cid` (u64)
1621    /// to a `ClientOrderId`. This is needed for correlating orders submitted via WebSocket.
1622    ///
1623    /// # Errors
1624    ///
1625    /// Returns an error if:
1626    /// - The HTTP request fails.
1627    /// - An order's instrument is not found in the cache.
1628    /// - Order parsing fails.
1629    pub async fn request_order_status_reports<F>(
1630        &self,
1631        account_id: AccountId,
1632        cid_resolver: Option<F>,
1633    ) -> anyhow::Result<Vec<OrderStatusReport>>
1634    where
1635        F: Fn(u64) -> Option<ClientOrderId>,
1636    {
1637        let response = self
1638            .inner
1639            .get_open_orders()
1640            .await
1641            .map_err(|e| anyhow::anyhow!(e))?;
1642
1643        let ts_init = self.generate_ts_init();
1644        let mut reports = Vec::with_capacity(response.orders.len());
1645
1646        for order in &response.orders {
1647            let instrument = self
1648                .get_instrument(&order.s)
1649                .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", order.s))?;
1650
1651            match parse_order_status_report(
1652                order,
1653                account_id,
1654                &instrument,
1655                ts_init,
1656                cid_resolver.as_ref(),
1657            ) {
1658                Ok(report) => reports.push(report),
1659                Err(e) => {
1660                    log::warn!("Failed to parse order {}: {e}", order.oid);
1661                }
1662            }
1663        }
1664
1665        Ok(reports)
1666    }
1667
1668    /// Requests fills from Ax and parses them to Nautilus [`FillReport`].
1669    ///
1670    /// Requires instruments to be cached for parsing fill details.
1671    ///
1672    /// # Errors
1673    ///
1674    /// Returns an error if:
1675    /// - The HTTP request fails.
1676    /// - A fill's instrument is not found in the cache.
1677    /// - Fill parsing fails.
1678    pub async fn request_fill_reports(
1679        &self,
1680        account_id: AccountId,
1681    ) -> anyhow::Result<Vec<FillReport>> {
1682        let response = self
1683            .inner
1684            .get_fills()
1685            .await
1686            .map_err(|e| anyhow::anyhow!(e))?;
1687
1688        let ts_init = self.generate_ts_init();
1689        let mut reports = Vec::with_capacity(response.fills.len());
1690
1691        for fill in &response.fills {
1692            let instrument = self
1693                .get_instrument(&fill.symbol)
1694                .ok_or_else(|| anyhow::anyhow!("Instrument {} not found in cache", fill.symbol))?;
1695
1696            match parse_fill_report(fill, account_id, &instrument, ts_init) {
1697                Ok(report) => reports.push(report),
1698                Err(e) => {
1699                    log::warn!("Failed to parse fill {}: {e}", fill.trade_id);
1700                }
1701            }
1702        }
1703
1704        Ok(reports)
1705    }
1706
1707    /// Requests positions from Ax and parses them to Nautilus [`PositionStatusReport`].
1708    ///
1709    /// Requires instruments to be cached for parsing position details.
1710    ///
1711    /// # Errors
1712    ///
1713    /// Returns an error if:
1714    /// - The HTTP request fails.
1715    /// - A position's instrument is not found in the cache.
1716    /// - Position parsing fails.
1717    pub async fn request_position_reports(
1718        &self,
1719        account_id: AccountId,
1720    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1721        let response = self
1722            .inner
1723            .get_positions()
1724            .await
1725            .map_err(|e| anyhow::anyhow!(e))?;
1726
1727        let ts_init = self.generate_ts_init();
1728        let mut reports = Vec::with_capacity(response.positions.len());
1729
1730        for position in &response.positions {
1731            // Skip flat positions (zero quantity)
1732            if position.signed_quantity == 0 {
1733                continue;
1734            }
1735
1736            let instrument = self.get_instrument(&position.symbol).ok_or_else(|| {
1737                anyhow::anyhow!("Instrument {} not found in cache", position.symbol)
1738            })?;
1739
1740            match parse_position_status_report(position, account_id, &instrument, ts_init) {
1741                Ok(report) => reports.push(report),
1742                Err(e) => {
1743                    log::warn!("Failed to parse position for {}: {e}", position.symbol);
1744                }
1745            }
1746        }
1747
1748        Ok(reports)
1749    }
1750}