nautilus_coinbase_intx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [Coinbase International](https://www.coinbase.com/en/international-exchange) REST API.
17//!
18//! This module defines and implements a [`CoinbaseIntxHttpClient`] for
19//! sending requests to various Coinbase endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`CoinbaseIntxHttpError`].
22
23use std::{
24    collections::HashMap,
25    num::NonZeroU32,
26    sync::{Arc, LazyLock, Mutex},
27};
28
29use chrono::{DateTime, Utc};
30use nautilus_core::{
31    MUTEX_POISONED, UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var,
32    time::get_atomic_clock_realtime,
33};
34use nautilus_model::{
35    enums::{OrderSide, OrderType, TimeInForce},
36    events::AccountState,
37    identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
38    instruments::{Instrument, InstrumentAny},
39    reports::{FillReport, OrderStatusReport, PositionStatusReport},
40    types::{Price, Quantity},
41};
42use nautilus_network::{http::HttpClient, ratelimiter::quota::Quota};
43use reqwest::{Method, StatusCode, header::USER_AGENT};
44use serde::{Deserialize, Serialize, de::DeserializeOwned};
45use ustr::Ustr;
46
47use super::{
48    error::CoinbaseIntxHttpError,
49    models::{
50        CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
51        CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
52        CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
53        CoinbaseIntxPosition,
54    },
55    parse::{
56        parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
57        parse_position_status_report,
58    },
59    query::{
60        CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
61        GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
62        GetPortfolioFillsParamsBuilder, ModifyOrderParams,
63    },
64};
65use crate::{
66    common::{
67        consts::COINBASE_INTX_REST_URL,
68        credential::Credential,
69        enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
70    },
71    http::{
72        error::ErrorBody,
73        query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
74    },
75};
76
77/// Represents an Coinbase HTTP response.
78#[derive(Debug, Serialize, Deserialize)]
79pub struct CoinbaseIntxResponse<T> {
80    /// The Coinbase response code, which is `"0"` for success.
81    pub code: String,
82    /// A message string which can be informational or describe an error cause.
83    pub msg: String,
84    /// The typed data returned by the Coinbase endpoint.
85    pub data: Vec<T>,
86}
87
88// https://docs.cdp.coinbase.com/intx/docs/rate-limits#rest-api-rate-limits
89pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> =
90    LazyLock::new(|| Quota::per_second(NonZeroU32::new(100).unwrap()));
91
92/// Provides a lower-level HTTP client for connecting to the [Coinbase International](https://coinbase.com) REST API.
93///
94/// This client wraps the underlying `HttpClient` to handle functionality
95/// specific to Coinbase, such as request signing (for authenticated endpoints),
96/// forming request URLs, and deserializing responses into specific data models.
97#[derive(Debug, Clone)]
98pub struct CoinbaseIntxHttpInnerClient {
99    base_url: String,
100    client: HttpClient,
101    credential: Option<Credential>,
102}
103
104impl Default for CoinbaseIntxHttpInnerClient {
105    fn default() -> Self {
106        Self::new(None, Some(60))
107    }
108}
109
110impl CoinbaseIntxHttpInnerClient {
111    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
112    /// optionally overridden with a custom base url.
113    ///
114    /// This version of the client has **no credentials**, so it can only
115    /// call publicly accessible endpoints.
116    #[must_use]
117    pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
118        Self {
119            base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
120            client: HttpClient::new(
121                Self::default_headers(),
122                vec![],
123                vec![],
124                Some(*COINBASE_INTX_REST_QUOTA),
125                timeout_secs,
126            ),
127            credential: None,
128        }
129    }
130
131    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
132    /// for authenticated requests, optionally using a custom base url.
133    #[must_use]
134    pub fn with_credentials(
135        api_key: String,
136        api_secret: String,
137        api_passphrase: String,
138        base_url: String,
139        timeout_secs: Option<u64>,
140    ) -> Self {
141        Self {
142            base_url,
143            client: HttpClient::new(
144                Self::default_headers(),
145                vec![],
146                vec![],
147                Some(*COINBASE_INTX_REST_QUOTA),
148                timeout_secs,
149            ),
150            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
151        }
152    }
153
154    /// Builds the default headers to include with each request (e.g., `User-Agent`).
155    fn default_headers() -> HashMap<String, String> {
156        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
157    }
158
159    /// Signs an Coinbase request with timestamp, API key, passphrase, and signature.
160    ///
161    /// # Errors
162    ///
163    /// Returns [`CoinbaseHttpError::MissingCredentials`] if no credentials are set
164    /// but the request requires authentication.
165    fn sign_request(
166        &self,
167        method: &Method,
168        path: &str,
169        body: Option<&[u8]>,
170    ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
171        let credential = match self.credential.as_ref() {
172            Some(c) => c,
173            None => return Err(CoinbaseIntxHttpError::MissingCredentials),
174        };
175
176        let api_key = credential.api_key.clone().to_string();
177        let api_passphrase = credential.api_passphrase.clone().to_string();
178        let timestamp = Utc::now().timestamp().to_string();
179        let body_str = body
180            .and_then(|b| String::from_utf8(b.to_vec()).ok())
181            .unwrap_or_default();
182
183        let signature = credential.sign(&timestamp, method.as_str(), path, &body_str);
184
185        let mut headers = HashMap::new();
186        headers.insert("Accept".to_string(), "application/json".to_string());
187        headers.insert("CB-ACCESS-KEY".to_string(), api_key);
188        headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
189        headers.insert("CB-ACCESS-SIGN".to_string(), signature);
190        headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
191        headers.insert("Content-Type".to_string(), "application/json".to_string());
192
193        Ok(headers)
194    }
195
196    /// Sends an HTTP request to Coinbase International and parses the response into type `T`.
197    ///
198    /// Internally, this method handles:
199    /// - Building the URL from `base_url` + `path`.
200    /// - Optionally signing the request.
201    /// - Deserializing JSON responses into typed models, or returning a [`CoinbaseIntxHttpError`].
202    async fn send_request<T: DeserializeOwned>(
203        &self,
204        method: Method,
205        path: &str,
206        body: Option<Vec<u8>>,
207        authenticate: bool,
208    ) -> Result<T, CoinbaseIntxHttpError> {
209        let url = format!("{}{}", self.base_url, path);
210
211        let headers = if authenticate {
212            Some(self.sign_request(&method, path, body.as_deref())?)
213        } else {
214            None
215        };
216
217        tracing::trace!("Request: {url:?} {body:?}");
218
219        let resp = self
220            .client
221            .request(method.clone(), url, headers, body, None, None)
222            .await?;
223
224        tracing::trace!("Response: {resp:?}");
225
226        if resp.status.is_success() {
227            let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
228                tracing::error!("Failed to deserialize CoinbaseResponse: {e}");
229                CoinbaseIntxHttpError::JsonError(e.to_string())
230            })?;
231
232            Ok(coinbase_response)
233        } else {
234            let error_body = String::from_utf8_lossy(&resp.body);
235            tracing::error!(
236                "HTTP error {} with body: {error_body}",
237                resp.status.as_str()
238            );
239
240            if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
241            {
242                return Err(CoinbaseIntxHttpError::CoinbaseError {
243                    error_code: parsed_error.code,
244                    message: parsed_error.msg,
245                });
246            }
247
248            if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body)
249                && let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error)
250            {
251                return Err(CoinbaseIntxHttpError::CoinbaseError {
252                    error_code: error,
253                    message: title,
254                });
255            }
256
257            Err(CoinbaseIntxHttpError::UnexpectedStatus {
258                status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
259                body: error_body.to_string(),
260            })
261        }
262    }
263
264    /// Requests a list of all supported assets.
265    ///
266    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
267    /// # Errors
268    ///
269    /// Returns an error if the HTTP request fails or the response cannot be parsed.
270    pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
271        let path = "/api/v1/assets";
272        self.send_request(Method::GET, path, None, false).await
273    }
274
275    /// Requests information for a specific asset.
276    ///
277    /// See <https://docs.cdp.coinbase.com/intx/reference/getasset>.
278    /// # Errors
279    ///
280    /// Returns an error if the HTTP request fails or the response cannot be parsed.
281    pub async fn http_get_asset_details(
282        &self,
283        asset: &str,
284    ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
285        let path = format!("/api/v1/assets/{asset}");
286        self.send_request(Method::GET, &path, None, false).await
287    }
288
289    /// Requests all instruments available for trading.
290    ///
291    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstruments>.
292    /// # Errors
293    ///
294    /// Returns an error if the HTTP request fails or the response cannot be parsed.
295    pub async fn http_list_instruments(
296        &self,
297    ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
298        let path = "/api/v1/instruments";
299        self.send_request(Method::GET, path, None, false).await
300    }
301
302    /// Retrieve a list of instruments with open contracts.
303    ///
304    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstrument>.
305    /// # Errors
306    ///
307    /// Returns an error if the HTTP request fails or the response cannot be parsed.
308    pub async fn http_get_instrument_details(
309        &self,
310        symbol: &str,
311    ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
312        let path = format!("/api/v1/instruments/{symbol}");
313        self.send_request(Method::GET, &path, None, false).await
314    }
315
316    /// Return all the fee rate tiers.
317    ///
318    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
319    /// # Errors
320    ///
321    /// Returns an error if the HTTP request fails or the response cannot be parsed.
322    pub async fn http_list_fee_rate_tiers(
323        &self,
324    ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
325        let path = "/api/v1/fee-rate-tiers";
326        self.send_request(Method::GET, path, None, true).await
327    }
328
329    /// List all user portfolios.
330    ///
331    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolios>.
332    /// # Errors
333    ///
334    /// Returns an error if the HTTP request fails or the response cannot be parsed.
335    pub async fn http_list_portfolios(
336        &self,
337    ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
338        let path = "/api/v1/portfolios";
339        self.send_request(Method::GET, path, None, true).await
340    }
341
342    /// Returns the user's specified portfolio.
343    ///
344    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolio>.
345    /// # Errors
346    ///
347    /// Returns an error if the HTTP request fails or the response cannot be parsed.
348    pub async fn http_get_portfolio(
349        &self,
350        portfolio_id: &str,
351    ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
352        let path = format!("/api/v1/portfolios/{portfolio_id}");
353        self.send_request(Method::GET, &path, None, true).await
354    }
355
356    /// Retrieves the summary, positions, and balances of a portfolio.
357    ///
358    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliodetail>.
359    /// # Errors
360    ///
361    /// Returns an error if the HTTP request fails or the response cannot be parsed.
362    pub async fn http_get_portfolio_details(
363        &self,
364        portfolio_id: &str,
365    ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
366        let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
367        self.send_request(Method::GET, &path, None, true).await
368    }
369
370    /// Retrieves the high level overview of a portfolio.
371    ///
372    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosummary>.
373    /// # Errors
374    ///
375    /// Returns an error if the HTTP request fails or the response cannot be parsed.
376    pub async fn http_get_portfolio_summary(
377        &self,
378        portfolio_id: &str,
379    ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
380        let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
381        self.send_request(Method::GET, &path, None, true).await
382    }
383
384    /// Returns all balances for a given portfolio.
385    ///
386    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalances>.
387    /// # Errors
388    ///
389    /// Returns an error if the HTTP request fails or the response cannot be parsed.
390    pub async fn http_list_portfolio_balances(
391        &self,
392        portfolio_id: &str,
393    ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
394        let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
395        self.send_request(Method::GET, &path, None, true).await
396    }
397
398    /// Retrieves the balance for a given portfolio and asset.
399    ///
400    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalance>.
401    /// # Errors
402    ///
403    /// Returns an error if the HTTP request fails or the response cannot be parsed.
404    pub async fn http_get_portfolio_balance(
405        &self,
406        portfolio_id: &str,
407        asset: &str,
408    ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
409        let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
410        self.send_request(Method::GET, &path, None, true).await
411    }
412
413    /// Returns all fills for a given portfolio.
414    ///
415    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliofills>.
416    /// # Errors
417    ///
418    /// Returns an error if the HTTP request fails or the response cannot be parsed.
419    pub async fn http_list_portfolio_fills(
420        &self,
421        portfolio_id: &str,
422        params: GetPortfolioFillsParams,
423    ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
424        let query = serde_urlencoded::to_string(&params)
425            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
426        let path = format!("/api/v1/portfolios/{portfolio_id}/fills?{query}");
427        self.send_request(Method::GET, &path, None, true).await
428    }
429
430    /// Returns all positions for a given portfolio.
431    ///
432    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliopositions>.
433    /// # Errors
434    ///
435    /// Returns an error if the HTTP request fails or the response cannot be parsed.
436    pub async fn http_list_portfolio_positions(
437        &self,
438        portfolio_id: &str,
439    ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
440        let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
441        self.send_request(Method::GET, &path, None, true).await
442    }
443
444    /// Retrieves the position for a given portfolio and symbol.
445    ///
446    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolioposition>.
447    /// # Errors
448    ///
449    /// Returns an error if the HTTP request fails or the response cannot be parsed.
450    pub async fn http_get_portfolio_position(
451        &self,
452        portfolio_id: &str,
453        symbol: &str,
454    ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
455        let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
456        self.send_request(Method::GET, &path, None, true).await
457    }
458
459    /// Retrieves the Perpetual Future and Spot fee rate tiers for the user.
460    ///
461    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosfeerates>.
462    /// # Errors
463    ///
464    /// Returns an error if the HTTP request fails or the response cannot be parsed.
465    pub async fn http_list_portfolio_fee_rates(
466        &self,
467    ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
468        let path = "/api/v1/portfolios/fee-rates";
469        self.send_request(Method::GET, path, None, true).await
470    }
471
472    /// Create a new order.
473    /// # Errors
474    ///
475    /// Returns an error if the HTTP request fails or the response cannot be parsed.
476    pub async fn http_create_order(
477        &self,
478        params: CreateOrderParams,
479    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
480        let path = "/api/v1/orders";
481        let body = serde_json::to_vec(&params)
482            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
483        self.send_request(Method::POST, path, Some(body), true)
484            .await
485    }
486
487    /// Retrieves a single order. The order retrieved can be either active or inactive.
488    ///
489    /// See <https://docs.cdp.coinbase.com/intx/reference/getorder>.
490    /// # Errors
491    ///
492    /// Returns an error if the HTTP request fails or the response cannot be parsed.
493    pub async fn http_get_order(
494        &self,
495        venue_order_id: &str,
496        portfolio_id: &str,
497    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
498        let params = GetOrderParams {
499            portfolio: portfolio_id.to_string(),
500        };
501        let query = serde_urlencoded::to_string(&params)
502            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
503        let path = format!("/api/v1/orders/{venue_order_id}?{query}");
504        self.send_request(Method::GET, &path, None, true).await
505    }
506
507    /// Returns a list of active orders resting on the order book matching the requested criteria.
508    /// Does not return any rejected, cancelled, or fully filled orders as they are not active.
509    ///
510    /// See <https://docs.cdp.coinbase.com/intx/reference/getorders>.
511    /// # Errors
512    ///
513    /// Returns an error if the HTTP request fails or the response cannot be parsed.
514    pub async fn http_list_open_orders(
515        &self,
516        params: GetOrdersParams,
517    ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
518        let query = serde_urlencoded::to_string(&params)
519            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
520        let path = format!("/api/v1/orders?{query}");
521        self.send_request(Method::GET, &path, None, true).await
522    }
523
524    /// Cancels a single open order.
525    /// # Errors
526    ///
527    /// Returns an error if the HTTP request fails or the response cannot be parsed.
528    pub async fn http_cancel_order(
529        &self,
530        client_order_id: &str,
531        portfolio_id: &str,
532    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
533        let params = CancelOrderParams {
534            portfolio: portfolio_id.to_string(),
535        };
536        let query = serde_urlencoded::to_string(&params)
537            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
538        let path = format!("/api/v1/orders/{client_order_id}?{query}");
539        self.send_request(Method::DELETE, &path, None, true).await
540    }
541
542    /// Cancel user orders.
543    /// # Errors
544    ///
545    /// Returns an error if the HTTP request fails or the response cannot be parsed.
546    pub async fn http_cancel_orders(
547        &self,
548        params: CancelOrdersParams,
549    ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
550        let query = serde_urlencoded::to_string(&params)
551            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
552        let path = format!("/api/v1/orders?{query}");
553        self.send_request(Method::DELETE, &path, None, true).await
554    }
555
556    /// Modify an open order.
557    ///
558    /// See <https://docs.cdp.coinbase.com/intx/reference/modifyorder>.
559    /// # Errors
560    ///
561    /// Returns an error if the HTTP request fails or the response cannot be parsed.
562    pub async fn http_modify_order(
563        &self,
564        order_id: &str,
565        params: ModifyOrderParams,
566    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
567        let path = format!("/api/v1/orders/{order_id}");
568        let body = serde_json::to_vec(&params)
569            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
570        self.send_request(Method::PUT, &path, Some(body), true)
571            .await
572    }
573}
574
575/// Provides a higher-level HTTP client for the [Coinbase International](https://coinbase.com) REST API.
576///
577/// This client wraps the underlying `CoinbaseIntxHttpInnerClient` to handle conversions
578/// into the Nautilus domain model.
579#[derive(Debug, Clone)]
580#[cfg_attr(
581    feature = "python",
582    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
583)]
584pub struct CoinbaseIntxHttpClient {
585    pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
586    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
587    cache_initialized: bool,
588}
589
590impl Default for CoinbaseIntxHttpClient {
591    fn default() -> Self {
592        Self::new(None, Some(60))
593    }
594}
595
596impl CoinbaseIntxHttpClient {
597    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
598    /// optionally overridden with a custom base url.
599    ///
600    /// This version of the client has **no credentials**, so it can only
601    /// call publicly accessible endpoints.
602    #[must_use]
603    pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
604        Self {
605            inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)),
606            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
607            cache_initialized: false,
608        }
609    }
610
611    /// Creates a new authenticated [`CoinbaseIntxHttpClient`] using environment variables and
612    /// the default Coinbase International HTTP base url.
613    ///
614    /// # Errors
615    ///
616    /// Returns an error if required environment variables are missing or invalid.
617    pub fn from_env() -> anyhow::Result<Self> {
618        Self::with_credentials(None, None, None, None, None)
619    }
620
621    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
622    /// for authenticated requests, optionally using a custom base url.
623    ///
624    /// # Errors
625    ///
626    /// Returns an error if required environment variables are missing or invalid.
627    pub fn with_credentials(
628        api_key: Option<String>,
629        api_secret: Option<String>,
630        api_passphrase: Option<String>,
631        base_url: Option<String>,
632        timeout_secs: Option<u64>,
633    ) -> anyhow::Result<Self> {
634        let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
635        let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
636        let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
637        let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
638        Ok(Self {
639            inner: Arc::new(CoinbaseIntxHttpInnerClient::with_credentials(
640                api_key,
641                api_secret,
642                api_passphrase,
643                base_url,
644                timeout_secs,
645            )),
646            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
647            cache_initialized: false,
648        })
649    }
650
651    fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
652        match self
653            .instruments_cache
654            .lock()
655            .expect(MUTEX_POISONED)
656            .get(&symbol)
657        {
658            Some(inst) => Ok(inst.clone()), // TODO: Remove this clone
659            None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
660        }
661    }
662
663    fn generate_ts_init(&self) -> UnixNanos {
664        get_atomic_clock_realtime().get_time_ns()
665    }
666
667    /// Returns the base url being used by the client.
668    #[must_use]
669    pub fn base_url(&self) -> &str {
670        self.inner.base_url.as_str()
671    }
672
673    /// Returns the public API key being used by the client.
674    #[must_use]
675    pub fn api_key(&self) -> Option<&str> {
676        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
677    }
678
679    /// Checks if the client is initialized.
680    ///
681    /// The client is considered initialized if any instruments have been cached from the venue.
682    #[must_use]
683    pub const fn is_initialized(&self) -> bool {
684        self.cache_initialized
685    }
686
687    /// Returns the cached instrument symbols.
688    ///
689    /// # Panics
690    ///
691    /// Panics if the instrument cache mutex is poisoned.
692    #[must_use]
693    pub fn get_cached_symbols(&self) -> Vec<String> {
694        self.instruments_cache
695            .lock()
696            .unwrap()
697            .keys()
698            .map(ToString::to_string)
699            .collect()
700    }
701
702    /// Adds the given instruments into the clients instrument cache.
703    ///
704    /// # Panics
705    ///
706    /// Panics if the instrument cache mutex is poisoned.
707    ///
708    /// Any existing instruments will be replaced.
709    pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
710        for inst in instruments {
711            self.instruments_cache
712                .lock()
713                .unwrap()
714                .insert(inst.raw_symbol().inner(), inst);
715        }
716        self.cache_initialized = true;
717    }
718
719    /// Adds the given instrument into the clients instrument cache.
720    ///
721    /// # Panics
722    ///
723    /// Panics if the instrument cache mutex is poisoned.
724    ///
725    /// Any existing instrument will be replaced.
726    pub fn add_instrument(&mut self, instrument: InstrumentAny) {
727        self.instruments_cache
728            .lock()
729            .unwrap()
730            .insert(instrument.raw_symbol().inner(), instrument);
731        self.cache_initialized = true;
732    }
733
734    /// Requests a list of portfolio details from Coinbase International.
735    ///
736    /// # Errors
737    ///
738    /// Returns an error if the HTTP request fails or the response cannot be parsed.
739    pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
740        let resp = self
741            .inner
742            .http_list_portfolios()
743            .await
744            .map_err(|e| anyhow::anyhow!(e))?;
745
746        Ok(resp)
747    }
748
749    /// Requests the account state for the given account ID from Coinbase International.
750    ///
751    /// # Errors
752    ///
753    /// Returns an error if the HTTP request fails or the response cannot be parsed.
754    pub async fn request_account_state(
755        &self,
756        account_id: AccountId,
757    ) -> anyhow::Result<AccountState> {
758        let resp = self
759            .inner
760            .http_list_portfolio_balances(account_id.get_issuers_id())
761            .await
762            .map_err(|e| anyhow::anyhow!(e))?;
763
764        let ts_init = self.generate_ts_init();
765        let account_state = parse_account_state(resp, account_id, ts_init)?;
766
767        Ok(account_state)
768    }
769
770    /// Requests all instruments from Coinbase International.
771    ///
772    /// # Errors
773    ///
774    /// Returns an error if the HTTP request fails or the response cannot be parsed.
775    pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
776        let resp = self
777            .inner
778            .http_list_instruments()
779            .await
780            .map_err(|e| anyhow::anyhow!(e))?;
781
782        let ts_init = self.generate_ts_init();
783
784        let mut instruments: Vec<InstrumentAny> = Vec::new();
785        for inst in &resp {
786            let instrument_any = parse_instrument_any(inst, ts_init);
787            if let Some(instrument_any) = instrument_any {
788                instruments.push(instrument_any);
789            }
790        }
791
792        Ok(instruments)
793    }
794
795    /// Requests the instrument for the given symbol from Coinbase International.
796    ///
797    /// # Errors
798    ///
799    /// Returns an error if the HTTP request fails or the instrument cannot be parsed.
800    pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
801        let resp = self
802            .inner
803            .http_get_instrument_details(symbol.as_str())
804            .await
805            .map_err(|e| anyhow::anyhow!(e))?;
806
807        let ts_init = self.generate_ts_init();
808
809        match parse_instrument_any(&resp, ts_init) {
810            Some(inst) => Ok(inst),
811            None => anyhow::bail!("Unable to parse instrument"),
812        }
813    }
814
815    /// Requests an order status report for the given venue order ID from Coinbase International.
816    ///
817    /// # Errors
818    ///
819    /// Returns an error if the HTTP request fails or the response cannot be parsed.
820    pub async fn request_order_status_report(
821        &self,
822        account_id: AccountId,
823        venue_order_id: VenueOrderId,
824    ) -> anyhow::Result<OrderStatusReport> {
825        let portfolio_id = account_id.get_issuers_id();
826
827        let resp = self
828            .inner
829            .http_get_order(venue_order_id.as_str(), portfolio_id)
830            .await
831            .map_err(|e| anyhow::anyhow!(e))?;
832
833        let instrument = self.get_instrument_from_cache(resp.symbol)?;
834        let ts_init = self.generate_ts_init();
835
836        let report = parse_order_status_report(
837            resp,
838            account_id,
839            instrument.price_precision(),
840            instrument.size_precision(),
841            ts_init,
842        )?;
843        Ok(report)
844    }
845
846    /// Requests order status reports for all **open** orders from Coinbase International.
847    ///
848    /// # Errors
849    ///
850    /// Returns an error if the HTTP request fails or the response cannot be parsed.
851    pub async fn request_order_status_reports(
852        &self,
853        account_id: AccountId,
854        symbol: Symbol,
855    ) -> anyhow::Result<Vec<OrderStatusReport>> {
856        let portfolio_id = account_id.get_issuers_id();
857
858        let mut params = GetOrdersParamsBuilder::default();
859        params.portfolio(portfolio_id);
860        params.instrument(symbol.as_str());
861        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
862
863        let resp = self
864            .inner
865            .http_list_open_orders(params)
866            .await
867            .map_err(|e| anyhow::anyhow!(e))?;
868
869        let ts_init = get_atomic_clock_realtime().get_time_ns();
870
871        let mut reports: Vec<OrderStatusReport> = Vec::new();
872        for order in resp.results {
873            let instrument = self.get_instrument_from_cache(order.symbol)?;
874            let report = parse_order_status_report(
875                order,
876                account_id,
877                instrument.price_precision(),
878                instrument.size_precision(),
879                ts_init,
880            )?;
881            reports.push(report);
882        }
883
884        Ok(reports)
885    }
886
887    /// Requests all fill reports from Coinbase International.
888    ///
889    /// # Errors
890    ///
891    /// Returns an error if the HTTP request fails or the response cannot be parsed.
892    pub async fn request_fill_reports(
893        &self,
894        account_id: AccountId,
895        client_order_id: Option<ClientOrderId>,
896        start: Option<DateTime<Utc>>,
897    ) -> anyhow::Result<Vec<FillReport>> {
898        let portfolio_id = account_id.get_issuers_id();
899
900        let mut params = GetPortfolioFillsParamsBuilder::default();
901        if let Some(start) = start {
902            params.time_from(start);
903        }
904        if let Some(client_order_id) = client_order_id {
905            params.client_order_id(client_order_id.to_string());
906        }
907        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
908
909        let resp = self
910            .inner
911            .http_list_portfolio_fills(portfolio_id, params)
912            .await
913            .map_err(|e| anyhow::anyhow!(e))?;
914
915        let ts_init = get_atomic_clock_realtime().get_time_ns();
916
917        let mut reports: Vec<FillReport> = Vec::new();
918        for fill in resp.results {
919            let instrument = self.get_instrument_from_cache(fill.symbol)?;
920            let report = parse_fill_report(
921                fill,
922                account_id,
923                instrument.price_precision(),
924                instrument.size_precision(),
925                ts_init,
926            )?;
927            reports.push(report);
928        }
929
930        Ok(reports)
931    }
932
933    /// Requests a position status report from Coinbase International.
934    ///
935    /// # Errors
936    ///
937    /// Returns an error if the HTTP request fails or the response cannot be parsed.
938    pub async fn request_position_status_report(
939        &self,
940        account_id: AccountId,
941        symbol: Symbol,
942    ) -> anyhow::Result<PositionStatusReport> {
943        let portfolio_id = account_id.get_issuers_id();
944
945        let resp = self
946            .inner
947            .http_get_portfolio_position(portfolio_id, symbol.as_str())
948            .await
949            .map_err(|e| anyhow::anyhow!(e))?;
950
951        let instrument = self.get_instrument_from_cache(resp.symbol)?;
952        let ts_init = get_atomic_clock_realtime().get_time_ns();
953
954        let report =
955            parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init)?;
956        Ok(report)
957    }
958
959    /// Requests all position status reports from Coinbase International.
960    ///
961    /// # Errors
962    ///
963    /// Returns an error if the HTTP request fails or the response cannot be parsed.
964    pub async fn request_position_status_reports(
965        &self,
966        account_id: AccountId,
967    ) -> anyhow::Result<Vec<PositionStatusReport>> {
968        let portfolio_id = account_id.get_issuers_id();
969
970        let resp = self
971            .inner
972            .http_list_portfolio_positions(portfolio_id)
973            .await
974            .map_err(|e| anyhow::anyhow!(e))?;
975
976        let ts_init = get_atomic_clock_realtime().get_time_ns();
977
978        let mut reports: Vec<PositionStatusReport> = Vec::new();
979        for position in resp {
980            let instrument = self.get_instrument_from_cache(position.symbol)?;
981            let report = parse_position_status_report(
982                position,
983                account_id,
984                instrument.size_precision(),
985                ts_init,
986            )?;
987            reports.push(report);
988        }
989
990        Ok(reports)
991    }
992
993    /// Submits a new order to Coinbase International.
994    ///
995    /// # Errors
996    ///
997    /// Returns an error if the HTTP request fails or the response cannot be parsed.
998    #[allow(clippy::too_many_arguments)]
999    pub async fn submit_order(
1000        &self,
1001        account_id: AccountId,
1002        client_order_id: ClientOrderId,
1003        symbol: Symbol,
1004        order_side: OrderSide,
1005        order_type: OrderType,
1006        quantity: Quantity,
1007        time_in_force: TimeInForce,
1008        expire_time: Option<DateTime<Utc>>,
1009        price: Option<Price>,
1010        trigger_price: Option<Price>,
1011        post_only: Option<bool>,
1012        reduce_only: Option<bool>,
1013    ) -> anyhow::Result<OrderStatusReport> {
1014        let coinbase_side: CoinbaseIntxSide = order_side.into();
1015        let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
1016        let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
1017
1018        let mut params = CreateOrderParamsBuilder::default();
1019        params.portfolio(account_id.get_issuers_id());
1020        params.client_order_id(client_order_id.as_str());
1021        params.instrument(symbol.as_str());
1022        params.side(coinbase_side);
1023        params.size(quantity.to_string());
1024        params.order_type(coinbase_order_type);
1025        params.tif(coinbase_tif);
1026        if let Some(expire_time) = expire_time {
1027            params.expire_time(expire_time);
1028        }
1029        if let Some(price) = price {
1030            params.price(price.to_string());
1031        }
1032        if let Some(trigger_price) = trigger_price {
1033            params.stop_price(trigger_price.to_string());
1034        }
1035        if let Some(post_only) = post_only {
1036            params.post_only(post_only);
1037        }
1038        if let Some(reduce_only) = reduce_only {
1039            params.close_only(reduce_only);
1040        }
1041        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1042
1043        let resp = self.inner.http_create_order(params).await?;
1044        tracing::debug!("Submitted order: {resp:?}");
1045
1046        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1047        let ts_init = get_atomic_clock_realtime().get_time_ns();
1048        let report = parse_order_status_report(
1049            resp,
1050            account_id,
1051            instrument.price_precision(),
1052            instrument.size_precision(),
1053            ts_init,
1054        )?;
1055        Ok(report)
1056    }
1057
1058    /// Cancels a currently open order on Coinbase International.
1059    ///
1060    /// # Errors
1061    ///
1062    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1063    pub async fn cancel_order(
1064        &self,
1065        account_id: AccountId,
1066        client_order_id: ClientOrderId,
1067    ) -> anyhow::Result<OrderStatusReport> {
1068        let portfolio_id = account_id.get_issuers_id();
1069
1070        let resp = self
1071            .inner
1072            .http_cancel_order(client_order_id.as_str(), portfolio_id)
1073            .await?;
1074        tracing::debug!("Canceled order: {resp:?}");
1075
1076        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1077        let ts_init = get_atomic_clock_realtime().get_time_ns();
1078
1079        let report = parse_order_status_report(
1080            resp,
1081            account_id,
1082            instrument.price_precision(),
1083            instrument.size_precision(),
1084            ts_init,
1085        )?;
1086        Ok(report)
1087    }
1088
1089    /// Cancels all orders for the given account ID and filter params on Coinbase International.
1090    ///
1091    /// # Errors
1092    ///
1093    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1094    pub async fn cancel_orders(
1095        &self,
1096        account_id: AccountId,
1097        symbol: Symbol,
1098        order_side: Option<OrderSide>,
1099    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1100        let mut params = CancelOrdersParamsBuilder::default();
1101        params.portfolio(account_id.get_issuers_id());
1102        params.instrument(symbol.as_str());
1103        if let Some(side) = order_side {
1104            let side: CoinbaseIntxSide = side.into();
1105            params.side(side);
1106        }
1107        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1108
1109        let resp = self.inner.http_cancel_orders(params).await?;
1110
1111        let instrument = self.get_instrument_from_cache(symbol.inner())?;
1112        let ts_init = get_atomic_clock_realtime().get_time_ns();
1113
1114        let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
1115        for order in resp {
1116            tracing::debug!("Canceled order: {order:?}");
1117            let report = parse_order_status_report(
1118                order,
1119                account_id,
1120                instrument.price_precision(),
1121                instrument.size_precision(),
1122                ts_init,
1123            )?;
1124            reports.push(report);
1125        }
1126
1127        Ok(reports)
1128    }
1129
1130    /// Modifies a currently open order on Coinbase International.
1131    ///
1132    /// # Errors
1133    ///
1134    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1135    #[allow(clippy::too_many_arguments)]
1136    pub async fn modify_order(
1137        &self,
1138        account_id: AccountId,
1139        client_order_id: ClientOrderId,
1140        new_client_order_id: ClientOrderId,
1141        price: Option<Price>,
1142        trigger_price: Option<Price>,
1143        quantity: Option<Quantity>,
1144    ) -> anyhow::Result<OrderStatusReport> {
1145        let mut params = ModifyOrderParamsBuilder::default();
1146        params.portfolio(account_id.get_issuers_id());
1147        params.client_order_id(new_client_order_id.as_str());
1148        if let Some(price) = price {
1149            params.price(price.to_string());
1150        }
1151        if let Some(trigger_price) = trigger_price {
1152            params.price(trigger_price.to_string());
1153        }
1154        if let Some(quantity) = quantity {
1155            params.size(quantity.to_string());
1156        }
1157        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1158
1159        let resp = self
1160            .inner
1161            .http_modify_order(client_order_id.as_str(), params)
1162            .await?;
1163        tracing::debug!("Modified order {}", resp.client_order_id);
1164
1165        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1166        let ts_init = get_atomic_clock_realtime().get_time_ns();
1167        let report = parse_order_status_report(
1168            resp,
1169            account_id,
1170            instrument.price_precision(),
1171            instrument.size_precision(),
1172            ts_init,
1173        )?;
1174        Ok(report)
1175    }
1176}