nautilus_coinbase_intx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [Coinbase International](https://www.coinbase.com/en/international-exchange) REST API.
17//!
18//! This module defines and implements a [`CoinbaseIntxHttpClient`] for
19//! sending requests to various Coinbase endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`CoinbaseIntxHttpError`].
22
23use std::{
24    collections::HashMap,
25    num::NonZeroU32,
26    sync::{Arc, LazyLock, Mutex},
27};
28
29use anyhow::Context;
30use chrono::{DateTime, Utc};
31use nautilus_core::{
32    MUTEX_POISONED, UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var,
33    time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36    enums::{OrderSide, OrderType, TimeInForce},
37    events::AccountState,
38    identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
39    instruments::{Instrument, InstrumentAny},
40    reports::{FillReport, OrderStatusReport, PositionStatusReport},
41    types::{Price, Quantity},
42};
43use nautilus_network::{
44    http::{HttpClient, HttpClientError, Method, StatusCode, USER_AGENT},
45    ratelimiter::quota::Quota,
46};
47use serde::{Deserialize, Serialize, de::DeserializeOwned};
48use ustr::Ustr;
49
50use super::{
51    error::CoinbaseIntxHttpError,
52    models::{
53        CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
54        CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
55        CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
56        CoinbaseIntxPosition,
57    },
58    parse::{
59        parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
60        parse_position_status_report,
61    },
62    query::{
63        CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
64        GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
65        GetPortfolioFillsParamsBuilder, ModifyOrderParams,
66    },
67};
68use crate::{
69    common::{
70        consts::COINBASE_INTX_REST_URL,
71        credential::Credential,
72        enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
73    },
74    http::{
75        error::ErrorBody,
76        query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
77    },
78};
79
/// Represents a Coinbase HTTP response envelope.
///
/// Generic over `T`, the element type carried in `data`.
#[derive(Debug, Serialize, Deserialize)]
pub struct CoinbaseIntxResponse<T> {
    /// The Coinbase response code, which is `"0"` for success.
    pub code: String,
    /// A message string which can be informational or describe an error cause.
    pub msg: String,
    /// The typed data returned by the Coinbase endpoint.
    pub data: Vec<T>,
}
90
/// Rate limit quota handed to the HTTP client for REST requests: 100 requests per second.
///
/// See <https://docs.cdp.coinbase.com/intx/docs/rate-limits#rest-api-rate-limits>.
pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(100).unwrap()));
94
/// Provides a lower-level HTTP client for connecting to the [Coinbase International](https://coinbase.com) REST API.
///
/// This client wraps the underlying `HttpClient` to handle functionality
/// specific to Coinbase, such as request signing (for authenticated endpoints),
/// forming request URLs, and deserializing responses into specific data models.
#[derive(Debug, Clone)]
pub struct CoinbaseIntxHttpInnerClient {
    /// Base URL that every request path is appended to.
    base_url: String,
    /// Underlying HTTP client used to send requests.
    client: HttpClient,
    /// API credential used for signing; `None` restricts the client to unauthenticated calls.
    credential: Option<Credential>,
}
106
107impl Default for CoinbaseIntxHttpInnerClient {
108    fn default() -> Self {
109        Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
110    }
111}
112
113impl CoinbaseIntxHttpInnerClient {
114    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
115    /// optionally overridden with a custom base url.
116    ///
117    /// This version of the client has **no credentials**, so it can only
118    /// call publicly accessible endpoints.
119    ///
120    /// # Errors
121    ///
122    /// Returns an error if the HTTP client cannot be created.
123    pub fn new(
124        base_url: Option<String>,
125        timeout_secs: Option<u64>,
126    ) -> Result<Self, HttpClientError> {
127        Ok(Self {
128            base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
129            client: HttpClient::new(
130                Self::default_headers(),
131                vec![],
132                vec![],
133                Some(*COINBASE_INTX_REST_QUOTA),
134                timeout_secs,
135                None, // proxy_url
136            )?,
137            credential: None,
138        })
139    }
140
141    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
142    /// for authenticated requests, optionally using a custom base url.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if the HTTP client cannot be created.
147    pub fn with_credentials(
148        api_key: String,
149        api_secret: String,
150        api_passphrase: String,
151        base_url: String,
152        timeout_secs: Option<u64>,
153    ) -> Result<Self, HttpClientError> {
154        Ok(Self {
155            base_url,
156            client: HttpClient::new(
157                Self::default_headers(),
158                vec![],
159                vec![],
160                Some(*COINBASE_INTX_REST_QUOTA),
161                timeout_secs,
162                None, // proxy_url
163            )?,
164            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
165        })
166    }
167
168    /// Builds the default headers to include with each request (e.g., `User-Agent`).
169    fn default_headers() -> HashMap<String, String> {
170        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
171    }
172
173    /// Signs an Coinbase request with timestamp, API key, passphrase, and signature.
174    ///
175    /// # Errors
176    ///
177    /// Returns [`CoinbaseHttpError::MissingCredentials`] if no credentials are set
178    /// but the request requires authentication.
179    fn sign_request(
180        &self,
181        method: &Method,
182        path: &str,
183        body: Option<&[u8]>,
184    ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
185        let credential = match self.credential.as_ref() {
186            Some(c) => c,
187            None => return Err(CoinbaseIntxHttpError::MissingCredentials),
188        };
189
190        let api_key = credential.api_key.clone().to_string();
191        let api_passphrase = credential.api_passphrase.clone().to_string();
192        let timestamp = Utc::now().timestamp().to_string();
193        let body_str = body
194            .and_then(|b| String::from_utf8(b.to_vec()).ok())
195            .unwrap_or_default();
196
197        let signature = credential.sign(&timestamp, method.as_str(), path, &body_str);
198
199        let mut headers = HashMap::new();
200        headers.insert("Accept".to_string(), "application/json".to_string());
201        headers.insert("CB-ACCESS-KEY".to_string(), api_key);
202        headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
203        headers.insert("CB-ACCESS-SIGN".to_string(), signature);
204        headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
205        headers.insert("Content-Type".to_string(), "application/json".to_string());
206
207        Ok(headers)
208    }
209
210    /// Sends an HTTP request to Coinbase International and parses the response into type `T`.
211    ///
212    /// Internally, this method handles:
213    /// - Building the URL from `base_url` + `path`.
214    /// - Optionally signing the request.
215    /// - Deserializing JSON responses into typed models, or returning a [`CoinbaseIntxHttpError`].
216    async fn send_request<T: DeserializeOwned, P: Serialize>(
217        &self,
218        method: Method,
219        path: &str,
220        params: Option<&P>,
221        body: Option<Vec<u8>>,
222        authenticate: bool,
223    ) -> Result<T, CoinbaseIntxHttpError> {
224        let params_str = params
225            .map(serde_urlencoded::to_string)
226            .transpose()
227            .map_err(|e| {
228                CoinbaseIntxHttpError::JsonError(format!("Failed to serialize params: {e}"))
229            })?;
230
231        let full_path = if let Some(ref query) = params_str {
232            if query.is_empty() {
233                path.to_string()
234            } else {
235                format!("{path}?{query}")
236            }
237        } else {
238            path.to_string()
239        };
240
241        let url = format!("{}{}", self.base_url, full_path);
242
243        let headers = if authenticate {
244            Some(self.sign_request(&method, &full_path, body.as_deref())?)
245        } else {
246            None
247        };
248
249        tracing::trace!("Request: {url:?} {body:?}");
250
251        let resp = self
252            .client
253            .request(method.clone(), url, None, headers, body, None, None)
254            .await?;
255
256        tracing::trace!("Response: {resp:?}");
257
258        if resp.status.is_success() {
259            let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
260                tracing::error!("Failed to deserialize CoinbaseResponse: {e}");
261                CoinbaseIntxHttpError::JsonError(e.to_string())
262            })?;
263
264            Ok(coinbase_response)
265        } else {
266            let error_body = String::from_utf8_lossy(&resp.body);
267            tracing::error!(
268                "HTTP error {} with body: {error_body}",
269                resp.status.as_str()
270            );
271
272            if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
273            {
274                return Err(CoinbaseIntxHttpError::CoinbaseError {
275                    error_code: parsed_error.code,
276                    message: parsed_error.msg,
277                });
278            }
279
280            if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body)
281                && let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error)
282            {
283                return Err(CoinbaseIntxHttpError::CoinbaseError {
284                    error_code: error,
285                    message: title,
286                });
287            }
288
289            Err(CoinbaseIntxHttpError::UnexpectedStatus {
290                status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
291                body: error_body.to_string(),
292            })
293        }
294    }
295
296    /// Requests a list of all supported assets.
297    ///
298    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
299    /// # Errors
300    ///
301    /// Returns an error if the HTTP request fails or the response cannot be parsed.
302    pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
303        let path = "/api/v1/assets";
304        self.send_request::<_, ()>(Method::GET, path, None, None, false)
305            .await
306    }
307
308    /// Requests information for a specific asset.
309    ///
310    /// See <https://docs.cdp.coinbase.com/intx/reference/getasset>.
311    /// # Errors
312    ///
313    /// Returns an error if the HTTP request fails or the response cannot be parsed.
314    pub async fn http_get_asset_details(
315        &self,
316        asset: &str,
317    ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
318        let path = format!("/api/v1/assets/{asset}");
319        self.send_request::<_, ()>(Method::GET, &path, None, None, false)
320            .await
321    }
322
323    /// Requests all instruments available for trading.
324    ///
325    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstruments>.
326    /// # Errors
327    ///
328    /// Returns an error if the HTTP request fails or the response cannot be parsed.
329    pub async fn http_list_instruments(
330        &self,
331    ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
332        let path = "/api/v1/instruments";
333        self.send_request::<_, ()>(Method::GET, path, None, None, false)
334            .await
335    }
336
337    /// Retrieve a list of instruments with open contracts.
338    ///
339    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstrument>.
340    /// # Errors
341    ///
342    /// Returns an error if the HTTP request fails or the response cannot be parsed.
343    pub async fn http_get_instrument_details(
344        &self,
345        symbol: &str,
346    ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
347        let path = format!("/api/v1/instruments/{symbol}");
348        self.send_request::<_, ()>(Method::GET, &path, None, None, false)
349            .await
350    }
351
352    /// Return all the fee rate tiers.
353    ///
354    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
355    /// # Errors
356    ///
357    /// Returns an error if the HTTP request fails or the response cannot be parsed.
358    pub async fn http_list_fee_rate_tiers(
359        &self,
360    ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
361        let path = "/api/v1/fee-rate-tiers";
362        self.send_request::<_, ()>(Method::GET, path, None, None, true)
363            .await
364    }
365
366    /// List all user portfolios.
367    ///
368    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolios>.
369    /// # Errors
370    ///
371    /// Returns an error if the HTTP request fails or the response cannot be parsed.
372    pub async fn http_list_portfolios(
373        &self,
374    ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
375        let path = "/api/v1/portfolios";
376        self.send_request::<_, ()>(Method::GET, path, None, None, true)
377            .await
378    }
379
380    /// Returns the user's specified portfolio.
381    ///
382    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolio>.
383    /// # Errors
384    ///
385    /// Returns an error if the HTTP request fails or the response cannot be parsed.
386    pub async fn http_get_portfolio(
387        &self,
388        portfolio_id: &str,
389    ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
390        let path = format!("/api/v1/portfolios/{portfolio_id}");
391        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
392            .await
393    }
394
395    /// Retrieves the summary, positions, and balances of a portfolio.
396    ///
397    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliodetail>.
398    /// # Errors
399    ///
400    /// Returns an error if the HTTP request fails or the response cannot be parsed.
401    pub async fn http_get_portfolio_details(
402        &self,
403        portfolio_id: &str,
404    ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
405        let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
406        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
407            .await
408    }
409
410    /// Retrieves the high level overview of a portfolio.
411    ///
412    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosummary>.
413    /// # Errors
414    ///
415    /// Returns an error if the HTTP request fails or the response cannot be parsed.
416    pub async fn http_get_portfolio_summary(
417        &self,
418        portfolio_id: &str,
419    ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
420        let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
421        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
422            .await
423    }
424
425    /// Returns all balances for a given portfolio.
426    ///
427    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalances>.
428    /// # Errors
429    ///
430    /// Returns an error if the HTTP request fails or the response cannot be parsed.
431    pub async fn http_list_portfolio_balances(
432        &self,
433        portfolio_id: &str,
434    ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
435        let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
436        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
437            .await
438    }
439
440    /// Retrieves the balance for a given portfolio and asset.
441    ///
442    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalance>.
443    /// # Errors
444    ///
445    /// Returns an error if the HTTP request fails or the response cannot be parsed.
446    pub async fn http_get_portfolio_balance(
447        &self,
448        portfolio_id: &str,
449        asset: &str,
450    ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
451        let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
452        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
453            .await
454    }
455
456    /// Returns all fills for a given portfolio.
457    ///
458    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliofills>.
459    /// # Errors
460    ///
461    /// Returns an error if the HTTP request fails or the response cannot be parsed.
462    pub async fn http_list_portfolio_fills(
463        &self,
464        portfolio_id: &str,
465        params: GetPortfolioFillsParams,
466    ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
467        let path = format!("/api/v1/portfolios/{portfolio_id}/fills");
468        self.send_request(Method::GET, &path, Some(&params), None, true)
469            .await
470    }
471
472    /// Returns all positions for a given portfolio.
473    ///
474    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliopositions>.
475    /// # Errors
476    ///
477    /// Returns an error if the HTTP request fails or the response cannot be parsed.
478    pub async fn http_list_portfolio_positions(
479        &self,
480        portfolio_id: &str,
481    ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
482        let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
483        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
484            .await
485    }
486
487    /// Retrieves the position for a given portfolio and symbol.
488    ///
489    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolioposition>.
490    /// # Errors
491    ///
492    /// Returns an error if the HTTP request fails or the response cannot be parsed.
493    pub async fn http_get_portfolio_position(
494        &self,
495        portfolio_id: &str,
496        symbol: &str,
497    ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
498        let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
499        self.send_request::<_, ()>(Method::GET, &path, None, None, true)
500            .await
501    }
502
503    /// Retrieves the Perpetual Future and Spot fee rate tiers for the user.
504    ///
505    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosfeerates>.
506    /// # Errors
507    ///
508    /// Returns an error if the HTTP request fails or the response cannot be parsed.
509    pub async fn http_list_portfolio_fee_rates(
510        &self,
511    ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
512        let path = "/api/v1/portfolios/fee-rates";
513        self.send_request::<_, ()>(Method::GET, path, None, None, true)
514            .await
515    }
516
517    /// Create a new order.
518    /// # Errors
519    ///
520    /// Returns an error if the HTTP request fails or the response cannot be parsed.
521    pub async fn http_create_order(
522        &self,
523        params: CreateOrderParams,
524    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
525        let path = "/api/v1/orders";
526        let body = serde_json::to_vec(&params)
527            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
528        self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
529            .await
530    }
531
532    /// Retrieves a single order. The order retrieved can be either active or inactive.
533    ///
534    /// See <https://docs.cdp.coinbase.com/intx/reference/getorder>.
535    /// # Errors
536    ///
537    /// Returns an error if the HTTP request fails or the response cannot be parsed.
538    pub async fn http_get_order(
539        &self,
540        venue_order_id: &str,
541        portfolio_id: &str,
542    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
543        let params = GetOrderParams {
544            portfolio: portfolio_id.to_string(),
545        };
546        let path = format!("/api/v1/orders/{venue_order_id}");
547        self.send_request(Method::GET, &path, Some(&params), None, true)
548            .await
549    }
550
551    /// Returns a list of active orders resting on the order book matching the requested criteria.
552    /// Does not return any rejected, cancelled, or fully filled orders as they are not active.
553    ///
554    /// See <https://docs.cdp.coinbase.com/intx/reference/getorders>.
555    /// # Errors
556    ///
557    /// Returns an error if the HTTP request fails or the response cannot be parsed.
558    pub async fn http_list_open_orders(
559        &self,
560        params: GetOrdersParams,
561    ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
562        self.send_request(Method::GET, "/api/v1/orders", Some(&params), None, true)
563            .await
564    }
565
566    /// Cancels a single open order.
567    /// # Errors
568    ///
569    /// Returns an error if the HTTP request fails or the response cannot be parsed.
570    pub async fn http_cancel_order(
571        &self,
572        client_order_id: &str,
573        portfolio_id: &str,
574    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
575        let params = CancelOrderParams {
576            portfolio: portfolio_id.to_string(),
577        };
578        let path = format!("/api/v1/orders/{client_order_id}");
579        self.send_request(Method::DELETE, &path, Some(&params), None, true)
580            .await
581    }
582
583    /// Cancel user orders.
584    /// # Errors
585    ///
586    /// Returns an error if the HTTP request fails or the response cannot be parsed.
587    pub async fn http_cancel_orders(
588        &self,
589        params: CancelOrdersParams,
590    ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
591        self.send_request(Method::DELETE, "/api/v1/orders", Some(&params), None, true)
592            .await
593    }
594
595    /// Modify an open order.
596    ///
597    /// See <https://docs.cdp.coinbase.com/intx/reference/modifyorder>.
598    /// # Errors
599    ///
600    /// Returns an error if the HTTP request fails or the response cannot be parsed.
601    pub async fn http_modify_order(
602        &self,
603        order_id: &str,
604        params: ModifyOrderParams,
605    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
606        let path = format!("/api/v1/orders/{order_id}");
607        let body = serde_json::to_vec(&params)
608            .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
609        self.send_request::<_, ()>(Method::PUT, &path, None, Some(body), true)
610            .await
611    }
612}
613
/// Provides a higher-level HTTP client for the [Coinbase International](https://coinbase.com) REST API.
///
/// This client wraps the underlying `CoinbaseIntxHttpInnerClient` to handle conversions
/// into the Nautilus domain model.
///
/// NOTE(review): cloning shares `inner` and `instruments_cache` through their `Arc`s, but
/// `cache_initialized` is a plain `bool` copied per clone, so clones do not observe each
/// other's flag updates - confirm this is intended.
#[derive(Debug, Clone)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct CoinbaseIntxHttpClient {
    /// Shared lower-level client that performs the raw HTTP calls.
    pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
    /// Shared instrument cache, keyed by the instrument's raw symbol.
    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
    /// Set to `true` by `add_instrument` / `add_instruments`.
    cache_initialized: bool,
}
628
629impl Default for CoinbaseIntxHttpClient {
630    fn default() -> Self {
631        Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
632    }
633}
634
635impl CoinbaseIntxHttpClient {
636    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
637    /// optionally overridden with a custom base url.
638    ///
639    /// This version of the client has **no credentials**, so it can only
640    /// call publicly accessible endpoints.
641    ///
642    /// # Errors
643    ///
644    /// Returns an error if the HTTP client cannot be created.
645    pub fn new(
646        base_url: Option<String>,
647        timeout_secs: Option<u64>,
648    ) -> Result<Self, HttpClientError> {
649        Ok(Self {
650            inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)?),
651            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
652            cache_initialized: false,
653        })
654    }
655
656    /// Creates a new authenticated [`CoinbaseIntxHttpClient`] using environment variables and
657    /// the default Coinbase International HTTP base url.
658    ///
659    /// # Errors
660    ///
661    /// Returns an error if required environment variables are missing or invalid.
662    pub fn from_env() -> anyhow::Result<Self> {
663        Self::with_credentials(None, None, None, None, None)
664    }
665
666    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
667    /// for authenticated requests, optionally using a custom base url.
668    ///
669    /// # Errors
670    ///
671    /// Returns an error if required environment variables are missing or invalid.
672    pub fn with_credentials(
673        api_key: Option<String>,
674        api_secret: Option<String>,
675        api_passphrase: Option<String>,
676        base_url: Option<String>,
677        timeout_secs: Option<u64>,
678    ) -> anyhow::Result<Self> {
679        let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
680        let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
681        let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
682        let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
683        Ok(Self {
684            inner: Arc::new(
685                CoinbaseIntxHttpInnerClient::with_credentials(
686                    api_key,
687                    api_secret,
688                    api_passphrase,
689                    base_url,
690                    timeout_secs,
691                )
692                .context("failed to create Coinbase INTX HTTP client")?,
693            ),
694            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
695            cache_initialized: false,
696        })
697    }
698
699    fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
700        match self
701            .instruments_cache
702            .lock()
703            .expect(MUTEX_POISONED)
704            .get(&symbol)
705        {
706            Some(inst) => Ok(inst.clone()), // TODO: Remove this clone
707            None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
708        }
709    }
710
    /// Returns the current time of the realtime atomic clock as a [`UnixNanos`] timestamp,
    /// used as `ts_init` when parsing venue responses.
    fn generate_ts_init(&self) -> UnixNanos {
        get_atomic_clock_realtime().get_time_ns()
    }
714
715    /// Returns the base url being used by the client.
716    #[must_use]
717    pub fn base_url(&self) -> &str {
718        self.inner.base_url.as_str()
719    }
720
721    /// Returns the public API key being used by the client.
722    #[must_use]
723    pub fn api_key(&self) -> Option<&str> {
724        self.inner.credential.as_ref().map(|c| c.api_key.as_str())
725    }
726
727    /// Returns a masked version of the API key for logging purposes.
728    #[must_use]
729    pub fn api_key_masked(&self) -> Option<String> {
730        self.inner.credential.as_ref().map(|c| c.api_key_masked())
731    }
732
    /// Checks if the client is initialized.
    ///
    /// Returns the `cache_initialized` flag, which starts as `false` and is set to `true`
    /// by `add_instrument` / `add_instruments` on this instance.
    ///
    /// NOTE(review): the flag is set even when `add_instruments` receives an empty list,
    /// so `true` does not strictly guarantee a non-empty cache - confirm intended.
    #[must_use]
    pub const fn is_initialized(&self) -> bool {
        self.cache_initialized
    }
740
741    /// Returns the cached instrument symbols.
742    ///
743    /// # Panics
744    ///
745    /// Panics if the instrument cache mutex is poisoned.
746    #[must_use]
747    pub fn get_cached_symbols(&self) -> Vec<String> {
748        self.instruments_cache
749            .lock()
750            .unwrap()
751            .keys()
752            .map(ToString::to_string)
753            .collect()
754    }
755
756    /// Adds the given instruments into the clients instrument cache.
757    ///
758    /// # Panics
759    ///
760    /// Panics if the instrument cache mutex is poisoned.
761    ///
762    /// Any existing instruments will be replaced.
763    pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
764        for inst in instruments {
765            self.instruments_cache
766                .lock()
767                .unwrap()
768                .insert(inst.raw_symbol().inner(), inst);
769        }
770        self.cache_initialized = true;
771    }
772
773    /// Adds the given instrument into the clients instrument cache.
774    ///
775    /// # Panics
776    ///
777    /// Panics if the instrument cache mutex is poisoned.
778    ///
779    /// Any existing instrument will be replaced.
780    pub fn add_instrument(&mut self, instrument: InstrumentAny) {
781        self.instruments_cache
782            .lock()
783            .unwrap()
784            .insert(instrument.raw_symbol().inner(), instrument);
785        self.cache_initialized = true;
786    }
787
788    /// Requests a list of portfolio details from Coinbase International.
789    ///
790    /// # Errors
791    ///
792    /// Returns an error if the HTTP request fails or the response cannot be parsed.
793    pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
794        let resp = self
795            .inner
796            .http_list_portfolios()
797            .await
798            .map_err(|e| anyhow::anyhow!(e))?;
799
800        Ok(resp)
801    }
802
803    /// Requests the account state for the given account ID from Coinbase International.
804    ///
805    /// # Errors
806    ///
807    /// Returns an error if the HTTP request fails or the response cannot be parsed.
808    pub async fn request_account_state(
809        &self,
810        account_id: AccountId,
811    ) -> anyhow::Result<AccountState> {
812        let resp = self
813            .inner
814            .http_list_portfolio_balances(account_id.get_issuers_id())
815            .await
816            .map_err(|e| anyhow::anyhow!(e))?;
817
818        let ts_init = self.generate_ts_init();
819        let account_state = parse_account_state(resp, account_id, ts_init)?;
820
821        Ok(account_state)
822    }
823
824    /// Requests all instruments from Coinbase International.
825    ///
826    /// # Errors
827    ///
828    /// Returns an error if the HTTP request fails or the response cannot be parsed.
829    pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
830        let resp = self
831            .inner
832            .http_list_instruments()
833            .await
834            .map_err(|e| anyhow::anyhow!(e))?;
835
836        let ts_init = self.generate_ts_init();
837
838        let mut instruments: Vec<InstrumentAny> = Vec::new();
839        for inst in &resp {
840            let instrument_any = parse_instrument_any(inst, ts_init);
841            if let Some(instrument_any) = instrument_any {
842                instruments.push(instrument_any);
843            }
844        }
845
846        Ok(instruments)
847    }
848
849    /// Requests the instrument for the given symbol from Coinbase International.
850    ///
851    /// # Errors
852    ///
853    /// Returns an error if the HTTP request fails or the instrument cannot be parsed.
854    pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
855        let resp = self
856            .inner
857            .http_get_instrument_details(symbol.as_str())
858            .await
859            .map_err(|e| anyhow::anyhow!(e))?;
860
861        let ts_init = self.generate_ts_init();
862
863        match parse_instrument_any(&resp, ts_init) {
864            Some(inst) => Ok(inst),
865            None => anyhow::bail!("Unable to parse instrument"),
866        }
867    }
868
869    /// Requests an order status report for the given venue order ID from Coinbase International.
870    ///
871    /// # Errors
872    ///
873    /// Returns an error if the HTTP request fails or the response cannot be parsed.
874    pub async fn request_order_status_report(
875        &self,
876        account_id: AccountId,
877        venue_order_id: VenueOrderId,
878    ) -> anyhow::Result<OrderStatusReport> {
879        let portfolio_id = account_id.get_issuers_id();
880
881        let resp = self
882            .inner
883            .http_get_order(venue_order_id.as_str(), portfolio_id)
884            .await
885            .map_err(|e| anyhow::anyhow!(e))?;
886
887        let instrument = self.get_instrument_from_cache(resp.symbol)?;
888        let ts_init = self.generate_ts_init();
889
890        let report = parse_order_status_report(
891            resp,
892            account_id,
893            instrument.price_precision(),
894            instrument.size_precision(),
895            ts_init,
896        )?;
897        Ok(report)
898    }
899
900    /// Requests order status reports for all **open** orders from Coinbase International.
901    ///
902    /// # Errors
903    ///
904    /// Returns an error if the HTTP request fails or the response cannot be parsed.
905    pub async fn request_order_status_reports(
906        &self,
907        account_id: AccountId,
908        symbol: Symbol,
909    ) -> anyhow::Result<Vec<OrderStatusReport>> {
910        let portfolio_id = account_id.get_issuers_id();
911
912        let mut params = GetOrdersParamsBuilder::default();
913        params.portfolio(portfolio_id);
914        params.instrument(symbol.as_str());
915        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
916
917        let resp = self
918            .inner
919            .http_list_open_orders(params)
920            .await
921            .map_err(|e| anyhow::anyhow!(e))?;
922
923        let ts_init = get_atomic_clock_realtime().get_time_ns();
924
925        let mut reports: Vec<OrderStatusReport> = Vec::new();
926        for order in resp.results {
927            let instrument = self.get_instrument_from_cache(order.symbol)?;
928            let report = parse_order_status_report(
929                order,
930                account_id,
931                instrument.price_precision(),
932                instrument.size_precision(),
933                ts_init,
934            )?;
935            reports.push(report);
936        }
937
938        Ok(reports)
939    }
940
941    /// Requests all fill reports from Coinbase International.
942    ///
943    /// # Errors
944    ///
945    /// Returns an error if the HTTP request fails or the response cannot be parsed.
946    pub async fn request_fill_reports(
947        &self,
948        account_id: AccountId,
949        client_order_id: Option<ClientOrderId>,
950        start: Option<DateTime<Utc>>,
951    ) -> anyhow::Result<Vec<FillReport>> {
952        let portfolio_id = account_id.get_issuers_id();
953
954        let mut params = GetPortfolioFillsParamsBuilder::default();
955        if let Some(start) = start {
956            params.time_from(start);
957        }
958        if let Some(client_order_id) = client_order_id {
959            params.client_order_id(client_order_id.to_string());
960        }
961        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
962
963        let resp = self
964            .inner
965            .http_list_portfolio_fills(portfolio_id, params)
966            .await
967            .map_err(|e| anyhow::anyhow!(e))?;
968
969        let ts_init = get_atomic_clock_realtime().get_time_ns();
970
971        let mut reports: Vec<FillReport> = Vec::new();
972        for fill in resp.results {
973            let instrument = self.get_instrument_from_cache(fill.symbol)?;
974            let report = parse_fill_report(
975                fill,
976                account_id,
977                instrument.price_precision(),
978                instrument.size_precision(),
979                ts_init,
980            )?;
981            reports.push(report);
982        }
983
984        Ok(reports)
985    }
986
987    /// Requests a position status report from Coinbase International.
988    ///
989    /// # Errors
990    ///
991    /// Returns an error if the HTTP request fails or the response cannot be parsed.
992    pub async fn request_position_status_report(
993        &self,
994        account_id: AccountId,
995        symbol: Symbol,
996    ) -> anyhow::Result<PositionStatusReport> {
997        let portfolio_id = account_id.get_issuers_id();
998
999        let resp = self
1000            .inner
1001            .http_get_portfolio_position(portfolio_id, symbol.as_str())
1002            .await
1003            .map_err(|e| anyhow::anyhow!(e))?;
1004
1005        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1006        let ts_init = get_atomic_clock_realtime().get_time_ns();
1007
1008        let report =
1009            parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init)?;
1010        Ok(report)
1011    }
1012
1013    /// Requests all position status reports from Coinbase International.
1014    ///
1015    /// # Errors
1016    ///
1017    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1018    pub async fn request_position_status_reports(
1019        &self,
1020        account_id: AccountId,
1021    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1022        let portfolio_id = account_id.get_issuers_id();
1023
1024        let resp = self
1025            .inner
1026            .http_list_portfolio_positions(portfolio_id)
1027            .await
1028            .map_err(|e| anyhow::anyhow!(e))?;
1029
1030        let ts_init = get_atomic_clock_realtime().get_time_ns();
1031
1032        let mut reports: Vec<PositionStatusReport> = Vec::new();
1033        for position in resp {
1034            let instrument = self.get_instrument_from_cache(position.symbol)?;
1035            let report = parse_position_status_report(
1036                position,
1037                account_id,
1038                instrument.size_precision(),
1039                ts_init,
1040            )?;
1041            reports.push(report);
1042        }
1043
1044        Ok(reports)
1045    }
1046
1047    /// Submits a new order to Coinbase International.
1048    ///
1049    /// # Errors
1050    ///
1051    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1052    #[allow(clippy::too_many_arguments)]
1053    pub async fn submit_order(
1054        &self,
1055        account_id: AccountId,
1056        client_order_id: ClientOrderId,
1057        symbol: Symbol,
1058        order_side: OrderSide,
1059        order_type: OrderType,
1060        quantity: Quantity,
1061        time_in_force: TimeInForce,
1062        expire_time: Option<DateTime<Utc>>,
1063        price: Option<Price>,
1064        trigger_price: Option<Price>,
1065        post_only: Option<bool>,
1066        reduce_only: Option<bool>,
1067    ) -> anyhow::Result<OrderStatusReport> {
1068        let coinbase_side: CoinbaseIntxSide = order_side.into();
1069        let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
1070        let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
1071
1072        let mut params = CreateOrderParamsBuilder::default();
1073        params.portfolio(account_id.get_issuers_id());
1074        params.client_order_id(client_order_id.as_str());
1075        params.instrument(symbol.as_str());
1076        params.side(coinbase_side);
1077        params.size(quantity.to_string());
1078        params.order_type(coinbase_order_type);
1079        params.tif(coinbase_tif);
1080        if let Some(expire_time) = expire_time {
1081            params.expire_time(expire_time);
1082        }
1083        if let Some(price) = price {
1084            params.price(price.to_string());
1085        }
1086        if let Some(trigger_price) = trigger_price {
1087            params.stop_price(trigger_price.to_string());
1088        }
1089        if let Some(post_only) = post_only {
1090            params.post_only(post_only);
1091        }
1092        if let Some(reduce_only) = reduce_only {
1093            params.close_only(reduce_only);
1094        }
1095        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1096
1097        let resp = self.inner.http_create_order(params).await?;
1098        tracing::debug!("Submitted order: {resp:?}");
1099
1100        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1101        let ts_init = get_atomic_clock_realtime().get_time_ns();
1102        let report = parse_order_status_report(
1103            resp,
1104            account_id,
1105            instrument.price_precision(),
1106            instrument.size_precision(),
1107            ts_init,
1108        )?;
1109        Ok(report)
1110    }
1111
1112    /// Cancels a currently open order on Coinbase International.
1113    ///
1114    /// # Errors
1115    ///
1116    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1117    pub async fn cancel_order(
1118        &self,
1119        account_id: AccountId,
1120        client_order_id: ClientOrderId,
1121    ) -> anyhow::Result<OrderStatusReport> {
1122        let portfolio_id = account_id.get_issuers_id();
1123
1124        let resp = self
1125            .inner
1126            .http_cancel_order(client_order_id.as_str(), portfolio_id)
1127            .await?;
1128        tracing::debug!("Canceled order: {resp:?}");
1129
1130        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1131        let ts_init = get_atomic_clock_realtime().get_time_ns();
1132
1133        let report = parse_order_status_report(
1134            resp,
1135            account_id,
1136            instrument.price_precision(),
1137            instrument.size_precision(),
1138            ts_init,
1139        )?;
1140        Ok(report)
1141    }
1142
1143    /// Cancels all orders for the given account ID and filter params on Coinbase International.
1144    ///
1145    /// # Errors
1146    ///
1147    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1148    pub async fn cancel_orders(
1149        &self,
1150        account_id: AccountId,
1151        symbol: Symbol,
1152        order_side: Option<OrderSide>,
1153    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1154        let mut params = CancelOrdersParamsBuilder::default();
1155        params.portfolio(account_id.get_issuers_id());
1156        params.instrument(symbol.as_str());
1157        if let Some(side) = order_side {
1158            let side: CoinbaseIntxSide = side.into();
1159            params.side(side);
1160        }
1161        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1162
1163        let resp = self.inner.http_cancel_orders(params).await?;
1164
1165        let instrument = self.get_instrument_from_cache(symbol.inner())?;
1166        let ts_init = get_atomic_clock_realtime().get_time_ns();
1167
1168        let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
1169        for order in resp {
1170            tracing::debug!("Canceled order: {order:?}");
1171            let report = parse_order_status_report(
1172                order,
1173                account_id,
1174                instrument.price_precision(),
1175                instrument.size_precision(),
1176                ts_init,
1177            )?;
1178            reports.push(report);
1179        }
1180
1181        Ok(reports)
1182    }
1183
1184    /// Modifies a currently open order on Coinbase International.
1185    ///
1186    /// # Errors
1187    ///
1188    /// Returns an error if the HTTP request fails or the response cannot be parsed.
1189    #[allow(clippy::too_many_arguments)]
1190    pub async fn modify_order(
1191        &self,
1192        account_id: AccountId,
1193        client_order_id: ClientOrderId,
1194        new_client_order_id: ClientOrderId,
1195        price: Option<Price>,
1196        trigger_price: Option<Price>,
1197        quantity: Option<Quantity>,
1198    ) -> anyhow::Result<OrderStatusReport> {
1199        let mut params = ModifyOrderParamsBuilder::default();
1200        params.portfolio(account_id.get_issuers_id());
1201        params.client_order_id(new_client_order_id.as_str());
1202        if let Some(price) = price {
1203            params.price(price.to_string());
1204        }
1205        if let Some(trigger_price) = trigger_price {
1206            params.price(trigger_price.to_string());
1207        }
1208        if let Some(quantity) = quantity {
1209            params.size(quantity.to_string());
1210        }
1211        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1212
1213        let resp = self
1214            .inner
1215            .http_modify_order(client_order_id.as_str(), params)
1216            .await?;
1217        tracing::debug!("Modified order {}", resp.client_order_id);
1218
1219        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1220        let ts_init = get_atomic_clock_realtime().get_time_ns();
1221        let report = parse_order_status_report(
1222            resp,
1223            account_id,
1224            instrument.price_precision(),
1225            instrument.size_precision(),
1226            ts_init,
1227        )?;
1228        Ok(report)
1229    }
1230}