nautilus_coinbase_intx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [Coinbase International](https://www.coinbase.com/en/international-exchange) REST API.
17//!
18//! This module defines and implements a [`CoinbaseIntxHttpClient`] for
19//! sending requests to various Coinbase endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`CoinbaseIntxHttpError`].
22
23use std::{
24    collections::HashMap,
25    num::NonZeroU32,
26    sync::{Arc, LazyLock, Mutex},
27};
28
29use anyhow::Context;
30use chrono::{DateTime, Utc};
31use nautilus_core::{
32    MUTEX_POISONED, UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var,
33    time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36    enums::{OrderSide, OrderType, TimeInForce},
37    events::AccountState,
38    identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
39    instruments::{Instrument, InstrumentAny},
40    reports::{FillReport, OrderStatusReport, PositionStatusReport},
41    types::{Price, Quantity},
42};
43use nautilus_network::{
44    http::{HttpClient, HttpClientError},
45    ratelimiter::quota::Quota,
46};
47use reqwest::{Method, StatusCode, header::USER_AGENT};
48use serde::{Deserialize, Serialize, de::DeserializeOwned};
49use ustr::Ustr;
50
51use super::{
52    error::CoinbaseIntxHttpError,
53    models::{
54        CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
55        CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
56        CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
57        CoinbaseIntxPosition,
58    },
59    parse::{
60        parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
61        parse_position_status_report,
62    },
63    query::{
64        CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
65        GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
66        GetPortfolioFillsParamsBuilder, ModifyOrderParams,
67    },
68};
69use crate::{
70    common::{
71        consts::COINBASE_INTX_REST_URL,
72        credential::Credential,
73        enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
74    },
75    http::{
76        error::ErrorBody,
77        query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
78    },
79};
80
/// Represents a Coinbase HTTP response envelope.
///
/// In this module it is used to decode the body of non-success responses so that the
/// venue `code` and `msg` can be surfaced as an error.
#[derive(Debug, Serialize, Deserialize)]
pub struct CoinbaseIntxResponse<T> {
    /// The Coinbase response code, which is `"0"` for success.
    pub code: String,
    /// A message string which can be informational or describe an error cause.
    pub msg: String,
    /// The typed data returned by the Coinbase endpoint.
    pub data: Vec<T>,
}
91
/// Rate limit quota applied to every HTTP client created in this module: 100 requests per second.
///
/// The quota is built lazily on first access.
// https://docs.cdp.coinbase.com/intx/docs/rate-limits#rest-api-rate-limits
pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(100).unwrap()));
95
/// Provides a lower-level HTTP client for connecting to the [Coinbase International](https://coinbase.com) REST API.
///
/// This client wraps the underlying `HttpClient` to handle functionality
/// specific to Coinbase, such as request signing (for authenticated endpoints),
/// forming request URLs, and deserializing responses into specific data models.
#[derive(Debug, Clone)]
pub struct CoinbaseIntxHttpInnerClient {
    // Prefix prepended to every request path when building the full URL.
    base_url: String,
    // Underlying HTTP client, constructed with the default headers and REST quota.
    client: HttpClient,
    // API credentials used for signing; `None` restricts the client to unsigned requests.
    credential: Option<Credential>,
}
107
108impl Default for CoinbaseIntxHttpInnerClient {
109    fn default() -> Self {
110        Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
111    }
112}
113
114impl CoinbaseIntxHttpInnerClient {
115    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
116    /// optionally overridden with a custom base url.
117    ///
118    /// This version of the client has **no credentials**, so it can only
119    /// call publicly accessible endpoints.
120    ///
121    /// # Errors
122    ///
123    /// Returns an error if the HTTP client cannot be created.
124    pub fn new(
125        base_url: Option<String>,
126        timeout_secs: Option<u64>,
127    ) -> Result<Self, HttpClientError> {
128        Ok(Self {
129            base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
130            client: HttpClient::new(
131                Self::default_headers(),
132                vec![],
133                vec![],
134                Some(*COINBASE_INTX_REST_QUOTA),
135                timeout_secs,
136                None, // proxy_url
137            )?,
138            credential: None,
139        })
140    }
141
142    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
143    /// for authenticated requests, optionally using a custom base url.
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the HTTP client cannot be created.
148    pub fn with_credentials(
149        api_key: String,
150        api_secret: String,
151        api_passphrase: String,
152        base_url: String,
153        timeout_secs: Option<u64>,
154    ) -> Result<Self, HttpClientError> {
155        Ok(Self {
156            base_url,
157            client: HttpClient::new(
158                Self::default_headers(),
159                vec![],
160                vec![],
161                Some(*COINBASE_INTX_REST_QUOTA),
162                timeout_secs,
163                None, // proxy_url
164            )?,
165            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
166        })
167    }
168
169    /// Builds the default headers to include with each request (e.g., `User-Agent`).
170    fn default_headers() -> HashMap<String, String> {
171        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
172    }
173
174    /// Signs an Coinbase request with timestamp, API key, passphrase, and signature.
175    ///
176    /// # Errors
177    ///
178    /// Returns [`CoinbaseHttpError::MissingCredentials`] if no credentials are set
179    /// but the request requires authentication.
180    fn sign_request(
181        &self,
182        method: &Method,
183        path: &str,
184        body: Option<&[u8]>,
185    ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
186        let credential = match self.credential.as_ref() {
187            Some(c) => c,
188            None => return Err(CoinbaseIntxHttpError::MissingCredentials),
189        };
190
191        let api_key = credential.api_key.clone().to_string();
192        let api_passphrase = credential.api_passphrase.clone().to_string();
193        let timestamp = Utc::now().timestamp().to_string();
194        let body_str = body
195            .and_then(|b| String::from_utf8(b.to_vec()).ok())
196            .unwrap_or_default();
197
198        let signature = credential.sign(&timestamp, method.as_str(), path, &body_str);
199
200        let mut headers = HashMap::new();
201        headers.insert("Accept".to_string(), "application/json".to_string());
202        headers.insert("CB-ACCESS-KEY".to_string(), api_key);
203        headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
204        headers.insert("CB-ACCESS-SIGN".to_string(), signature);
205        headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
206        headers.insert("Content-Type".to_string(), "application/json".to_string());
207
208        Ok(headers)
209    }
210
    /// Sends an HTTP request to Coinbase International and parses the response into type `T`.
    ///
    /// Internally, this method handles:
    /// - Building the URL from `base_url` + `path`.
    /// - Optionally signing the request.
    /// - Deserializing JSON responses into typed models, or returning a [`CoinbaseIntxHttpError`].
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - `params` cannot be serialized into a query string.
    /// - `authenticate` is `true` and the request cannot be signed.
    /// - The underlying HTTP client returns an error.
    /// - A success response body cannot be deserialized into `T`.
    /// - The response status is not a success status.
    async fn send_request<T: DeserializeOwned, P: Serialize>(
        &self,
        method: Method,
        path: &str,
        params: Option<&P>,
        body: Option<Vec<u8>>,
        authenticate: bool,
    ) -> Result<T, CoinbaseIntxHttpError> {
        // URL-encode the optional query parameters.
        let params_str = params
            .map(serde_urlencoded::to_string)
            .transpose()
            .map_err(|e| {
                CoinbaseIntxHttpError::JsonError(format!("Failed to serialize params: {e}"))
            })?;

        // Append the query string only when there is something to append.
        let full_path = if let Some(ref query) = params_str {
            if query.is_empty() {
                path.to_string()
            } else {
                format!("{path}?{query}")
            }
        } else {
            path.to_string()
        };

        let url = format!("{}{}", self.base_url, full_path);

        // The signature covers the path including the query string, plus the body.
        let headers = if authenticate {
            Some(self.sign_request(&method, &full_path, body.as_deref())?)
        } else {
            None
        };

        tracing::trace!("Request: {url:?} {body:?}");

        let resp = self
            .client
            .request(method.clone(), url, None, headers, body, None, None)
            .await?;

        tracing::trace!("Response: {resp:?}");

        if resp.status.is_success() {
            let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
                tracing::error!("Failed to deserialize CoinbaseResponse: {e}");
                CoinbaseIntxHttpError::JsonError(e.to_string())
            })?;

            Ok(coinbase_response)
        } else {
            let error_body = String::from_utf8_lossy(&resp.body);
            tracing::error!(
                "HTTP error {} with body: {error_body}",
                resp.status.as_str()
            );

            // First attempt: decode the body as the `code`/`msg`/`data` envelope.
            if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
            {
                return Err(CoinbaseIntxHttpError::CoinbaseError {
                    error_code: parsed_error.code,
                    message: parsed_error.msg,
                });
            }

            // Second attempt: decode as `ErrorBody`, requiring both `title` and `error`.
            if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body)
                && let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error)
            {
                return Err(CoinbaseIntxHttpError::CoinbaseError {
                    error_code: error,
                    message: title,
                });
            }

            // Neither shape matched: report the raw status and body.
            Err(CoinbaseIntxHttpError::UnexpectedStatus {
                status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
                body: error_body.to_string(),
            })
        }
    }
296
297    /// Requests a list of all supported assets.
298    ///
299    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
300    /// # Errors
301    ///
302    /// Returns an error if the HTTP request fails or the response cannot be parsed.
303    pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
304        let path = "/api/v1/assets";
305        self.send_request::<_, ()>(Method::GET, path, None, None, false)
306            .await
307    }
308
309    /// Requests information for a specific asset.
310    ///
311    /// See <https://docs.cdp.coinbase.com/intx/reference/getasset>.
312    /// # Errors
313    ///
314    /// Returns an error if the HTTP request fails or the response cannot be parsed.
315    pub async fn http_get_asset_details(
316        &self,
317        asset: &str,
318    ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
319        let path = format!("/api/v1/assets/{asset}");
320        self.send_request::<_, ()>(Method::GET, &path, None, None, false)
321            .await
322    }
323
324    /// Requests all instruments available for trading.
325    ///
326    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstruments>.
327    /// # Errors
328    ///
329    /// Returns an error if the HTTP request fails or the response cannot be parsed.
330    pub async fn http_list_instruments(
331        &self,
332    ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
333        let path = "/api/v1/instruments";
334        self.send_request::<_, ()>(Method::GET, path, None, None, false)
335            .await
336    }
337
338    /// Retrieve a list of instruments with open contracts.
339    ///
340    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstrument>.
341    /// # Errors
342    ///
343    /// Returns an error if the HTTP request fails or the response cannot be parsed.
344    pub async fn http_get_instrument_details(
345        &self,
346        symbol: &str,
347    ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
348        let path = format!("/api/v1/instruments/{symbol}");
349        self.send_request::<_, ()>(Method::GET, &path, None, None, false)
350            .await
351    }
352
353    /// Return all the fee rate tiers.
354    ///
355    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
356    /// # Errors
357    ///
358    /// Returns an error if the HTTP request fails or the response cannot be parsed.
359    pub async fn http_list_fee_rate_tiers(
360        &self,
361    ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
362        let path = "/api/v1/fee-rate-tiers";
363        self.send_request::<_, ()>(Method::GET, path, None, None, true)
364            .await
365    }
366
367    /// List all user portfolios.
368    ///
369    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolios>.
370    /// # Errors
371    ///
372    /// Returns an error if the HTTP request fails or the response cannot be parsed.
373    pub async fn http_list_portfolios(
374        &self,
375    ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
376        let path = "/api/v1/portfolios";
377        self.send_request::<_, ()>(Method::GET, path, None, None, true)
378            .await
379    }
380
381    /// Returns the user's specified portfolio.
382    ///
383    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolio>.
384    /// # Errors
385    ///
386    /// Returns an error if the HTTP request fails or the response cannot be parsed.
387    pub async fn http_get_portfolio(
388        &self,
389        portfolio_id: &str,
390    ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
391        let path = format!("/api/v1/portfolios/{portfolio_id}");
392        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
393            .await
394    }
395
396    /// Retrieves the summary, positions, and balances of a portfolio.
397    ///
398    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliodetail>.
399    /// # Errors
400    ///
401    /// Returns an error if the HTTP request fails or the response cannot be parsed.
402    pub async fn http_get_portfolio_details(
403        &self,
404        portfolio_id: &str,
405    ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
406        let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
407        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
408            .await
409    }
410
411    /// Retrieves the high level overview of a portfolio.
412    ///
413    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosummary>.
414    /// # Errors
415    ///
416    /// Returns an error if the HTTP request fails or the response cannot be parsed.
417    pub async fn http_get_portfolio_summary(
418        &self,
419        portfolio_id: &str,
420    ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
421        let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
422        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
423            .await
424    }
425
426    /// Returns all balances for a given portfolio.
427    ///
428    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalances>.
429    /// # Errors
430    ///
431    /// Returns an error if the HTTP request fails or the response cannot be parsed.
432    pub async fn http_list_portfolio_balances(
433        &self,
434        portfolio_id: &str,
435    ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
436        let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
437        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
438            .await
439    }
440
441    /// Retrieves the balance for a given portfolio and asset.
442    ///
443    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalance>.
444    /// # Errors
445    ///
446    /// Returns an error if the HTTP request fails or the response cannot be parsed.
447    pub async fn http_get_portfolio_balance(
448        &self,
449        portfolio_id: &str,
450        asset: &str,
451    ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
452        let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
453        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
454            .await
455    }
456
457    /// Returns all fills for a given portfolio.
458    ///
459    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliofills>.
460    /// # Errors
461    ///
462    /// Returns an error if the HTTP request fails or the response cannot be parsed.
463    pub async fn http_list_portfolio_fills(
464        &self,
465        portfolio_id: &str,
466        params: GetPortfolioFillsParams,
467    ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
468        let path = format!("/api/v1/portfolios/{portfolio_id}/fills");
469        self.send_request(Method::GET, &path, Some(&params), None, true)
470            .await
471    }
472
473    /// Returns all positions for a given portfolio.
474    ///
475    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliopositions>.
476    /// # Errors
477    ///
478    /// Returns an error if the HTTP request fails or the response cannot be parsed.
479    pub async fn http_list_portfolio_positions(
480        &self,
481        portfolio_id: &str,
482    ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
483        let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
484        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
485            .await
486    }
487
488    /// Retrieves the position for a given portfolio and symbol.
489    ///
490    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolioposition>.
491    /// # Errors
492    ///
493    /// Returns an error if the HTTP request fails or the response cannot be parsed.
494    pub async fn http_get_portfolio_position(
495        &self,
496        portfolio_id: &str,
497        symbol: &str,
498    ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
499        let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
500        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
501            .await
502    }
503
504    /// Retrieves the Perpetual Future and Spot fee rate tiers for the user.
505    ///
506    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosfeerates>.
507    /// # Errors
508    ///
509    /// Returns an error if the HTTP request fails or the response cannot be parsed.
510    pub async fn http_list_portfolio_fee_rates(
511        &self,
512    ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
513        let path = "/api/v1/portfolios/fee-rates";
514        self.send_request::<_, ()>(Method::GET, path, None, None, true)
515            .await
516    }
517
518    /// Create a new order.
519    /// # Errors
520    ///
521    /// Returns an error if the HTTP request fails or the response cannot be parsed.
522    pub async fn http_create_order(
523        &self,
524        params: CreateOrderParams,
525    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
526        let path = "/api/v1/orders";
527        let body = serde_json::to_vec(&params)
528            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
529        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
530            .await
531    }
532
533    /// Retrieves a single order. The order retrieved can be either active or inactive.
534    ///
535    /// See <https://docs.cdp.coinbase.com/intx/reference/getorder>.
536    /// # Errors
537    ///
538    /// Returns an error if the HTTP request fails or the response cannot be parsed.
539    pub async fn http_get_order(
540        &self,
541        venue_order_id: &str,
542        portfolio_id: &str,
543    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
544        let params = GetOrderParams {
545            portfolio: portfolio_id.to_string(),
546        };
547        let path = format!("/api/v1/orders/{venue_order_id}");
548        self.send_request(Method::GET, &path, Some(&params), None, true)
549            .await
550    }
551
552    /// Returns a list of active orders resting on the order book matching the requested criteria.
553    /// Does not return any rejected, cancelled, or fully filled orders as they are not active.
554    ///
555    /// See <https://docs.cdp.coinbase.com/intx/reference/getorders>.
556    /// # Errors
557    ///
558    /// Returns an error if the HTTP request fails or the response cannot be parsed.
559    pub async fn http_list_open_orders(
560        &self,
561        params: GetOrdersParams,
562    ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
563        self.send_request(Method::GET, "/api/v1/orders", Some(&params), None, true)
564            .await
565    }
566
567    /// Cancels a single open order.
568    /// # Errors
569    ///
570    /// Returns an error if the HTTP request fails or the response cannot be parsed.
571    pub async fn http_cancel_order(
572        &self,
573        client_order_id: &str,
574        portfolio_id: &str,
575    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
576        let params = CancelOrderParams {
577            portfolio: portfolio_id.to_string(),
578        };
579        let path = format!("/api/v1/orders/{client_order_id}");
580        self.send_request(Method::DELETE, &path, Some(&params), None, true)
581            .await
582    }
583
584    /// Cancel user orders.
585    /// # Errors
586    ///
587    /// Returns an error if the HTTP request fails or the response cannot be parsed.
588    pub async fn http_cancel_orders(
589        &self,
590        params: CancelOrdersParams,
591    ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
592        self.send_request(Method::DELETE, "/api/v1/orders", Some(&params), None, true)
593            .await
594    }
595
596    /// Modify an open order.
597    ///
598    /// See <https://docs.cdp.coinbase.com/intx/reference/modifyorder>.
599    /// # Errors
600    ///
601    /// Returns an error if the HTTP request fails or the response cannot be parsed.
602    pub async fn http_modify_order(
603        &self,
604        order_id: &str,
605        params: ModifyOrderParams,
606    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
607        let path = format!("/api/v1/orders/{order_id}");
608        let body = serde_json::to_vec(&params)
609            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
610        self.send_request::<_, ()>(Method::PUT, &path, None, Some(body), true)
611            .await
612    }
613}
614
/// Provides a higher-level HTTP client for the [Coinbase International](https://coinbase.com) REST API.
///
/// This client wraps the underlying `CoinbaseIntxHttpInnerClient` to handle conversions
/// into the Nautilus domain model.
#[derive(Debug, Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct CoinbaseIntxHttpClient {
    // Lower-level client that performs the actual HTTP requests.
    pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
    // Instruments keyed by raw symbol; the map sits behind an `Arc<Mutex<..>>`.
    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
    // Set to `true` by `add_instrument` / `add_instruments`; a plain `bool` held per value.
    cache_initialized: bool,
}
629
630impl Default for CoinbaseIntxHttpClient {
631    fn default() -> Self {
632        Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
633    }
634}
635
636impl CoinbaseIntxHttpClient {
637    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
638    /// optionally overridden with a custom base url.
639    ///
640    /// This version of the client has **no credentials**, so it can only
641    /// call publicly accessible endpoints.
642    ///
643    /// # Errors
644    ///
645    /// Returns an error if the HTTP client cannot be created.
646    pub fn new(
647        base_url: Option<String>,
648        timeout_secs: Option<u64>,
649    ) -> Result<Self, HttpClientError> {
650        Ok(Self {
651            inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)?),
652            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
653            cache_initialized: false,
654        })
655    }
656
657    /// Creates a new authenticated [`CoinbaseIntxHttpClient`] using environment variables and
658    /// the default Coinbase International HTTP base url.
659    ///
660    /// # Errors
661    ///
662    /// Returns an error if required environment variables are missing or invalid.
663    pub fn from_env() -> anyhow::Result<Self> {
664        Self::with_credentials(None, None, None, None, None)
665    }
666
667    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
668    /// for authenticated requests, optionally using a custom base url.
669    ///
670    /// # Errors
671    ///
672    /// Returns an error if required environment variables are missing or invalid.
673    pub fn with_credentials(
674        api_key: Option<String>,
675        api_secret: Option<String>,
676        api_passphrase: Option<String>,
677        base_url: Option<String>,
678        timeout_secs: Option<u64>,
679    ) -> anyhow::Result<Self> {
680        let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
681        let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
682        let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
683        let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
684        Ok(Self {
685            inner: Arc::new(
686                CoinbaseIntxHttpInnerClient::with_credentials(
687                    api_key,
688                    api_secret,
689                    api_passphrase,
690                    base_url,
691                    timeout_secs,
692                )
693                .context("failed to create Coinbase INTX HTTP client")?,
694            ),
695            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
696            cache_initialized: false,
697        })
698    }
699
700    fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
701        match self
702            .instruments_cache
703            .lock()
704            .expect(MUTEX_POISONED)
705            .get(&symbol)
706        {
707            Some(inst) => Ok(inst.clone()), // TODO: Remove this clone
708            None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
709        }
710    }
711
712    fn generate_ts_init(&self) -> UnixNanos {
713        get_atomic_clock_realtime().get_time_ns()
714    }
715
716    /// Returns the base url being used by the client.
717    #[must_use]
718    pub fn base_url(&self) -> &str {
719        self.inner.base_url.as_str()
720    }
721
722    /// Returns the public API key being used by the client.
723    #[must_use]
724    pub fn api_key(&self) -> Option<&str> {
725        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
726    }
727
728    /// Returns a masked version of the API key for logging purposes.
729    #[must_use]
730    pub fn api_key_masked(&self) -> Option<String> {
731        self.inner.credential.as_ref().map(|c| c.api_key_masked())
732    }
733
    /// Checks if the client is initialized.
    ///
    /// Returns the `cache_initialized` flag, which starts as `false` and is set to `true`
    /// whenever `add_instrument` or `add_instruments` is called on this client.
    #[must_use]
    pub const fn is_initialized(&self) -> bool {
        self.cache_initialized
    }
741
742    /// Returns the cached instrument symbols.
743    ///
744    /// # Panics
745    ///
746    /// Panics if the instrument cache mutex is poisoned.
747    #[must_use]
748    pub fn get_cached_symbols(&self) -> Vec<String> {
749        self.instruments_cache
750            .lock()
751            .unwrap()
752            .keys()
753            .map(ToString::to_string)
754            .collect()
755    }
756
757    /// Adds the given instruments into the clients instrument cache.
758    ///
759    /// # Panics
760    ///
761    /// Panics if the instrument cache mutex is poisoned.
762    ///
763    /// Any existing instruments will be replaced.
764    pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
765        for inst in instruments {
766            self.instruments_cache
767                .lock()
768                .unwrap()
769                .insert(inst.raw_symbol().inner(), inst);
770        }
771        self.cache_initialized = true;
772    }
773
774    /// Adds the given instrument into the clients instrument cache.
775    ///
776    /// # Panics
777    ///
778    /// Panics if the instrument cache mutex is poisoned.
779    ///
780    /// Any existing instrument will be replaced.
781    pub fn add_instrument(&mut self, instrument: InstrumentAny) {
782        self.instruments_cache
783            .lock()
784            .unwrap()
785            .insert(instrument.raw_symbol().inner(), instrument);
786        self.cache_initialized = true;
787    }
788
789    /// Requests a list of portfolio details from Coinbase International.
790    ///
791    /// # Errors
792    ///
793    /// Returns an error if the HTTP request fails or the response cannot be parsed.
794    pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
795        let resp = self
796            .inner
797            .http_list_portfolios()
798            .await
799            .map_err(|e| anyhow::anyhow!(e))?;
800
801        Ok(resp)
802    }
803
804    /// Requests the account state for the given account ID from Coinbase International.
805    ///
806    /// # Errors
807    ///
808    /// Returns an error if the HTTP request fails or the response cannot be parsed.
809    pub async fn request_account_state(
810        &self,
811        account_id: AccountId,
812    ) -> anyhow::Result<AccountState> {
813        let resp = self
814            .inner
815            .http_list_portfolio_balances(account_id.get_issuers_id())
816            .await
817            .map_err(|e| anyhow::anyhow!(e))?;
818
819        let ts_init = self.generate_ts_init();
820        let account_state = parse_account_state(resp, account_id, ts_init)?;
821
822        Ok(account_state)
823    }
824
825    /// Requests all instruments from Coinbase International.
826    ///
827    /// # Errors
828    ///
829    /// Returns an error if the HTTP request fails or the response cannot be parsed.
830    pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
831        let resp = self
832            .inner
833            .http_list_instruments()
834            .await
835            .map_err(|e| anyhow::anyhow!(e))?;
836
837        let ts_init = self.generate_ts_init();
838
839        let mut instruments: Vec<InstrumentAny> = Vec::new();
840        for inst in &resp {
841            let instrument_any = parse_instrument_any(inst, ts_init);
842            if let Some(instrument_any) = instrument_any {
843                instruments.push(instrument_any);
844            }
845        }
846
847        Ok(instruments)
848    }
849
850    /// Requests the instrument for the given symbol from Coinbase International.
851    ///
852    /// # Errors
853    ///
854    /// Returns an error if the HTTP request fails or the instrument cannot be parsed.
855    pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
856        let resp = self
857            .inner
858            .http_get_instrument_details(symbol.as_str())
859            .await
860            .map_err(|e| anyhow::anyhow!(e))?;
861
862        let ts_init = self.generate_ts_init();
863
864        match parse_instrument_any(&resp, ts_init) {
865            Some(inst) => Ok(inst),
866            None => anyhow::bail!("Unable to parse instrument"),
867        }
868    }
869
870    /// Requests an order status report for the given venue order ID from Coinbase International.
871    ///
872    /// # Errors
873    ///
874    /// Returns an error if the HTTP request fails or the response cannot be parsed.
875    pub async fn request_order_status_report(
876        &self,
877        account_id: AccountId,
878        venue_order_id: VenueOrderId,
879    ) -> anyhow::Result<OrderStatusReport> {
880        let portfolio_id = account_id.get_issuers_id();
881
882        let resp = self
883            .inner
884            .http_get_order(venue_order_id.as_str(), portfolio_id)
885            .await
886            .map_err(|e| anyhow::anyhow!(e))?;
887
888        let instrument = self.get_instrument_from_cache(resp.symbol)?;
889        let ts_init = self.generate_ts_init();
890
891        let report = parse_order_status_report(
892            resp,
893            account_id,
894            instrument.price_precision(),
895            instrument.size_precision(),
896            ts_init,
897        )?;
898        Ok(report)
899    }
900
901    /// Requests order status reports for all **open** orders from Coinbase International.
902    ///
903    /// # Errors
904    ///
905    /// Returns an error if the HTTP request fails or the response cannot be parsed.
906    pub async fn request_order_status_reports(
907        &self,
908        account_id: AccountId,
909        symbol: Symbol,
910    ) -> anyhow::Result<Vec<OrderStatusReport>> {
911        let portfolio_id = account_id.get_issuers_id();
912
913        let mut params = GetOrdersParamsBuilder::default();
914        params.portfolio(portfolio_id);
915        params.instrument(symbol.as_str());
916        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
917
918        let resp = self
919            .inner
920            .http_list_open_orders(params)
921            .await
922            .map_err(|e| anyhow::anyhow!(e))?;
923
924        let ts_init = get_atomic_clock_realtime().get_time_ns();
925
926        let mut reports: Vec<OrderStatusReport> = Vec::new();
927        for order in resp.results {
928            let instrument = self.get_instrument_from_cache(order.symbol)?;
929            let report = parse_order_status_report(
930                order,
931                account_id,
932                instrument.price_precision(),
933                instrument.size_precision(),
934                ts_init,
935            )?;
936            reports.push(report);
937        }
938
939        Ok(reports)
940    }
941
942    /// Requests all fill reports from Coinbase International.
943    ///
944    /// # Errors
945    ///
946    /// Returns an error if the HTTP request fails or the response cannot be parsed.
947    pub async fn request_fill_reports(
948        &self,
949        account_id: AccountId,
950        client_order_id: Option<ClientOrderId>,
951        start: Option<DateTime<Utc>>,
952    ) -> anyhow::Result<Vec<FillReport>> {
953        let portfolio_id = account_id.get_issuers_id();
954
955        let mut params = GetPortfolioFillsParamsBuilder::default();
956        if let Some(start) = start {
957            params.time_from(start);
958        }
959        if let Some(client_order_id) = client_order_id {
960            params.client_order_id(client_order_id.to_string());
961        }
962        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
963
964        let resp = self
965            .inner
966            .http_list_portfolio_fills(portfolio_id, params)
967            .await
968            .map_err(|e| anyhow::anyhow!(e))?;
969
970        let ts_init = get_atomic_clock_realtime().get_time_ns();
971
972        let mut reports: Vec<FillReport> = Vec::new();
973        for fill in resp.results {
974            let instrument = self.get_instrument_from_cache(fill.symbol)?;
975            let report = parse_fill_report(
976                fill,
977                account_id,
978                instrument.price_precision(),
979                instrument.size_precision(),
980                ts_init,
981            )?;
982            reports.push(report);
983        }
984
985        Ok(reports)
986    }
987
988    /// Requests a position status report from Coinbase International.
989    ///
990    /// # Errors
991    ///
992    /// Returns an error if the HTTP request fails or the response cannot be parsed.
993    pub async fn request_position_status_report(
994        &self,
995        account_id: AccountId,
996        symbol: Symbol,
997    ) -> anyhow::Result<PositionStatusReport> {
998        let portfolio_id = account_id.get_issuers_id();
999
1000        let resp = self
1001            .inner
1002            .http_get_portfolio_position(portfolio_id, symbol.as_str())
1003            .await
1004            .map_err(|e| anyhow::anyhow!(e))?;
1005
1006        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1007        let ts_init = get_atomic_clock_realtime().get_time_ns();
1008
1009        let report =
1010            parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init)?;
1011        Ok(report)
1012    }
1013
1014    /// Requests all position status reports from Coinbase International.
1015    ///
1016    /// # Errors
1017    ///
1018    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1019    pub async fn request_position_status_reports(
1020        &self,
1021        account_id: AccountId,
1022    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1023        let portfolio_id = account_id.get_issuers_id();
1024
1025        let resp = self
1026            .inner
1027            .http_list_portfolio_positions(portfolio_id)
1028            .await
1029            .map_err(|e| anyhow::anyhow!(e))?;
1030
1031        let ts_init = get_atomic_clock_realtime().get_time_ns();
1032
1033        let mut reports: Vec<PositionStatusReport> = Vec::new();
1034        for position in resp {
1035            let instrument = self.get_instrument_from_cache(position.symbol)?;
1036            let report = parse_position_status_report(
1037                position,
1038                account_id,
1039                instrument.size_precision(),
1040                ts_init,
1041            )?;
1042            reports.push(report);
1043        }
1044
1045        Ok(reports)
1046    }
1047
1048    /// Submits a new order to Coinbase International.
1049    ///
1050    /// # Errors
1051    ///
1052    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1053    #[allow(clippy::too_many_arguments)]
1054    pub async fn submit_order(
1055        &self,
1056        account_id: AccountId,
1057        client_order_id: ClientOrderId,
1058        symbol: Symbol,
1059        order_side: OrderSide,
1060        order_type: OrderType,
1061        quantity: Quantity,
1062        time_in_force: TimeInForce,
1063        expire_time: Option<DateTime<Utc>>,
1064        price: Option<Price>,
1065        trigger_price: Option<Price>,
1066        post_only: Option<bool>,
1067        reduce_only: Option<bool>,
1068    ) -> anyhow::Result<OrderStatusReport> {
1069        let coinbase_side: CoinbaseIntxSide = order_side.into();
1070        let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
1071        let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
1072
1073        let mut params = CreateOrderParamsBuilder::default();
1074        params.portfolio(account_id.get_issuers_id());
1075        params.client_order_id(client_order_id.as_str());
1076        params.instrument(symbol.as_str());
1077        params.side(coinbase_side);
1078        params.size(quantity.to_string());
1079        params.order_type(coinbase_order_type);
1080        params.tif(coinbase_tif);
1081        if let Some(expire_time) = expire_time {
1082            params.expire_time(expire_time);
1083        }
1084        if let Some(price) = price {
1085            params.price(price.to_string());
1086        }
1087        if let Some(trigger_price) = trigger_price {
1088            params.stop_price(trigger_price.to_string());
1089        }
1090        if let Some(post_only) = post_only {
1091            params.post_only(post_only);
1092        }
1093        if let Some(reduce_only) = reduce_only {
1094            params.close_only(reduce_only);
1095        }
1096        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1097
1098        let resp = self.inner.http_create_order(params).await?;
1099        tracing::debug!("Submitted order: {resp:?}");
1100
1101        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1102        let ts_init = get_atomic_clock_realtime().get_time_ns();
1103        let report = parse_order_status_report(
1104            resp,
1105            account_id,
1106            instrument.price_precision(),
1107            instrument.size_precision(),
1108            ts_init,
1109        )?;
1110        Ok(report)
1111    }
1112
1113    /// Cancels a currently open order on Coinbase International.
1114    ///
1115    /// # Errors
1116    ///
1117    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1118    pub async fn cancel_order(
1119        &self,
1120        account_id: AccountId,
1121        client_order_id: ClientOrderId,
1122    ) -> anyhow::Result<OrderStatusReport> {
1123        let portfolio_id = account_id.get_issuers_id();
1124
1125        let resp = self
1126            .inner
1127            .http_cancel_order(client_order_id.as_str(), portfolio_id)
1128            .await?;
1129        tracing::debug!("Canceled order: {resp:?}");
1130
1131        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1132        let ts_init = get_atomic_clock_realtime().get_time_ns();
1133
1134        let report = parse_order_status_report(
1135            resp,
1136            account_id,
1137            instrument.price_precision(),
1138            instrument.size_precision(),
1139            ts_init,
1140        )?;
1141        Ok(report)
1142    }
1143
1144    /// Cancels all orders for the given account ID and filter params on Coinbase International.
1145    ///
1146    /// # Errors
1147    ///
1148    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1149    pub async fn cancel_orders(
1150        &self,
1151        account_id: AccountId,
1152        symbol: Symbol,
1153        order_side: Option<OrderSide>,
1154    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1155        let mut params = CancelOrdersParamsBuilder::default();
1156        params.portfolio(account_id.get_issuers_id());
1157        params.instrument(symbol.as_str());
1158        if let Some(side) = order_side {
1159            let side: CoinbaseIntxSide = side.into();
1160            params.side(side);
1161        }
1162        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1163
1164        let resp = self.inner.http_cancel_orders(params).await?;
1165
1166        let instrument = self.get_instrument_from_cache(symbol.inner())?;
1167        let ts_init = get_atomic_clock_realtime().get_time_ns();
1168
1169        let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
1170        for order in resp {
1171            tracing::debug!("Canceled order: {order:?}");
1172            let report = parse_order_status_report(
1173                order,
1174                account_id,
1175                instrument.price_precision(),
1176                instrument.size_precision(),
1177                ts_init,
1178            )?;
1179            reports.push(report);
1180        }
1181
1182        Ok(reports)
1183    }
1184
1185    /// Modifies a currently open order on Coinbase International.
1186    ///
1187    /// # Errors
1188    ///
1189    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1190    #[allow(clippy::too_many_arguments)]
1191    pub async fn modify_order(
1192        &self,
1193        account_id: AccountId,
1194        client_order_id: ClientOrderId,
1195        new_client_order_id: ClientOrderId,
1196        price: Option<Price>,
1197        trigger_price: Option<Price>,
1198        quantity: Option<Quantity>,
1199    ) -> anyhow::Result<OrderStatusReport> {
1200        let mut params = ModifyOrderParamsBuilder::default();
1201        params.portfolio(account_id.get_issuers_id());
1202        params.client_order_id(new_client_order_id.as_str());
1203        if let Some(price) = price {
1204            params.price(price.to_string());
1205        }
1206        if let Some(trigger_price) = trigger_price {
1207            params.price(trigger_price.to_string());
1208        }
1209        if let Some(quantity) = quantity {
1210            params.size(quantity.to_string());
1211        }
1212        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1213
1214        let resp = self
1215            .inner
1216            .http_modify_order(client_order_id.as_str(), params)
1217            .await?;
1218        tracing::debug!("Modified order {}", resp.client_order_id);
1219
1220        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1221        let ts_init = get_atomic_clock_realtime().get_time_ns();
1222        let report = parse_order_status_report(
1223            resp,
1224            account_id,
1225            instrument.price_precision(),
1226            instrument.size_precision(),
1227            ts_init,
1228        )?;
1229        Ok(report)
1230    }
1231}