nautilus_coinbase_intx/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides the HTTP client integration for the [Coinbase International](https://www.coinbase.com/en/international-exchange) REST API.
17//!
18//! This module defines and implements a [`CoinbaseIntxHttpClient`] for
19//! sending requests to various Coinbase endpoints. It handles request signing
20//! (when credentials are provided), constructs valid HTTP requests
21//! using the [`HttpClient`], and parses the responses back into structured data or a [`CoinbaseIntxHttpError`].
22
23use std::{
24    collections::HashMap,
25    sync::{Arc, LazyLock, Mutex},
26};
27
28use chrono::{DateTime, Utc};
29use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
30use nautilus_execution::reports::{
31    fill::FillReport, order::OrderStatusReport, position::PositionStatusReport,
32};
33use nautilus_model::{
34    enums::{OrderSide, OrderType, TimeInForce},
35    events::AccountState,
36    identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
37    instruments::{Instrument, InstrumentAny},
38    types::{Price, Quantity},
39};
40use nautilus_network::{http::HttpClient, ratelimiter::quota::Quota};
41use reqwest::{Method, StatusCode, header::USER_AGENT};
42use serde::{Deserialize, Serialize, de::DeserializeOwned};
43use ustr::Ustr;
44
45use super::{
46    error::CoinbaseIntxHttpError,
47    models::{
48        CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
49        CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
50        CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
51        CoinbaseIntxPosition,
52    },
53    parse::{
54        parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
55        parse_position_status_report,
56    },
57    query::{
58        CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
59        GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
60        GetPortfolioFillsParamsBuilder, ModifyOrderParams,
61    },
62};
63use crate::{
64    common::{
65        consts::COINBASE_INTX_REST_URL,
66        credential::{Credential, get_env_var},
67        enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
68    },
69    http::{
70        error::ErrorBody,
71        query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
72    },
73};
74
75/// Represents an Coinbase HTTP response.
76#[derive(Debug, Serialize, Deserialize)]
77pub struct CoinbaseIntxResponse<T> {
78    /// The Coinbase response code, which is `"0"` for success.
79    pub code: String,
80    /// A message string which can be informational or describe an error cause.
81    pub msg: String,
82    /// The typed data returned by the Coinbase endpoint.
83    pub data: Vec<T>,
84}
85
86// https://docs.cdp.coinbase.com/intx/docs/rate-limits#rest-api-rate-limits
87pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> =
88    LazyLock::new(|| Quota::rate_per_second(40).unwrap());
89
90/// Provides a lower-level HTTP client for connecting to the [Coinbase International](https://coinbase.com) REST API.
91///
92/// This client wraps the underlying `HttpClient` to handle functionality
93/// specific to Coinbase, such as request signing (for authenticated endpoints),
94/// forming request URLs, and deserializing responses into specific data models.
95#[derive(Clone)]
96pub struct CoinbaseIntxHttpInnerClient {
97    base_url: String,
98    client: HttpClient,
99    credential: Option<Credential>,
100}
101
102impl Default for CoinbaseIntxHttpInnerClient {
103    fn default() -> Self {
104        Self::new(None, Some(60))
105    }
106}
107
108impl CoinbaseIntxHttpInnerClient {
109    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
110    /// optionally overridden with a custom base url.
111    ///
112    /// This version of the client has **no credentials**, so it can only
113    /// call publicly accessible endpoints.
114    pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
115        Self {
116            base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
117            client: HttpClient::new(
118                Self::default_headers(),
119                vec![],
120                vec![],
121                Some(*COINBASE_INTX_REST_QUOTA),
122                timeout_secs,
123            ),
124            credential: None,
125        }
126    }
127
128    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
129    /// for authenticated requests, optionally using a custom base url.
130    pub fn with_credentials(
131        api_key: String,
132        api_secret: String,
133        api_passphrase: String,
134        base_url: String,
135        timeout_secs: Option<u64>,
136    ) -> Self {
137        Self {
138            base_url,
139            client: HttpClient::new(
140                Self::default_headers(),
141                vec![],
142                vec![],
143                Some(*COINBASE_INTX_REST_QUOTA),
144                timeout_secs,
145            ),
146            credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
147        }
148    }
149
150    /// Builds the default headers to include with each request (e.g., `User-Agent`).
151    fn default_headers() -> HashMap<String, String> {
152        HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
153    }
154
155    /// Signs an Coinbase request with timestamp, API key, passphrase, and signature.
156    ///
157    /// # Errors
158    ///
159    /// Returns [`CoinbaseHttpError::MissingCredentials`] if no credentials are set
160    /// but the request requires authentication.
161    fn sign_request(
162        &self,
163        method: &Method,
164        path: &str,
165        body: Option<&[u8]>,
166    ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
167        let credential = match self.credential.as_ref() {
168            Some(c) => c,
169            None => return Err(CoinbaseIntxHttpError::MissingCredentials),
170        };
171
172        let api_key = credential.api_key.clone().to_string();
173        let api_passphrase = credential.api_passphrase.clone().to_string();
174        let timestamp = Utc::now().timestamp().to_string();
175        let body_str = body
176            .and_then(|b| String::from_utf8(b.to_vec()).ok())
177            .unwrap_or_default();
178
179        let signature = credential.sign(&timestamp, method.as_str(), path, &body_str);
180
181        let mut headers = HashMap::new();
182        headers.insert("Accept".to_string(), "application/json".to_string());
183        headers.insert("CB-ACCESS-KEY".to_string(), api_key);
184        headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
185        headers.insert("CB-ACCESS-SIGN".to_string(), signature);
186        headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
187        headers.insert("Content-Type".to_string(), "application/json".to_string());
188
189        Ok(headers)
190    }
191
192    /// Sends an HTTP request to Coinbase International and parses the response into type `T`.
193    ///
194    /// Internally, this method handles:
195    /// - Building the URL from `base_url` + `path`.
196    /// - Optionally signing the request.
197    /// - Deserializing JSON responses into typed models, or returning a [`CoinbaseIntxHttpError`].
198    async fn send_request<T: DeserializeOwned>(
199        &self,
200        method: Method,
201        path: &str,
202        body: Option<Vec<u8>>,
203        authenticate: bool,
204    ) -> Result<T, CoinbaseIntxHttpError> {
205        let url = format!("{}{}", self.base_url, path);
206
207        let headers = if authenticate {
208            Some(self.sign_request(&method, path, body.as_deref())?)
209        } else {
210            None
211        };
212
213        tracing::trace!("Request: {url:?} {body:?}");
214
215        let resp = self
216            .client
217            .request(method.clone(), url, headers, body, None, None)
218            .await?;
219
220        tracing::trace!("Response: {resp:?}");
221
222        if resp.status.is_success() {
223            let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
224                tracing::error!("Failed to deserialize CoinbaseResponse: {e}");
225                CoinbaseIntxHttpError::JsonError(e.to_string())
226            })?;
227
228            Ok(coinbase_response)
229        } else {
230            let error_body = String::from_utf8_lossy(&resp.body);
231            tracing::error!(
232                "HTTP error {} with body: {error_body}",
233                resp.status.as_str()
234            );
235
236            if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
237            {
238                return Err(CoinbaseIntxHttpError::CoinbaseError {
239                    error_code: parsed_error.code,
240                    message: parsed_error.msg,
241                });
242            }
243
244            if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body) {
245                if let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error) {
246                    return Err(CoinbaseIntxHttpError::CoinbaseError {
247                        error_code: error,
248                        message: title,
249                    });
250                }
251            }
252
253            Err(CoinbaseIntxHttpError::UnexpectedStatus {
254                status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
255                body: error_body.to_string(),
256            })
257        }
258    }
259
260    /// Requests a list of all supported assets.
261    ///
262    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
263    pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
264        let path = "/api/v1/assets";
265        self.send_request(Method::GET, path, None, false).await
266    }
267
268    /// Requests information for a specific asset.
269    ///
270    /// See <https://docs.cdp.coinbase.com/intx/reference/getasset>.
271    pub async fn http_get_asset_details(
272        &self,
273        asset: &str,
274    ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
275        let path = format!("/api/v1/assets/{asset}");
276        self.send_request(Method::GET, &path, None, false).await
277    }
278
279    /// Requests all instruments available for trading.
280    ///
281    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstruments>.
282    pub async fn http_list_instruments(
283        &self,
284    ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
285        let path = "/api/v1/instruments";
286        self.send_request(Method::GET, path, None, false).await
287    }
288
289    /// Retrieve a list of instruments with open contracts.
290    ///
291    /// See <https://docs.cdp.coinbase.com/intx/reference/getinstrument>.
292    pub async fn http_get_instrument_details(
293        &self,
294        symbol: &str,
295    ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
296        let path = format!("/api/v1/instruments/{symbol}");
297        self.send_request(Method::GET, &path, None, false).await
298    }
299
300    /// Return all the fee rate tiers.
301    ///
302    /// See <https://docs.cdp.coinbase.com/intx/reference/getassets>.
303    pub async fn http_list_fee_rate_tiers(
304        &self,
305    ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
306        let path = "/api/v1/fee-rate-tiers";
307        self.send_request(Method::GET, path, None, true).await
308    }
309
310    /// List all user portfolios.
311    ///
312    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolios>.
313    pub async fn http_list_portfolios(
314        &self,
315    ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
316        let path = "/api/v1/portfolios";
317        self.send_request(Method::GET, path, None, true).await
318    }
319
320    /// Returns the user's specified portfolio.
321    ///
322    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolio>.
323    pub async fn http_get_portfolio(
324        &self,
325        portfolio_id: &str,
326    ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
327        let path = format!("/api/v1/portfolios/{portfolio_id}");
328        self.send_request(Method::GET, &path, None, true).await
329    }
330
331    /// Retrieves the summary, positions, and balances of a portfolio.
332    ///
333    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliodetail>.
334    pub async fn http_get_portfolio_details(
335        &self,
336        portfolio_id: &str,
337    ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
338        let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
339        self.send_request(Method::GET, &path, None, true).await
340    }
341
342    /// Retrieves the high level overview of a portfolio.
343    ///
344    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosummary>.
345    pub async fn http_get_portfolio_summary(
346        &self,
347        portfolio_id: &str,
348    ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
349        let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
350        self.send_request(Method::GET, &path, None, true).await
351    }
352
353    /// Returns all balances for a given portfolio.
354    ///
355    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalances>.
356    pub async fn http_list_portfolio_balances(
357        &self,
358        portfolio_id: &str,
359    ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
360        let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
361        self.send_request(Method::GET, &path, None, true).await
362    }
363
364    /// Retrieves the balance for a given portfolio and asset.
365    ///
366    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliobalance>.
367    pub async fn http_get_portfolio_balance(
368        &self,
369        portfolio_id: &str,
370        asset: &str,
371    ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
372        let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
373        self.send_request(Method::GET, &path, None, true).await
374    }
375
376    /// Returns all fills for a given portfolio.
377    ///
378    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliofills>.
379    pub async fn http_list_portfolio_fills(
380        &self,
381        portfolio_id: &str,
382        params: GetPortfolioFillsParams,
383    ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
384        let query = serde_urlencoded::to_string(&params).expect("Failed to serialize params");
385        let path = format!("/api/v1/portfolios/{portfolio_id}/fills?{query}");
386        self.send_request(Method::GET, &path, None, true).await
387    }
388
389    /// Returns all positions for a given portfolio.
390    ///
391    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliopositions>.
392    pub async fn http_list_portfolio_positions(
393        &self,
394        portfolio_id: &str,
395    ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
396        let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
397        self.send_request(Method::GET, &path, None, true).await
398    }
399
400    /// Retrieves the position for a given portfolio and symbol.
401    ///
402    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfolioposition>.
403    pub async fn http_get_portfolio_position(
404        &self,
405        portfolio_id: &str,
406        symbol: &str,
407    ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
408        let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
409        self.send_request(Method::GET, &path, None, true).await
410    }
411
412    /// Retrieves the Perpetual Future and Spot fee rate tiers for the user.
413    ///
414    /// See <https://docs.cdp.coinbase.com/intx/reference/getportfoliosfeerates>.
415    pub async fn http_list_portfolio_fee_rates(
416        &self,
417    ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
418        let path = "/api/v1/portfolios/fee-rates";
419        self.send_request(Method::GET, path, None, true).await
420    }
421
422    /// Create a new order.
423    pub async fn http_create_order(
424        &self,
425        params: CreateOrderParams,
426    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
427        let path = "/api/v1/orders";
428        let body = serde_json::to_vec(&params).expect("Failed to serialize params");
429        self.send_request(Method::POST, path, Some(body), true)
430            .await
431    }
432
433    /// Retrieves a single order. The order retrieved can be either active or inactive.
434    ///
435    /// See <https://docs.cdp.coinbase.com/intx/reference/getorder>.
436    pub async fn http_get_order(
437        &self,
438        venue_order_id: &str,
439        portfolio_id: &str,
440    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
441        let params = GetOrderParams {
442            portfolio: portfolio_id.to_string(),
443        };
444        let query = serde_urlencoded::to_string(&params).expect("Failed to serialize params");
445        let path = format!("/api/v1/orders/{venue_order_id}?{query}");
446        self.send_request(Method::GET, &path, None, true).await
447    }
448
449    /// Returns a list of active orders resting on the order book matching the requested criteria.
450    /// Does not return any rejected, cancelled, or fully filled orders as they are not active.
451    ///
452    /// See <https://docs.cdp.coinbase.com/intx/reference/getorders>.
453    pub async fn http_list_open_orders(
454        &self,
455        params: GetOrdersParams,
456    ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
457        let query = serde_urlencoded::to_string(&params).expect("Failed to serialize params");
458        let path = format!("/api/v1/orders?{}", query);
459        self.send_request(Method::GET, &path, None, true).await
460    }
461
462    /// Cancels a single open order.
463    pub async fn http_cancel_order(
464        &self,
465        client_order_id: &str,
466        portfolio_id: &str,
467    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
468        let params = CancelOrderParams {
469            portfolio: portfolio_id.to_string(),
470        };
471        let query = serde_urlencoded::to_string(&params).expect("Failed to serialize params");
472        let path = format!("/api/v1/orders/{client_order_id}?{query}");
473        self.send_request(Method::DELETE, &path, None, true).await
474    }
475
476    /// Cancel user orders.
477    pub async fn http_cancel_orders(
478        &self,
479        params: CancelOrdersParams,
480    ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
481        let query = serde_urlencoded::to_string(&params).expect("Failed to serialize params");
482        let path = format!("/api/v1/orders?{}", query);
483        self.send_request(Method::DELETE, &path, None, true).await
484    }
485
486    /// Modify an open order.
487    ///
488    /// See <https://docs.cdp.coinbase.com/intx/reference/modifyorder>.
489    pub async fn http_modify_order(
490        &self,
491        order_id: &str,
492        params: ModifyOrderParams,
493    ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
494        let path = format!("/api/v1/orders/{}", order_id);
495        let body = serde_json::to_vec(&params).expect("Failed to serialize params");
496        self.send_request(Method::PUT, &path, Some(body), true)
497            .await
498    }
499}
500
501/// Provides a higher-level HTTP client for the [Coinbase International](https://coinbase.com) REST API.
502///
503/// This client wraps the underlying `CoinbaseIntxHttpInnerClient` to handle conversions
504/// into the Nautilus domain model.
505#[derive(Clone)]
506#[cfg_attr(
507    feature = "python",
508    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
509)]
510pub struct CoinbaseIntxHttpClient {
511    pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
512    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
513    cache_initialized: bool,
514}
515
516impl Default for CoinbaseIntxHttpClient {
517    fn default() -> Self {
518        Self::new(None, Some(60))
519    }
520}
521
522impl CoinbaseIntxHttpClient {
523    /// Creates a new [`CoinbaseIntxHttpClient`] using the default Coinbase HTTP URL,
524    /// optionally overridden with a custom base url.
525    ///
526    /// This version of the client has **no credentials**, so it can only
527    /// call publicly accessible endpoints.
528    pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
529        Self {
530            inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)),
531            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
532            cache_initialized: false,
533        }
534    }
535
536    /// Creates a new authenticated [`CoinbaseIntxHttpClient`] using environment variables and
537    /// the default Coinbase International HTTP base url.
538    pub fn from_env() -> anyhow::Result<Self> {
539        Self::with_credentials(None, None, None, None, None)
540    }
541
542    /// Creates a new [`CoinbaseIntxHttpClient`] configured with credentials
543    /// for authenticated requests, optionally using a custom base url.
544    pub fn with_credentials(
545        api_key: Option<String>,
546        api_secret: Option<String>,
547        api_passphrase: Option<String>,
548        base_url: Option<String>,
549        timeout_secs: Option<u64>,
550    ) -> anyhow::Result<Self> {
551        let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
552        let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
553        let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
554        let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
555        Ok(Self {
556            inner: Arc::new(CoinbaseIntxHttpInnerClient::with_credentials(
557                api_key,
558                api_secret,
559                api_passphrase,
560                base_url,
561                timeout_secs,
562            )),
563            instruments_cache: Arc::new(Mutex::new(HashMap::new())),
564            cache_initialized: false,
565        })
566    }
567
568    fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
569        match self.instruments_cache.lock().unwrap().get(&symbol) {
570            Some(inst) => Ok(inst.clone()), // TODO: Remove this clone
571            None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
572        }
573    }
574
575    fn generate_ts_init(&self) -> UnixNanos {
576        get_atomic_clock_realtime().get_time_ns()
577    }
578
579    /// Returns the base url being used by the client.
580    pub fn base_url(&self) -> &str {
581        self.inner.base_url.as_str()
582    }
583
584    /// Returns the public API key being used by the client.
585    pub fn api_key(&self) -> Option<&str> {
586        self.inner.credential.clone().map(|c| c.api_key.as_str())
587    }
588
589    /// Checks if the client is initialized.
590    ///
591    /// The client is considered initialized if any instruments have been cached from the venue.
592    pub fn is_initialized(&self) -> bool {
593        self.cache_initialized
594    }
595
596    /// Returns the cached instrument symbols.
597    pub fn get_cached_symbols(&self) -> Vec<String> {
598        self.instruments_cache
599            .lock()
600            .unwrap()
601            .keys()
602            .map(|k| k.to_string())
603            .collect()
604    }
605
606    /// Adds the given instruments into the clients instrument cache.
607    ///
608    /// Any existing instruments will be replaced.
609    pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
610        for inst in instruments {
611            self.instruments_cache
612                .lock()
613                .unwrap()
614                .insert(inst.raw_symbol().inner(), inst);
615        }
616        self.cache_initialized = true;
617    }
618
619    /// Adds the given instrument into the clients instrument cache.
620    ///
621    /// Any existing instrument will be replaced.
622    pub fn add_instrument(&mut self, instrument: InstrumentAny) {
623        self.instruments_cache
624            .lock()
625            .unwrap()
626            .insert(instrument.raw_symbol().inner(), instrument);
627        self.cache_initialized = true;
628    }
629
630    /// Requests a list of portfolio details from Coinbase International.
631    pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
632        let resp = self
633            .inner
634            .http_list_portfolios()
635            .await
636            .map_err(|e| anyhow::anyhow!(e))?;
637
638        Ok(resp)
639    }
640
641    /// Requests the account state for the given account ID from Coinbase International.
642    pub async fn request_account_state(
643        &self,
644        account_id: AccountId,
645    ) -> anyhow::Result<AccountState> {
646        let resp = self
647            .inner
648            .http_list_portfolio_balances(account_id.get_issuers_id())
649            .await
650            .map_err(|e| anyhow::anyhow!(e))?;
651
652        let ts_init = self.generate_ts_init();
653        let account_state = parse_account_state(resp, account_id, ts_init)?;
654
655        Ok(account_state)
656    }
657
658    /// Requests all instruments from Coinbase International.
659    pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
660        let resp = self
661            .inner
662            .http_list_instruments()
663            .await
664            .map_err(|e| anyhow::anyhow!(e))?;
665
666        let ts_init = self.generate_ts_init();
667
668        let mut instruments: Vec<InstrumentAny> = Vec::new();
669        for inst in &resp {
670            let instrument_any = parse_instrument_any(inst, ts_init);
671            if let Some(instrument_any) = instrument_any {
672                instruments.push(instrument_any);
673            }
674        }
675
676        Ok(instruments)
677    }
678
679    /// Requests the instrument for the given symbol from Coinbase International.
680    pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
681        let resp = self
682            .inner
683            .http_get_instrument_details(symbol.as_str())
684            .await
685            .map_err(|e| anyhow::anyhow!(e))?;
686
687        let ts_init = self.generate_ts_init();
688
689        match parse_instrument_any(&resp, ts_init) {
690            Some(inst) => Ok(inst),
691            None => anyhow::bail!("Unable to parse instrument"),
692        }
693    }
694
695    /// Requests an order status reports for the given venue order ID from Coinbase International.
696    pub async fn request_order_status_report(
697        &self,
698        account_id: AccountId,
699        venue_order_id: VenueOrderId,
700    ) -> anyhow::Result<OrderStatusReport> {
701        let portfolio_id = account_id.get_issuers_id();
702
703        let resp = self
704            .inner
705            .http_get_order(venue_order_id.as_str(), portfolio_id)
706            .await
707            .map_err(|e| anyhow::anyhow!(e))?;
708
709        let instrument = self.get_instrument_from_cache(resp.symbol)?;
710        let ts_init = self.generate_ts_init();
711
712        let report = parse_order_status_report(
713            resp,
714            account_id,
715            instrument.price_precision(),
716            instrument.size_precision(),
717            ts_init,
718        );
719        Ok(report)
720    }
721
722    /// Requests order status reports for all **open** orders from Coinbase International.
723    pub async fn request_order_status_reports(
724        &self,
725        account_id: AccountId,
726        symbol: Symbol,
727    ) -> anyhow::Result<Vec<OrderStatusReport>> {
728        let portfolio_id = account_id.get_issuers_id();
729
730        let mut params = GetOrdersParamsBuilder::default();
731        params.portfolio(portfolio_id);
732        params.instrument(symbol.as_str());
733        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
734
735        let resp = self
736            .inner
737            .http_list_open_orders(params)
738            .await
739            .map_err(|e| anyhow::anyhow!(e))?;
740
741        let ts_init = get_atomic_clock_realtime().get_time_ns();
742
743        let mut reports: Vec<OrderStatusReport> = Vec::new();
744        for order in resp.results {
745            let instrument = self.get_instrument_from_cache(order.symbol)?;
746
747            reports.push(parse_order_status_report(
748                order,
749                account_id,
750                instrument.price_precision(),
751                instrument.size_precision(),
752                ts_init,
753            ))
754        }
755
756        Ok(reports)
757    }
758
759    /// Requests all fill reports from Coinbase International.
760    pub async fn request_fill_reports(
761        &self,
762        account_id: AccountId,
763        client_order_id: Option<ClientOrderId>,
764        start: Option<DateTime<Utc>>,
765    ) -> anyhow::Result<Vec<FillReport>> {
766        let portfolio_id = account_id.get_issuers_id();
767
768        let mut params = GetPortfolioFillsParamsBuilder::default();
769        if let Some(start) = start {
770            params.time_from(start);
771        };
772        if let Some(client_order_id) = client_order_id {
773            params.client_order_id(client_order_id.to_string());
774        }
775        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
776
777        let resp = self
778            .inner
779            .http_list_portfolio_fills(portfolio_id, params)
780            .await
781            .map_err(|e| anyhow::anyhow!(e))?;
782
783        let ts_init = get_atomic_clock_realtime().get_time_ns();
784
785        let mut reports: Vec<FillReport> = Vec::new();
786        for fill in resp.results {
787            let instrument = self.get_instrument_from_cache(fill.symbol)?;
788
789            let report = parse_fill_report(
790                fill,
791                account_id,
792                instrument.price_precision(),
793                instrument.size_precision(),
794                ts_init,
795            );
796            reports.push(report);
797        }
798
799        Ok(reports)
800    }
801
802    /// Requests a position status report from Coinbase International.
803    pub async fn request_position_status_report(
804        &self,
805        account_id: AccountId,
806        symbol: Symbol,
807    ) -> anyhow::Result<PositionStatusReport> {
808        let portfolio_id = account_id.get_issuers_id();
809
810        let resp = self
811            .inner
812            .http_get_portfolio_position(portfolio_id, symbol.as_str())
813            .await
814            .map_err(|e| anyhow::anyhow!(e))?;
815
816        let instrument = self.get_instrument_from_cache(resp.symbol)?;
817        let ts_init = get_atomic_clock_realtime().get_time_ns();
818
819        let report =
820            parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init);
821
822        Ok(report)
823    }
824
825    /// Requests all position status reports from Coinbase International.
826    pub async fn request_position_status_reports(
827        &self,
828        account_id: AccountId,
829    ) -> anyhow::Result<Vec<PositionStatusReport>> {
830        let portfolio_id = account_id.get_issuers_id();
831
832        let resp = self
833            .inner
834            .http_list_portfolio_positions(portfolio_id)
835            .await
836            .map_err(|e| anyhow::anyhow!(e))?;
837
838        let ts_init = get_atomic_clock_realtime().get_time_ns();
839
840        let mut reports: Vec<PositionStatusReport> = Vec::new();
841        for position in resp {
842            let instrument = self.get_instrument_from_cache(position.symbol)?;
843
844            let report = parse_position_status_report(
845                position,
846                account_id,
847                instrument.size_precision(),
848                ts_init,
849            );
850
851            reports.push(report);
852        }
853
854        Ok(reports)
855    }
856
857    /// Submits a new order to Coinbase International.
858    #[allow(clippy::too_many_arguments)]
859    pub async fn submit_order(
860        &self,
861        account_id: AccountId,
862        client_order_id: ClientOrderId,
863        symbol: Symbol,
864        order_side: OrderSide,
865        order_type: OrderType,
866        quantity: Quantity,
867        time_in_force: TimeInForce,
868        expire_time: Option<DateTime<Utc>>,
869        price: Option<Price>,
870        trigger_price: Option<Price>,
871        post_only: Option<bool>,
872        reduce_only: Option<bool>,
873    ) -> anyhow::Result<OrderStatusReport> {
874        let coinbase_side: CoinbaseIntxSide = order_side.into();
875        let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
876        let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
877
878        let mut params = CreateOrderParamsBuilder::default();
879        params.portfolio(account_id.get_issuers_id());
880        params.client_order_id(client_order_id.as_str());
881        params.instrument(symbol.as_str());
882        params.side(coinbase_side);
883        params.size(quantity.to_string());
884        params.order_type(coinbase_order_type);
885        params.tif(coinbase_tif);
886        if let Some(expire_time) = expire_time {
887            params.expire_time(expire_time);
888        }
889        if let Some(price) = price {
890            params.price(price.to_string());
891        }
892        if let Some(trigger_price) = trigger_price {
893            params.stop_price(trigger_price.to_string());
894        }
895        if let Some(post_only) = post_only {
896            params.post_only(post_only);
897        }
898        if let Some(reduce_only) = reduce_only {
899            params.close_only(reduce_only);
900        }
901        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
902
903        let resp = self.inner.http_create_order(params).await?;
904        tracing::debug!("Submitted order: {resp:?}");
905
906        let instrument = self.get_instrument_from_cache(resp.symbol)?;
907        let ts_init = get_atomic_clock_realtime().get_time_ns();
908        let report = parse_order_status_report(
909            resp,
910            account_id,
911            instrument.price_precision(),
912            instrument.size_precision(),
913            ts_init,
914        );
915        Ok(report)
916    }
917
918    /// Cancels a currently open order on Coinbase International.
919    pub async fn cancel_order(
920        &self,
921        account_id: AccountId,
922        client_order_id: ClientOrderId,
923    ) -> anyhow::Result<OrderStatusReport> {
924        let portfolio_id = account_id.get_issuers_id();
925
926        let resp = self
927            .inner
928            .http_cancel_order(client_order_id.as_str(), portfolio_id)
929            .await?;
930        tracing::debug!("Canceled order: {resp:?}");
931
932        let instrument = self.get_instrument_from_cache(resp.symbol)?;
933        let ts_init = get_atomic_clock_realtime().get_time_ns();
934
935        let report = parse_order_status_report(
936            resp,
937            account_id,
938            instrument.price_precision(),
939            instrument.size_precision(),
940            ts_init,
941        );
942        Ok(report)
943    }
944
945    /// Cancels all orders for the given account ID and filter params on Coinbase International.
946    pub async fn cancel_orders(
947        &self,
948        account_id: AccountId,
949        symbol: Symbol,
950        order_side: Option<OrderSide>,
951    ) -> anyhow::Result<Vec<OrderStatusReport>> {
952        let mut params = CancelOrdersParamsBuilder::default();
953        params.portfolio(account_id.get_issuers_id());
954        params.instrument(symbol.as_str());
955        if let Some(side) = order_side {
956            let side: CoinbaseIntxSide = side.into();
957            params.side(side);
958        };
959        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
960
961        let resp = self.inner.http_cancel_orders(params).await?;
962
963        let instrument = self.get_instrument_from_cache(symbol.inner())?;
964        let ts_init = get_atomic_clock_realtime().get_time_ns();
965
966        let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
967        for order in resp {
968            tracing::debug!("Canceled order: {order:?}");
969            let report = parse_order_status_report(
970                order,
971                account_id,
972                instrument.price_precision(),
973                instrument.size_precision(),
974                ts_init,
975            );
976            reports.push(report)
977        }
978
979        Ok(reports)
980    }
981
982    /// Modifies a currently open order on Coinbase International.
983    #[allow(clippy::too_many_arguments)]
984    pub async fn modify_order(
985        &self,
986        account_id: AccountId,
987        client_order_id: ClientOrderId,
988        new_client_order_id: ClientOrderId,
989        price: Option<Price>,
990        trigger_price: Option<Price>,
991        quantity: Option<Quantity>,
992    ) -> anyhow::Result<OrderStatusReport> {
993        let mut params = ModifyOrderParamsBuilder::default();
994        params.portfolio(account_id.get_issuers_id());
995        params.client_order_id(new_client_order_id.as_str());
996        if let Some(price) = price {
997            params.price(price.to_string());
998        };
999        if let Some(trigger_price) = trigger_price {
1000            params.price(trigger_price.to_string());
1001        };
1002        if let Some(quantity) = quantity {
1003            params.size(quantity.to_string());
1004        };
1005        let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1006
1007        let resp = self
1008            .inner
1009            .http_modify_order(client_order_id.as_str(), params)
1010            .await?;
1011        tracing::debug!("Modified order {}", resp.client_order_id);
1012
1013        let instrument = self.get_instrument_from_cache(resp.symbol)?;
1014        let ts_init = get_atomic_clock_realtime().get_time_ns();
1015        let report = parse_order_status_report(
1016            resp,
1017            account_id,
1018            instrument.price_precision(),
1019            instrument.size_precision(),
1020            ts_init,
1021        );
1022
1023        Ok(report)
1024    }
1025}