1use std::{
24 collections::HashMap,
25 num::NonZeroU32,
26 sync::{Arc, LazyLock, Mutex},
27};
28
29use anyhow::Context;
30use chrono::{DateTime, Utc};
31use nautilus_core::{
32 MUTEX_POISONED, UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var,
33 time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36 enums::{OrderSide, OrderType, TimeInForce},
37 events::AccountState,
38 identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
39 instruments::{Instrument, InstrumentAny},
40 reports::{FillReport, OrderStatusReport, PositionStatusReport},
41 types::{Price, Quantity},
42};
43use nautilus_network::{
44 http::{HttpClient, HttpClientError},
45 ratelimiter::quota::Quota,
46};
47use reqwest::{Method, StatusCode, header::USER_AGENT};
48use serde::{Deserialize, Serialize, de::DeserializeOwned};
49use ustr::Ustr;
50
51use super::{
52 error::CoinbaseIntxHttpError,
53 models::{
54 CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
55 CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
56 CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
57 CoinbaseIntxPosition,
58 },
59 parse::{
60 parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
61 parse_position_status_report,
62 },
63 query::{
64 CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
65 GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
66 GetPortfolioFillsParamsBuilder, ModifyOrderParams,
67 },
68};
69use crate::{
70 common::{
71 consts::COINBASE_INTX_REST_URL,
72 credential::Credential,
73 enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
74 },
75 http::{
76 error::ErrorBody,
77 query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
78 },
79};
80
81#[derive(Debug, Serialize, Deserialize)]
83pub struct CoinbaseIntxResponse<T> {
84 pub code: String,
86 pub msg: String,
88 pub data: Vec<T>,
90}
91
92pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> =
94 LazyLock::new(|| Quota::per_second(NonZeroU32::new(100).unwrap()));
95
96#[derive(Debug, Clone)]
102pub struct CoinbaseIntxHttpInnerClient {
103 base_url: String,
104 client: HttpClient,
105 credential: Option<Credential>,
106}
107
108impl Default for CoinbaseIntxHttpInnerClient {
109 fn default() -> Self {
110 Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
111 }
112}
113
114impl CoinbaseIntxHttpInnerClient {
115 pub fn new(
125 base_url: Option<String>,
126 timeout_secs: Option<u64>,
127 ) -> Result<Self, HttpClientError> {
128 Ok(Self {
129 base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
130 client: HttpClient::new(
131 Self::default_headers(),
132 vec![],
133 vec![],
134 Some(*COINBASE_INTX_REST_QUOTA),
135 timeout_secs,
136 None, )?,
138 credential: None,
139 })
140 }
141
142 pub fn with_credentials(
149 api_key: String,
150 api_secret: String,
151 api_passphrase: String,
152 base_url: String,
153 timeout_secs: Option<u64>,
154 ) -> Result<Self, HttpClientError> {
155 Ok(Self {
156 base_url,
157 client: HttpClient::new(
158 Self::default_headers(),
159 vec![],
160 vec![],
161 Some(*COINBASE_INTX_REST_QUOTA),
162 timeout_secs,
163 None, )?,
165 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
166 })
167 }
168
169 fn default_headers() -> HashMap<String, String> {
171 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
172 }
173
174 fn sign_request(
181 &self,
182 method: &Method,
183 path: &str,
184 body: Option<&[u8]>,
185 ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
186 let credential = match self.credential.as_ref() {
187 Some(c) => c,
188 None => return Err(CoinbaseIntxHttpError::MissingCredentials),
189 };
190
191 let api_key = credential.api_key.clone().to_string();
192 let api_passphrase = credential.api_passphrase.clone().to_string();
193 let timestamp = Utc::now().timestamp().to_string();
194 let body_str = body
195 .and_then(|b| String::from_utf8(b.to_vec()).ok())
196 .unwrap_or_default();
197
198 let signature = credential.sign(×tamp, method.as_str(), path, &body_str);
199
200 let mut headers = HashMap::new();
201 headers.insert("Accept".to_string(), "application/json".to_string());
202 headers.insert("CB-ACCESS-KEY".to_string(), api_key);
203 headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
204 headers.insert("CB-ACCESS-SIGN".to_string(), signature);
205 headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
206 headers.insert("Content-Type".to_string(), "application/json".to_string());
207
208 Ok(headers)
209 }
210
211 async fn send_request<T: DeserializeOwned, P: Serialize>(
218 &self,
219 method: Method,
220 path: &str,
221 params: Option<&P>,
222 body: Option<Vec<u8>>,
223 authenticate: bool,
224 ) -> Result<T, CoinbaseIntxHttpError> {
225 let params_str = params
226 .map(serde_urlencoded::to_string)
227 .transpose()
228 .map_err(|e| {
229 CoinbaseIntxHttpError::JsonError(format!("Failed to serialize params: {e}"))
230 })?;
231
232 let full_path = if let Some(ref query) = params_str {
233 if query.is_empty() {
234 path.to_string()
235 } else {
236 format!("{path}?{query}")
237 }
238 } else {
239 path.to_string()
240 };
241
242 let url = format!("{}{}", self.base_url, full_path);
243
244 let headers = if authenticate {
245 Some(self.sign_request(&method, &full_path, body.as_deref())?)
246 } else {
247 None
248 };
249
250 tracing::trace!("Request: {url:?} {body:?}");
251
252 let resp = self
253 .client
254 .request(method.clone(), url, None, headers, body, None, None)
255 .await?;
256
257 tracing::trace!("Response: {resp:?}");
258
259 if resp.status.is_success() {
260 let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
261 tracing::error!("Failed to deserialize CoinbaseResponse: {e}");
262 CoinbaseIntxHttpError::JsonError(e.to_string())
263 })?;
264
265 Ok(coinbase_response)
266 } else {
267 let error_body = String::from_utf8_lossy(&resp.body);
268 tracing::error!(
269 "HTTP error {} with body: {error_body}",
270 resp.status.as_str()
271 );
272
273 if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
274 {
275 return Err(CoinbaseIntxHttpError::CoinbaseError {
276 error_code: parsed_error.code,
277 message: parsed_error.msg,
278 });
279 }
280
281 if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body)
282 && let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error)
283 {
284 return Err(CoinbaseIntxHttpError::CoinbaseError {
285 error_code: error,
286 message: title,
287 });
288 }
289
290 Err(CoinbaseIntxHttpError::UnexpectedStatus {
291 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
292 body: error_body.to_string(),
293 })
294 }
295 }
296
297 pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
304 let path = "/api/v1/assets";
305 self.send_request::<_, ()>(Method::GET, path, None, None, false)
306 .await
307 }
308
309 pub async fn http_get_asset_details(
316 &self,
317 asset: &str,
318 ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
319 let path = format!("/api/v1/assets/{asset}");
320 self.send_request::<_, ()>(Method::GET, &path, None, None, false)
321 .await
322 }
323
324 pub async fn http_list_instruments(
331 &self,
332 ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
333 let path = "/api/v1/instruments";
334 self.send_request::<_, ()>(Method::GET, path, None, None, false)
335 .await
336 }
337
338 pub async fn http_get_instrument_details(
345 &self,
346 symbol: &str,
347 ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
348 let path = format!("/api/v1/instruments/{symbol}");
349 self.send_request::<_, ()>(Method::GET, &path, None, None, false)
350 .await
351 }
352
353 pub async fn http_list_fee_rate_tiers(
360 &self,
361 ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
362 let path = "/api/v1/fee-rate-tiers";
363 self.send_request::<_, ()>(Method::GET, path, None, None, true)
364 .await
365 }
366
367 pub async fn http_list_portfolios(
374 &self,
375 ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
376 let path = "/api/v1/portfolios";
377 self.send_request::<_, ()>(Method::GET, path, None, None, true)
378 .await
379 }
380
381 pub async fn http_get_portfolio(
388 &self,
389 portfolio_id: &str,
390 ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
391 let path = format!("/api/v1/portfolios/{portfolio_id}");
392 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
393 .await
394 }
395
396 pub async fn http_get_portfolio_details(
403 &self,
404 portfolio_id: &str,
405 ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
406 let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
407 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
408 .await
409 }
410
411 pub async fn http_get_portfolio_summary(
418 &self,
419 portfolio_id: &str,
420 ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
421 let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
422 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
423 .await
424 }
425
426 pub async fn http_list_portfolio_balances(
433 &self,
434 portfolio_id: &str,
435 ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
436 let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
437 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
438 .await
439 }
440
441 pub async fn http_get_portfolio_balance(
448 &self,
449 portfolio_id: &str,
450 asset: &str,
451 ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
452 let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
453 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
454 .await
455 }
456
457 pub async fn http_list_portfolio_fills(
464 &self,
465 portfolio_id: &str,
466 params: GetPortfolioFillsParams,
467 ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
468 let path = format!("/api/v1/portfolios/{portfolio_id}/fills");
469 self.send_request(Method::GET, &path, Some(¶ms), None, true)
470 .await
471 }
472
473 pub async fn http_list_portfolio_positions(
480 &self,
481 portfolio_id: &str,
482 ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
483 let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
484 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
485 .await
486 }
487
488 pub async fn http_get_portfolio_position(
495 &self,
496 portfolio_id: &str,
497 symbol: &str,
498 ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
499 let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
500 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
501 .await
502 }
503
504 pub async fn http_list_portfolio_fee_rates(
511 &self,
512 ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
513 let path = "/api/v1/portfolios/fee-rates";
514 self.send_request::<_, ()>(Method::GET, path, None, None, true)
515 .await
516 }
517
518 pub async fn http_create_order(
523 &self,
524 params: CreateOrderParams,
525 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
526 let path = "/api/v1/orders";
527 let body = serde_json::to_vec(¶ms)
528 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
529 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
530 .await
531 }
532
533 pub async fn http_get_order(
540 &self,
541 venue_order_id: &str,
542 portfolio_id: &str,
543 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
544 let params = GetOrderParams {
545 portfolio: portfolio_id.to_string(),
546 };
547 let path = format!("/api/v1/orders/{venue_order_id}");
548 self.send_request(Method::GET, &path, Some(¶ms), None, true)
549 .await
550 }
551
552 pub async fn http_list_open_orders(
560 &self,
561 params: GetOrdersParams,
562 ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
563 self.send_request(Method::GET, "/api/v1/orders", Some(¶ms), None, true)
564 .await
565 }
566
567 pub async fn http_cancel_order(
572 &self,
573 client_order_id: &str,
574 portfolio_id: &str,
575 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
576 let params = CancelOrderParams {
577 portfolio: portfolio_id.to_string(),
578 };
579 let path = format!("/api/v1/orders/{client_order_id}");
580 self.send_request(Method::DELETE, &path, Some(¶ms), None, true)
581 .await
582 }
583
584 pub async fn http_cancel_orders(
589 &self,
590 params: CancelOrdersParams,
591 ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
592 self.send_request(Method::DELETE, "/api/v1/orders", Some(¶ms), None, true)
593 .await
594 }
595
596 pub async fn http_modify_order(
603 &self,
604 order_id: &str,
605 params: ModifyOrderParams,
606 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
607 let path = format!("/api/v1/orders/{order_id}");
608 let body = serde_json::to_vec(¶ms)
609 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
610 self.send_request::<_, ()>(Method::PUT, &path, None, Some(body), true)
611 .await
612 }
613}
614
615#[derive(Debug, Clone)]
620#[cfg_attr(
621 feature = "python",
622 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
623)]
624pub struct CoinbaseIntxHttpClient {
625 pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
626 pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
627 cache_initialized: bool,
628}
629
630impl Default for CoinbaseIntxHttpClient {
631 fn default() -> Self {
632 Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
633 }
634}
635
636impl CoinbaseIntxHttpClient {
637 pub fn new(
647 base_url: Option<String>,
648 timeout_secs: Option<u64>,
649 ) -> Result<Self, HttpClientError> {
650 Ok(Self {
651 inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)?),
652 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
653 cache_initialized: false,
654 })
655 }
656
657 pub fn from_env() -> anyhow::Result<Self> {
664 Self::with_credentials(None, None, None, None, None)
665 }
666
667 pub fn with_credentials(
674 api_key: Option<String>,
675 api_secret: Option<String>,
676 api_passphrase: Option<String>,
677 base_url: Option<String>,
678 timeout_secs: Option<u64>,
679 ) -> anyhow::Result<Self> {
680 let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
681 let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
682 let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
683 let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
684 Ok(Self {
685 inner: Arc::new(
686 CoinbaseIntxHttpInnerClient::with_credentials(
687 api_key,
688 api_secret,
689 api_passphrase,
690 base_url,
691 timeout_secs,
692 )
693 .context("failed to create Coinbase INTX HTTP client")?,
694 ),
695 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
696 cache_initialized: false,
697 })
698 }
699
700 fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
701 match self
702 .instruments_cache
703 .lock()
704 .expect(MUTEX_POISONED)
705 .get(&symbol)
706 {
707 Some(inst) => Ok(inst.clone()), None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
709 }
710 }
711
712 fn generate_ts_init(&self) -> UnixNanos {
713 get_atomic_clock_realtime().get_time_ns()
714 }
715
716 #[must_use]
718 pub fn base_url(&self) -> &str {
719 self.inner.base_url.as_str()
720 }
721
722 #[must_use]
724 pub fn api_key(&self) -> Option<&str> {
725 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
726 }
727
728 #[must_use]
730 pub fn api_key_masked(&self) -> Option<String> {
731 self.inner.credential.as_ref().map(|c| c.api_key_masked())
732 }
733
734 #[must_use]
738 pub const fn is_initialized(&self) -> bool {
739 self.cache_initialized
740 }
741
742 #[must_use]
748 pub fn get_cached_symbols(&self) -> Vec<String> {
749 self.instruments_cache
750 .lock()
751 .unwrap()
752 .keys()
753 .map(ToString::to_string)
754 .collect()
755 }
756
757 pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
765 for inst in instruments {
766 self.instruments_cache
767 .lock()
768 .unwrap()
769 .insert(inst.raw_symbol().inner(), inst);
770 }
771 self.cache_initialized = true;
772 }
773
774 pub fn add_instrument(&mut self, instrument: InstrumentAny) {
782 self.instruments_cache
783 .lock()
784 .unwrap()
785 .insert(instrument.raw_symbol().inner(), instrument);
786 self.cache_initialized = true;
787 }
788
789 pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
795 let resp = self
796 .inner
797 .http_list_portfolios()
798 .await
799 .map_err(|e| anyhow::anyhow!(e))?;
800
801 Ok(resp)
802 }
803
804 pub async fn request_account_state(
810 &self,
811 account_id: AccountId,
812 ) -> anyhow::Result<AccountState> {
813 let resp = self
814 .inner
815 .http_list_portfolio_balances(account_id.get_issuers_id())
816 .await
817 .map_err(|e| anyhow::anyhow!(e))?;
818
819 let ts_init = self.generate_ts_init();
820 let account_state = parse_account_state(resp, account_id, ts_init)?;
821
822 Ok(account_state)
823 }
824
825 pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
831 let resp = self
832 .inner
833 .http_list_instruments()
834 .await
835 .map_err(|e| anyhow::anyhow!(e))?;
836
837 let ts_init = self.generate_ts_init();
838
839 let mut instruments: Vec<InstrumentAny> = Vec::new();
840 for inst in &resp {
841 let instrument_any = parse_instrument_any(inst, ts_init);
842 if let Some(instrument_any) = instrument_any {
843 instruments.push(instrument_any);
844 }
845 }
846
847 Ok(instruments)
848 }
849
850 pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
856 let resp = self
857 .inner
858 .http_get_instrument_details(symbol.as_str())
859 .await
860 .map_err(|e| anyhow::anyhow!(e))?;
861
862 let ts_init = self.generate_ts_init();
863
864 match parse_instrument_any(&resp, ts_init) {
865 Some(inst) => Ok(inst),
866 None => anyhow::bail!("Unable to parse instrument"),
867 }
868 }
869
870 pub async fn request_order_status_report(
876 &self,
877 account_id: AccountId,
878 venue_order_id: VenueOrderId,
879 ) -> anyhow::Result<OrderStatusReport> {
880 let portfolio_id = account_id.get_issuers_id();
881
882 let resp = self
883 .inner
884 .http_get_order(venue_order_id.as_str(), portfolio_id)
885 .await
886 .map_err(|e| anyhow::anyhow!(e))?;
887
888 let instrument = self.get_instrument_from_cache(resp.symbol)?;
889 let ts_init = self.generate_ts_init();
890
891 let report = parse_order_status_report(
892 resp,
893 account_id,
894 instrument.price_precision(),
895 instrument.size_precision(),
896 ts_init,
897 )?;
898 Ok(report)
899 }
900
901 pub async fn request_order_status_reports(
907 &self,
908 account_id: AccountId,
909 symbol: Symbol,
910 ) -> anyhow::Result<Vec<OrderStatusReport>> {
911 let portfolio_id = account_id.get_issuers_id();
912
913 let mut params = GetOrdersParamsBuilder::default();
914 params.portfolio(portfolio_id);
915 params.instrument(symbol.as_str());
916 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
917
918 let resp = self
919 .inner
920 .http_list_open_orders(params)
921 .await
922 .map_err(|e| anyhow::anyhow!(e))?;
923
924 let ts_init = get_atomic_clock_realtime().get_time_ns();
925
926 let mut reports: Vec<OrderStatusReport> = Vec::new();
927 for order in resp.results {
928 let instrument = self.get_instrument_from_cache(order.symbol)?;
929 let report = parse_order_status_report(
930 order,
931 account_id,
932 instrument.price_precision(),
933 instrument.size_precision(),
934 ts_init,
935 )?;
936 reports.push(report);
937 }
938
939 Ok(reports)
940 }
941
942 pub async fn request_fill_reports(
948 &self,
949 account_id: AccountId,
950 client_order_id: Option<ClientOrderId>,
951 start: Option<DateTime<Utc>>,
952 ) -> anyhow::Result<Vec<FillReport>> {
953 let portfolio_id = account_id.get_issuers_id();
954
955 let mut params = GetPortfolioFillsParamsBuilder::default();
956 if let Some(start) = start {
957 params.time_from(start);
958 }
959 if let Some(client_order_id) = client_order_id {
960 params.client_order_id(client_order_id.to_string());
961 }
962 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
963
964 let resp = self
965 .inner
966 .http_list_portfolio_fills(portfolio_id, params)
967 .await
968 .map_err(|e| anyhow::anyhow!(e))?;
969
970 let ts_init = get_atomic_clock_realtime().get_time_ns();
971
972 let mut reports: Vec<FillReport> = Vec::new();
973 for fill in resp.results {
974 let instrument = self.get_instrument_from_cache(fill.symbol)?;
975 let report = parse_fill_report(
976 fill,
977 account_id,
978 instrument.price_precision(),
979 instrument.size_precision(),
980 ts_init,
981 )?;
982 reports.push(report);
983 }
984
985 Ok(reports)
986 }
987
988 pub async fn request_position_status_report(
994 &self,
995 account_id: AccountId,
996 symbol: Symbol,
997 ) -> anyhow::Result<PositionStatusReport> {
998 let portfolio_id = account_id.get_issuers_id();
999
1000 let resp = self
1001 .inner
1002 .http_get_portfolio_position(portfolio_id, symbol.as_str())
1003 .await
1004 .map_err(|e| anyhow::anyhow!(e))?;
1005
1006 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1007 let ts_init = get_atomic_clock_realtime().get_time_ns();
1008
1009 let report =
1010 parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init)?;
1011 Ok(report)
1012 }
1013
1014 pub async fn request_position_status_reports(
1020 &self,
1021 account_id: AccountId,
1022 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1023 let portfolio_id = account_id.get_issuers_id();
1024
1025 let resp = self
1026 .inner
1027 .http_list_portfolio_positions(portfolio_id)
1028 .await
1029 .map_err(|e| anyhow::anyhow!(e))?;
1030
1031 let ts_init = get_atomic_clock_realtime().get_time_ns();
1032
1033 let mut reports: Vec<PositionStatusReport> = Vec::new();
1034 for position in resp {
1035 let instrument = self.get_instrument_from_cache(position.symbol)?;
1036 let report = parse_position_status_report(
1037 position,
1038 account_id,
1039 instrument.size_precision(),
1040 ts_init,
1041 )?;
1042 reports.push(report);
1043 }
1044
1045 Ok(reports)
1046 }
1047
1048 #[allow(clippy::too_many_arguments)]
1054 pub async fn submit_order(
1055 &self,
1056 account_id: AccountId,
1057 client_order_id: ClientOrderId,
1058 symbol: Symbol,
1059 order_side: OrderSide,
1060 order_type: OrderType,
1061 quantity: Quantity,
1062 time_in_force: TimeInForce,
1063 expire_time: Option<DateTime<Utc>>,
1064 price: Option<Price>,
1065 trigger_price: Option<Price>,
1066 post_only: Option<bool>,
1067 reduce_only: Option<bool>,
1068 ) -> anyhow::Result<OrderStatusReport> {
1069 let coinbase_side: CoinbaseIntxSide = order_side.into();
1070 let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
1071 let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
1072
1073 let mut params = CreateOrderParamsBuilder::default();
1074 params.portfolio(account_id.get_issuers_id());
1075 params.client_order_id(client_order_id.as_str());
1076 params.instrument(symbol.as_str());
1077 params.side(coinbase_side);
1078 params.size(quantity.to_string());
1079 params.order_type(coinbase_order_type);
1080 params.tif(coinbase_tif);
1081 if let Some(expire_time) = expire_time {
1082 params.expire_time(expire_time);
1083 }
1084 if let Some(price) = price {
1085 params.price(price.to_string());
1086 }
1087 if let Some(trigger_price) = trigger_price {
1088 params.stop_price(trigger_price.to_string());
1089 }
1090 if let Some(post_only) = post_only {
1091 params.post_only(post_only);
1092 }
1093 if let Some(reduce_only) = reduce_only {
1094 params.close_only(reduce_only);
1095 }
1096 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1097
1098 let resp = self.inner.http_create_order(params).await?;
1099 tracing::debug!("Submitted order: {resp:?}");
1100
1101 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1102 let ts_init = get_atomic_clock_realtime().get_time_ns();
1103 let report = parse_order_status_report(
1104 resp,
1105 account_id,
1106 instrument.price_precision(),
1107 instrument.size_precision(),
1108 ts_init,
1109 )?;
1110 Ok(report)
1111 }
1112
1113 pub async fn cancel_order(
1119 &self,
1120 account_id: AccountId,
1121 client_order_id: ClientOrderId,
1122 ) -> anyhow::Result<OrderStatusReport> {
1123 let portfolio_id = account_id.get_issuers_id();
1124
1125 let resp = self
1126 .inner
1127 .http_cancel_order(client_order_id.as_str(), portfolio_id)
1128 .await?;
1129 tracing::debug!("Canceled order: {resp:?}");
1130
1131 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1132 let ts_init = get_atomic_clock_realtime().get_time_ns();
1133
1134 let report = parse_order_status_report(
1135 resp,
1136 account_id,
1137 instrument.price_precision(),
1138 instrument.size_precision(),
1139 ts_init,
1140 )?;
1141 Ok(report)
1142 }
1143
1144 pub async fn cancel_orders(
1150 &self,
1151 account_id: AccountId,
1152 symbol: Symbol,
1153 order_side: Option<OrderSide>,
1154 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1155 let mut params = CancelOrdersParamsBuilder::default();
1156 params.portfolio(account_id.get_issuers_id());
1157 params.instrument(symbol.as_str());
1158 if let Some(side) = order_side {
1159 let side: CoinbaseIntxSide = side.into();
1160 params.side(side);
1161 }
1162 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1163
1164 let resp = self.inner.http_cancel_orders(params).await?;
1165
1166 let instrument = self.get_instrument_from_cache(symbol.inner())?;
1167 let ts_init = get_atomic_clock_realtime().get_time_ns();
1168
1169 let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
1170 for order in resp {
1171 tracing::debug!("Canceled order: {order:?}");
1172 let report = parse_order_status_report(
1173 order,
1174 account_id,
1175 instrument.price_precision(),
1176 instrument.size_precision(),
1177 ts_init,
1178 )?;
1179 reports.push(report);
1180 }
1181
1182 Ok(reports)
1183 }
1184
1185 #[allow(clippy::too_many_arguments)]
1191 pub async fn modify_order(
1192 &self,
1193 account_id: AccountId,
1194 client_order_id: ClientOrderId,
1195 new_client_order_id: ClientOrderId,
1196 price: Option<Price>,
1197 trigger_price: Option<Price>,
1198 quantity: Option<Quantity>,
1199 ) -> anyhow::Result<OrderStatusReport> {
1200 let mut params = ModifyOrderParamsBuilder::default();
1201 params.portfolio(account_id.get_issuers_id());
1202 params.client_order_id(new_client_order_id.as_str());
1203 if let Some(price) = price {
1204 params.price(price.to_string());
1205 }
1206 if let Some(trigger_price) = trigger_price {
1207 params.price(trigger_price.to_string());
1208 }
1209 if let Some(quantity) = quantity {
1210 params.size(quantity.to_string());
1211 }
1212 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1213
1214 let resp = self
1215 .inner
1216 .http_modify_order(client_order_id.as_str(), params)
1217 .await?;
1218 tracing::debug!("Modified order {}", resp.client_order_id);
1219
1220 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1221 let ts_init = get_atomic_clock_realtime().get_time_ns();
1222 let report = parse_order_status_report(
1223 resp,
1224 account_id,
1225 instrument.price_precision(),
1226 instrument.size_precision(),
1227 ts_init,
1228 )?;
1229 Ok(report)
1230 }
1231}