1use std::{
24 collections::HashMap,
25 num::NonZeroU32,
26 sync::{Arc, LazyLock, Mutex},
27};
28
29use chrono::{DateTime, Utc};
30use nautilus_core::{
31 MUTEX_POISONED, UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var,
32 time::get_atomic_clock_realtime,
33};
34use nautilus_model::{
35 enums::{OrderSide, OrderType, TimeInForce},
36 events::AccountState,
37 identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
38 instruments::{Instrument, InstrumentAny},
39 reports::{FillReport, OrderStatusReport, PositionStatusReport},
40 types::{Price, Quantity},
41};
42use nautilus_network::{http::HttpClient, ratelimiter::quota::Quota};
43use reqwest::{Method, StatusCode, header::USER_AGENT};
44use serde::{Deserialize, Serialize, de::DeserializeOwned};
45use ustr::Ustr;
46
47use super::{
48 error::CoinbaseIntxHttpError,
49 models::{
50 CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
51 CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
52 CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
53 CoinbaseIntxPosition,
54 },
55 parse::{
56 parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
57 parse_position_status_report,
58 },
59 query::{
60 CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
61 GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
62 GetPortfolioFillsParamsBuilder, ModifyOrderParams,
63 },
64};
65use crate::{
66 common::{
67 consts::COINBASE_INTX_REST_URL,
68 credential::Credential,
69 enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
70 },
71 http::{
72 error::ErrorBody,
73 query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
74 },
75};
76
77#[derive(Debug, Serialize, Deserialize)]
79pub struct CoinbaseIntxResponse<T> {
80 pub code: String,
82 pub msg: String,
84 pub data: Vec<T>,
86}
87
88pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> =
90 LazyLock::new(|| Quota::per_second(NonZeroU32::new(100).unwrap()));
91
92#[derive(Debug, Clone)]
98pub struct CoinbaseIntxHttpInnerClient {
99 base_url: String,
100 client: HttpClient,
101 credential: Option<Credential>,
102}
103
104impl Default for CoinbaseIntxHttpInnerClient {
105 fn default() -> Self {
106 Self::new(None, Some(60))
107 }
108}
109
110impl CoinbaseIntxHttpInnerClient {
111 #[must_use]
117 pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
118 Self {
119 base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
120 client: HttpClient::new(
121 Self::default_headers(),
122 vec![],
123 vec![],
124 Some(*COINBASE_INTX_REST_QUOTA),
125 timeout_secs,
126 ),
127 credential: None,
128 }
129 }
130
131 #[must_use]
134 pub fn with_credentials(
135 api_key: String,
136 api_secret: String,
137 api_passphrase: String,
138 base_url: String,
139 timeout_secs: Option<u64>,
140 ) -> Self {
141 Self {
142 base_url,
143 client: HttpClient::new(
144 Self::default_headers(),
145 vec![],
146 vec![],
147 Some(*COINBASE_INTX_REST_QUOTA),
148 timeout_secs,
149 ),
150 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
151 }
152 }
153
154 fn default_headers() -> HashMap<String, String> {
156 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
157 }
158
159 fn sign_request(
166 &self,
167 method: &Method,
168 path: &str,
169 body: Option<&[u8]>,
170 ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
171 let credential = match self.credential.as_ref() {
172 Some(c) => c,
173 None => return Err(CoinbaseIntxHttpError::MissingCredentials),
174 };
175
176 let api_key = credential.api_key.clone().to_string();
177 let api_passphrase = credential.api_passphrase.clone().to_string();
178 let timestamp = Utc::now().timestamp().to_string();
179 let body_str = body
180 .and_then(|b| String::from_utf8(b.to_vec()).ok())
181 .unwrap_or_default();
182
183 let signature = credential.sign(×tamp, method.as_str(), path, &body_str);
184
185 let mut headers = HashMap::new();
186 headers.insert("Accept".to_string(), "application/json".to_string());
187 headers.insert("CB-ACCESS-KEY".to_string(), api_key);
188 headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
189 headers.insert("CB-ACCESS-SIGN".to_string(), signature);
190 headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
191 headers.insert("Content-Type".to_string(), "application/json".to_string());
192
193 Ok(headers)
194 }
195
196 async fn send_request<T: DeserializeOwned>(
203 &self,
204 method: Method,
205 path: &str,
206 body: Option<Vec<u8>>,
207 authenticate: bool,
208 ) -> Result<T, CoinbaseIntxHttpError> {
209 let url = format!("{}{}", self.base_url, path);
210
211 let headers = if authenticate {
212 Some(self.sign_request(&method, path, body.as_deref())?)
213 } else {
214 None
215 };
216
217 tracing::trace!("Request: {url:?} {body:?}");
218
219 let resp = self
220 .client
221 .request(method.clone(), url, headers, body, None, None)
222 .await?;
223
224 tracing::trace!("Response: {resp:?}");
225
226 if resp.status.is_success() {
227 let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
228 tracing::error!("Failed to deserialize CoinbaseResponse: {e}");
229 CoinbaseIntxHttpError::JsonError(e.to_string())
230 })?;
231
232 Ok(coinbase_response)
233 } else {
234 let error_body = String::from_utf8_lossy(&resp.body);
235 tracing::error!(
236 "HTTP error {} with body: {error_body}",
237 resp.status.as_str()
238 );
239
240 if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
241 {
242 return Err(CoinbaseIntxHttpError::CoinbaseError {
243 error_code: parsed_error.code,
244 message: parsed_error.msg,
245 });
246 }
247
248 if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body)
249 && let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error)
250 {
251 return Err(CoinbaseIntxHttpError::CoinbaseError {
252 error_code: error,
253 message: title,
254 });
255 }
256
257 Err(CoinbaseIntxHttpError::UnexpectedStatus {
258 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
259 body: error_body.to_string(),
260 })
261 }
262 }
263
264 pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
271 let path = "/api/v1/assets";
272 self.send_request(Method::GET, path, None, false).await
273 }
274
275 pub async fn http_get_asset_details(
282 &self,
283 asset: &str,
284 ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
285 let path = format!("/api/v1/assets/{asset}");
286 self.send_request(Method::GET, &path, None, false).await
287 }
288
289 pub async fn http_list_instruments(
296 &self,
297 ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
298 let path = "/api/v1/instruments";
299 self.send_request(Method::GET, path, None, false).await
300 }
301
302 pub async fn http_get_instrument_details(
309 &self,
310 symbol: &str,
311 ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
312 let path = format!("/api/v1/instruments/{symbol}");
313 self.send_request(Method::GET, &path, None, false).await
314 }
315
316 pub async fn http_list_fee_rate_tiers(
323 &self,
324 ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
325 let path = "/api/v1/fee-rate-tiers";
326 self.send_request(Method::GET, path, None, true).await
327 }
328
329 pub async fn http_list_portfolios(
336 &self,
337 ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
338 let path = "/api/v1/portfolios";
339 self.send_request(Method::GET, path, None, true).await
340 }
341
342 pub async fn http_get_portfolio(
349 &self,
350 portfolio_id: &str,
351 ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
352 let path = format!("/api/v1/portfolios/{portfolio_id}");
353 self.send_request(Method::GET, &path, None, true).await
354 }
355
356 pub async fn http_get_portfolio_details(
363 &self,
364 portfolio_id: &str,
365 ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
366 let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
367 self.send_request(Method::GET, &path, None, true).await
368 }
369
370 pub async fn http_get_portfolio_summary(
377 &self,
378 portfolio_id: &str,
379 ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
380 let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
381 self.send_request(Method::GET, &path, None, true).await
382 }
383
384 pub async fn http_list_portfolio_balances(
391 &self,
392 portfolio_id: &str,
393 ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
394 let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
395 self.send_request(Method::GET, &path, None, true).await
396 }
397
398 pub async fn http_get_portfolio_balance(
405 &self,
406 portfolio_id: &str,
407 asset: &str,
408 ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
409 let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
410 self.send_request(Method::GET, &path, None, true).await
411 }
412
413 pub async fn http_list_portfolio_fills(
420 &self,
421 portfolio_id: &str,
422 params: GetPortfolioFillsParams,
423 ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
424 let query = serde_urlencoded::to_string(¶ms)
425 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
426 let path = format!("/api/v1/portfolios/{portfolio_id}/fills?{query}");
427 self.send_request(Method::GET, &path, None, true).await
428 }
429
430 pub async fn http_list_portfolio_positions(
437 &self,
438 portfolio_id: &str,
439 ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
440 let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
441 self.send_request(Method::GET, &path, None, true).await
442 }
443
444 pub async fn http_get_portfolio_position(
451 &self,
452 portfolio_id: &str,
453 symbol: &str,
454 ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
455 let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
456 self.send_request(Method::GET, &path, None, true).await
457 }
458
459 pub async fn http_list_portfolio_fee_rates(
466 &self,
467 ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
468 let path = "/api/v1/portfolios/fee-rates";
469 self.send_request(Method::GET, path, None, true).await
470 }
471
472 pub async fn http_create_order(
477 &self,
478 params: CreateOrderParams,
479 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
480 let path = "/api/v1/orders";
481 let body = serde_json::to_vec(¶ms)
482 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
483 self.send_request(Method::POST, path, Some(body), true)
484 .await
485 }
486
487 pub async fn http_get_order(
494 &self,
495 venue_order_id: &str,
496 portfolio_id: &str,
497 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
498 let params = GetOrderParams {
499 portfolio: portfolio_id.to_string(),
500 };
501 let query = serde_urlencoded::to_string(¶ms)
502 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
503 let path = format!("/api/v1/orders/{venue_order_id}?{query}");
504 self.send_request(Method::GET, &path, None, true).await
505 }
506
507 pub async fn http_list_open_orders(
515 &self,
516 params: GetOrdersParams,
517 ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
518 let query = serde_urlencoded::to_string(¶ms)
519 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
520 let path = format!("/api/v1/orders?{query}");
521 self.send_request(Method::GET, &path, None, true).await
522 }
523
524 pub async fn http_cancel_order(
529 &self,
530 client_order_id: &str,
531 portfolio_id: &str,
532 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
533 let params = CancelOrderParams {
534 portfolio: portfolio_id.to_string(),
535 };
536 let query = serde_urlencoded::to_string(¶ms)
537 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
538 let path = format!("/api/v1/orders/{client_order_id}?{query}");
539 self.send_request(Method::DELETE, &path, None, true).await
540 }
541
542 pub async fn http_cancel_orders(
547 &self,
548 params: CancelOrdersParams,
549 ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
550 let query = serde_urlencoded::to_string(¶ms)
551 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
552 let path = format!("/api/v1/orders?{query}");
553 self.send_request(Method::DELETE, &path, None, true).await
554 }
555
556 pub async fn http_modify_order(
563 &self,
564 order_id: &str,
565 params: ModifyOrderParams,
566 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
567 let path = format!("/api/v1/orders/{order_id}");
568 let body = serde_json::to_vec(¶ms)
569 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
570 self.send_request(Method::PUT, &path, Some(body), true)
571 .await
572 }
573}
574
575#[derive(Debug, Clone)]
580#[cfg_attr(
581 feature = "python",
582 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
583)]
584pub struct CoinbaseIntxHttpClient {
585 pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
586 pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
587 cache_initialized: bool,
588}
589
590impl Default for CoinbaseIntxHttpClient {
591 fn default() -> Self {
592 Self::new(None, Some(60))
593 }
594}
595
596impl CoinbaseIntxHttpClient {
597 #[must_use]
603 pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
604 Self {
605 inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)),
606 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
607 cache_initialized: false,
608 }
609 }
610
611 pub fn from_env() -> anyhow::Result<Self> {
618 Self::with_credentials(None, None, None, None, None)
619 }
620
621 pub fn with_credentials(
628 api_key: Option<String>,
629 api_secret: Option<String>,
630 api_passphrase: Option<String>,
631 base_url: Option<String>,
632 timeout_secs: Option<u64>,
633 ) -> anyhow::Result<Self> {
634 let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
635 let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
636 let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
637 let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
638 Ok(Self {
639 inner: Arc::new(CoinbaseIntxHttpInnerClient::with_credentials(
640 api_key,
641 api_secret,
642 api_passphrase,
643 base_url,
644 timeout_secs,
645 )),
646 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
647 cache_initialized: false,
648 })
649 }
650
651 fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
652 match self
653 .instruments_cache
654 .lock()
655 .expect(MUTEX_POISONED)
656 .get(&symbol)
657 {
658 Some(inst) => Ok(inst.clone()), None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
660 }
661 }
662
663 fn generate_ts_init(&self) -> UnixNanos {
664 get_atomic_clock_realtime().get_time_ns()
665 }
666
667 #[must_use]
669 pub fn base_url(&self) -> &str {
670 self.inner.base_url.as_str()
671 }
672
673 #[must_use]
675 pub fn api_key(&self) -> Option<&str> {
676 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
677 }
678
679 #[must_use]
683 pub const fn is_initialized(&self) -> bool {
684 self.cache_initialized
685 }
686
687 #[must_use]
693 pub fn get_cached_symbols(&self) -> Vec<String> {
694 self.instruments_cache
695 .lock()
696 .unwrap()
697 .keys()
698 .map(ToString::to_string)
699 .collect()
700 }
701
702 pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
710 for inst in instruments {
711 self.instruments_cache
712 .lock()
713 .unwrap()
714 .insert(inst.raw_symbol().inner(), inst);
715 }
716 self.cache_initialized = true;
717 }
718
719 pub fn add_instrument(&mut self, instrument: InstrumentAny) {
727 self.instruments_cache
728 .lock()
729 .unwrap()
730 .insert(instrument.raw_symbol().inner(), instrument);
731 self.cache_initialized = true;
732 }
733
734 pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
740 let resp = self
741 .inner
742 .http_list_portfolios()
743 .await
744 .map_err(|e| anyhow::anyhow!(e))?;
745
746 Ok(resp)
747 }
748
749 pub async fn request_account_state(
755 &self,
756 account_id: AccountId,
757 ) -> anyhow::Result<AccountState> {
758 let resp = self
759 .inner
760 .http_list_portfolio_balances(account_id.get_issuers_id())
761 .await
762 .map_err(|e| anyhow::anyhow!(e))?;
763
764 let ts_init = self.generate_ts_init();
765 let account_state = parse_account_state(resp, account_id, ts_init)?;
766
767 Ok(account_state)
768 }
769
770 pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
776 let resp = self
777 .inner
778 .http_list_instruments()
779 .await
780 .map_err(|e| anyhow::anyhow!(e))?;
781
782 let ts_init = self.generate_ts_init();
783
784 let mut instruments: Vec<InstrumentAny> = Vec::new();
785 for inst in &resp {
786 let instrument_any = parse_instrument_any(inst, ts_init);
787 if let Some(instrument_any) = instrument_any {
788 instruments.push(instrument_any);
789 }
790 }
791
792 Ok(instruments)
793 }
794
795 pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
801 let resp = self
802 .inner
803 .http_get_instrument_details(symbol.as_str())
804 .await
805 .map_err(|e| anyhow::anyhow!(e))?;
806
807 let ts_init = self.generate_ts_init();
808
809 match parse_instrument_any(&resp, ts_init) {
810 Some(inst) => Ok(inst),
811 None => anyhow::bail!("Unable to parse instrument"),
812 }
813 }
814
815 pub async fn request_order_status_report(
821 &self,
822 account_id: AccountId,
823 venue_order_id: VenueOrderId,
824 ) -> anyhow::Result<OrderStatusReport> {
825 let portfolio_id = account_id.get_issuers_id();
826
827 let resp = self
828 .inner
829 .http_get_order(venue_order_id.as_str(), portfolio_id)
830 .await
831 .map_err(|e| anyhow::anyhow!(e))?;
832
833 let instrument = self.get_instrument_from_cache(resp.symbol)?;
834 let ts_init = self.generate_ts_init();
835
836 let report = parse_order_status_report(
837 resp,
838 account_id,
839 instrument.price_precision(),
840 instrument.size_precision(),
841 ts_init,
842 )?;
843 Ok(report)
844 }
845
846 pub async fn request_order_status_reports(
852 &self,
853 account_id: AccountId,
854 symbol: Symbol,
855 ) -> anyhow::Result<Vec<OrderStatusReport>> {
856 let portfolio_id = account_id.get_issuers_id();
857
858 let mut params = GetOrdersParamsBuilder::default();
859 params.portfolio(portfolio_id);
860 params.instrument(symbol.as_str());
861 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
862
863 let resp = self
864 .inner
865 .http_list_open_orders(params)
866 .await
867 .map_err(|e| anyhow::anyhow!(e))?;
868
869 let ts_init = get_atomic_clock_realtime().get_time_ns();
870
871 let mut reports: Vec<OrderStatusReport> = Vec::new();
872 for order in resp.results {
873 let instrument = self.get_instrument_from_cache(order.symbol)?;
874 let report = parse_order_status_report(
875 order,
876 account_id,
877 instrument.price_precision(),
878 instrument.size_precision(),
879 ts_init,
880 )?;
881 reports.push(report);
882 }
883
884 Ok(reports)
885 }
886
887 pub async fn request_fill_reports(
893 &self,
894 account_id: AccountId,
895 client_order_id: Option<ClientOrderId>,
896 start: Option<DateTime<Utc>>,
897 ) -> anyhow::Result<Vec<FillReport>> {
898 let portfolio_id = account_id.get_issuers_id();
899
900 let mut params = GetPortfolioFillsParamsBuilder::default();
901 if let Some(start) = start {
902 params.time_from(start);
903 }
904 if let Some(client_order_id) = client_order_id {
905 params.client_order_id(client_order_id.to_string());
906 }
907 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
908
909 let resp = self
910 .inner
911 .http_list_portfolio_fills(portfolio_id, params)
912 .await
913 .map_err(|e| anyhow::anyhow!(e))?;
914
915 let ts_init = get_atomic_clock_realtime().get_time_ns();
916
917 let mut reports: Vec<FillReport> = Vec::new();
918 for fill in resp.results {
919 let instrument = self.get_instrument_from_cache(fill.symbol)?;
920 let report = parse_fill_report(
921 fill,
922 account_id,
923 instrument.price_precision(),
924 instrument.size_precision(),
925 ts_init,
926 )?;
927 reports.push(report);
928 }
929
930 Ok(reports)
931 }
932
933 pub async fn request_position_status_report(
939 &self,
940 account_id: AccountId,
941 symbol: Symbol,
942 ) -> anyhow::Result<PositionStatusReport> {
943 let portfolio_id = account_id.get_issuers_id();
944
945 let resp = self
946 .inner
947 .http_get_portfolio_position(portfolio_id, symbol.as_str())
948 .await
949 .map_err(|e| anyhow::anyhow!(e))?;
950
951 let instrument = self.get_instrument_from_cache(resp.symbol)?;
952 let ts_init = get_atomic_clock_realtime().get_time_ns();
953
954 let report =
955 parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init)?;
956 Ok(report)
957 }
958
959 pub async fn request_position_status_reports(
965 &self,
966 account_id: AccountId,
967 ) -> anyhow::Result<Vec<PositionStatusReport>> {
968 let portfolio_id = account_id.get_issuers_id();
969
970 let resp = self
971 .inner
972 .http_list_portfolio_positions(portfolio_id)
973 .await
974 .map_err(|e| anyhow::anyhow!(e))?;
975
976 let ts_init = get_atomic_clock_realtime().get_time_ns();
977
978 let mut reports: Vec<PositionStatusReport> = Vec::new();
979 for position in resp {
980 let instrument = self.get_instrument_from_cache(position.symbol)?;
981 let report = parse_position_status_report(
982 position,
983 account_id,
984 instrument.size_precision(),
985 ts_init,
986 )?;
987 reports.push(report);
988 }
989
990 Ok(reports)
991 }
992
993 #[allow(clippy::too_many_arguments)]
999 pub async fn submit_order(
1000 &self,
1001 account_id: AccountId,
1002 client_order_id: ClientOrderId,
1003 symbol: Symbol,
1004 order_side: OrderSide,
1005 order_type: OrderType,
1006 quantity: Quantity,
1007 time_in_force: TimeInForce,
1008 expire_time: Option<DateTime<Utc>>,
1009 price: Option<Price>,
1010 trigger_price: Option<Price>,
1011 post_only: Option<bool>,
1012 reduce_only: Option<bool>,
1013 ) -> anyhow::Result<OrderStatusReport> {
1014 let coinbase_side: CoinbaseIntxSide = order_side.into();
1015 let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
1016 let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
1017
1018 let mut params = CreateOrderParamsBuilder::default();
1019 params.portfolio(account_id.get_issuers_id());
1020 params.client_order_id(client_order_id.as_str());
1021 params.instrument(symbol.as_str());
1022 params.side(coinbase_side);
1023 params.size(quantity.to_string());
1024 params.order_type(coinbase_order_type);
1025 params.tif(coinbase_tif);
1026 if let Some(expire_time) = expire_time {
1027 params.expire_time(expire_time);
1028 }
1029 if let Some(price) = price {
1030 params.price(price.to_string());
1031 }
1032 if let Some(trigger_price) = trigger_price {
1033 params.stop_price(trigger_price.to_string());
1034 }
1035 if let Some(post_only) = post_only {
1036 params.post_only(post_only);
1037 }
1038 if let Some(reduce_only) = reduce_only {
1039 params.close_only(reduce_only);
1040 }
1041 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1042
1043 let resp = self.inner.http_create_order(params).await?;
1044 tracing::debug!("Submitted order: {resp:?}");
1045
1046 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1047 let ts_init = get_atomic_clock_realtime().get_time_ns();
1048 let report = parse_order_status_report(
1049 resp,
1050 account_id,
1051 instrument.price_precision(),
1052 instrument.size_precision(),
1053 ts_init,
1054 )?;
1055 Ok(report)
1056 }
1057
1058 pub async fn cancel_order(
1064 &self,
1065 account_id: AccountId,
1066 client_order_id: ClientOrderId,
1067 ) -> anyhow::Result<OrderStatusReport> {
1068 let portfolio_id = account_id.get_issuers_id();
1069
1070 let resp = self
1071 .inner
1072 .http_cancel_order(client_order_id.as_str(), portfolio_id)
1073 .await?;
1074 tracing::debug!("Canceled order: {resp:?}");
1075
1076 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1077 let ts_init = get_atomic_clock_realtime().get_time_ns();
1078
1079 let report = parse_order_status_report(
1080 resp,
1081 account_id,
1082 instrument.price_precision(),
1083 instrument.size_precision(),
1084 ts_init,
1085 )?;
1086 Ok(report)
1087 }
1088
1089 pub async fn cancel_orders(
1095 &self,
1096 account_id: AccountId,
1097 symbol: Symbol,
1098 order_side: Option<OrderSide>,
1099 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1100 let mut params = CancelOrdersParamsBuilder::default();
1101 params.portfolio(account_id.get_issuers_id());
1102 params.instrument(symbol.as_str());
1103 if let Some(side) = order_side {
1104 let side: CoinbaseIntxSide = side.into();
1105 params.side(side);
1106 }
1107 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1108
1109 let resp = self.inner.http_cancel_orders(params).await?;
1110
1111 let instrument = self.get_instrument_from_cache(symbol.inner())?;
1112 let ts_init = get_atomic_clock_realtime().get_time_ns();
1113
1114 let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
1115 for order in resp {
1116 tracing::debug!("Canceled order: {order:?}");
1117 let report = parse_order_status_report(
1118 order,
1119 account_id,
1120 instrument.price_precision(),
1121 instrument.size_precision(),
1122 ts_init,
1123 )?;
1124 reports.push(report);
1125 }
1126
1127 Ok(reports)
1128 }
1129
1130 #[allow(clippy::too_many_arguments)]
1136 pub async fn modify_order(
1137 &self,
1138 account_id: AccountId,
1139 client_order_id: ClientOrderId,
1140 new_client_order_id: ClientOrderId,
1141 price: Option<Price>,
1142 trigger_price: Option<Price>,
1143 quantity: Option<Quantity>,
1144 ) -> anyhow::Result<OrderStatusReport> {
1145 let mut params = ModifyOrderParamsBuilder::default();
1146 params.portfolio(account_id.get_issuers_id());
1147 params.client_order_id(new_client_order_id.as_str());
1148 if let Some(price) = price {
1149 params.price(price.to_string());
1150 }
1151 if let Some(trigger_price) = trigger_price {
1152 params.price(trigger_price.to_string());
1153 }
1154 if let Some(quantity) = quantity {
1155 params.size(quantity.to_string());
1156 }
1157 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1158
1159 let resp = self
1160 .inner
1161 .http_modify_order(client_order_id.as_str(), params)
1162 .await?;
1163 tracing::debug!("Modified order {}", resp.client_order_id);
1164
1165 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1166 let ts_init = get_atomic_clock_realtime().get_time_ns();
1167 let report = parse_order_status_report(
1168 resp,
1169 account_id,
1170 instrument.price_precision(),
1171 instrument.size_precision(),
1172 ts_init,
1173 )?;
1174 Ok(report)
1175 }
1176}