1use std::{
24 collections::HashMap,
25 num::NonZeroU32,
26 sync::{Arc, LazyLock, Mutex},
27};
28
29use anyhow::Context;
30use chrono::{DateTime, Utc};
31use nautilus_core::{
32 MUTEX_POISONED, UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_or_env_var,
33 time::get_atomic_clock_realtime,
34};
35use nautilus_model::{
36 enums::{OrderSide, OrderType, TimeInForce},
37 events::AccountState,
38 identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
39 instruments::{Instrument, InstrumentAny},
40 reports::{FillReport, OrderStatusReport, PositionStatusReport},
41 types::{Price, Quantity},
42};
43use nautilus_network::{
44 http::{HttpClient, HttpClientError, Method, StatusCode, USER_AGENT},
45 ratelimiter::quota::Quota,
46};
47use serde::{Deserialize, Serialize, de::DeserializeOwned};
48use ustr::Ustr;
49
50use super::{
51 error::CoinbaseIntxHttpError,
52 models::{
53 CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
54 CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
55 CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
56 CoinbaseIntxPosition,
57 },
58 parse::{
59 parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
60 parse_position_status_report,
61 },
62 query::{
63 CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
64 GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
65 GetPortfolioFillsParamsBuilder, ModifyOrderParams,
66 },
67};
68use crate::{
69 common::{
70 consts::COINBASE_INTX_REST_URL,
71 credential::Credential,
72 enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
73 },
74 http::{
75 error::ErrorBody,
76 query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
77 },
78};
79
80#[derive(Debug, Serialize, Deserialize)]
82pub struct CoinbaseIntxResponse<T> {
83 pub code: String,
85 pub msg: String,
87 pub data: Vec<T>,
89}
90
91pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> =
93 LazyLock::new(|| Quota::per_second(NonZeroU32::new(100).unwrap()));
94
95#[derive(Debug, Clone)]
101pub struct CoinbaseIntxHttpInnerClient {
102 base_url: String,
103 client: HttpClient,
104 credential: Option<Credential>,
105}
106
107impl Default for CoinbaseIntxHttpInnerClient {
108 fn default() -> Self {
109 Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
110 }
111}
112
113impl CoinbaseIntxHttpInnerClient {
114 pub fn new(
124 base_url: Option<String>,
125 timeout_secs: Option<u64>,
126 ) -> Result<Self, HttpClientError> {
127 Ok(Self {
128 base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
129 client: HttpClient::new(
130 Self::default_headers(),
131 vec![],
132 vec![],
133 Some(*COINBASE_INTX_REST_QUOTA),
134 timeout_secs,
135 None, )?,
137 credential: None,
138 })
139 }
140
141 pub fn with_credentials(
148 api_key: String,
149 api_secret: String,
150 api_passphrase: String,
151 base_url: String,
152 timeout_secs: Option<u64>,
153 ) -> Result<Self, HttpClientError> {
154 Ok(Self {
155 base_url,
156 client: HttpClient::new(
157 Self::default_headers(),
158 vec![],
159 vec![],
160 Some(*COINBASE_INTX_REST_QUOTA),
161 timeout_secs,
162 None, )?,
164 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
165 })
166 }
167
168 fn default_headers() -> HashMap<String, String> {
170 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
171 }
172
173 fn sign_request(
180 &self,
181 method: &Method,
182 path: &str,
183 body: Option<&[u8]>,
184 ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
185 let credential = match self.credential.as_ref() {
186 Some(c) => c,
187 None => return Err(CoinbaseIntxHttpError::MissingCredentials),
188 };
189
190 let api_key = credential.api_key.clone().to_string();
191 let api_passphrase = credential.api_passphrase.clone().to_string();
192 let timestamp = Utc::now().timestamp().to_string();
193 let body_str = body
194 .and_then(|b| String::from_utf8(b.to_vec()).ok())
195 .unwrap_or_default();
196
197 let signature = credential.sign(×tamp, method.as_str(), path, &body_str);
198
199 let mut headers = HashMap::new();
200 headers.insert("Accept".to_string(), "application/json".to_string());
201 headers.insert("CB-ACCESS-KEY".to_string(), api_key);
202 headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
203 headers.insert("CB-ACCESS-SIGN".to_string(), signature);
204 headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
205 headers.insert("Content-Type".to_string(), "application/json".to_string());
206
207 Ok(headers)
208 }
209
210 async fn send_request<T: DeserializeOwned, P: Serialize>(
217 &self,
218 method: Method,
219 path: &str,
220 params: Option<&P>,
221 body: Option<Vec<u8>>,
222 authenticate: bool,
223 ) -> Result<T, CoinbaseIntxHttpError> {
224 let params_str = params
225 .map(serde_urlencoded::to_string)
226 .transpose()
227 .map_err(|e| {
228 CoinbaseIntxHttpError::JsonError(format!("Failed to serialize params: {e}"))
229 })?;
230
231 let full_path = if let Some(ref query) = params_str {
232 if query.is_empty() {
233 path.to_string()
234 } else {
235 format!("{path}?{query}")
236 }
237 } else {
238 path.to_string()
239 };
240
241 let url = format!("{}{}", self.base_url, full_path);
242
243 let headers = if authenticate {
244 Some(self.sign_request(&method, &full_path, body.as_deref())?)
245 } else {
246 None
247 };
248
249 tracing::trace!("Request: {url:?} {body:?}");
250
251 let resp = self
252 .client
253 .request(method.clone(), url, None, headers, body, None, None)
254 .await?;
255
256 tracing::trace!("Response: {resp:?}");
257
258 if resp.status.is_success() {
259 let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
260 tracing::error!("Failed to deserialize CoinbaseResponse: {e}");
261 CoinbaseIntxHttpError::JsonError(e.to_string())
262 })?;
263
264 Ok(coinbase_response)
265 } else {
266 let error_body = String::from_utf8_lossy(&resp.body);
267 tracing::error!(
268 "HTTP error {} with body: {error_body}",
269 resp.status.as_str()
270 );
271
272 if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
273 {
274 return Err(CoinbaseIntxHttpError::CoinbaseError {
275 error_code: parsed_error.code,
276 message: parsed_error.msg,
277 });
278 }
279
280 if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body)
281 && let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error)
282 {
283 return Err(CoinbaseIntxHttpError::CoinbaseError {
284 error_code: error,
285 message: title,
286 });
287 }
288
289 Err(CoinbaseIntxHttpError::UnexpectedStatus {
290 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
291 body: error_body.to_string(),
292 })
293 }
294 }
295
296 pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
303 let path = "/api/v1/assets";
304 self.send_request::<_, ()>(Method::GET, path, None, None, false)
305 .await
306 }
307
308 pub async fn http_get_asset_details(
315 &self,
316 asset: &str,
317 ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
318 let path = format!("/api/v1/assets/{asset}");
319 self.send_request::<_, ()>(Method::GET, &path, None, None, false)
320 .await
321 }
322
323 pub async fn http_list_instruments(
330 &self,
331 ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
332 let path = "/api/v1/instruments";
333 self.send_request::<_, ()>(Method::GET, path, None, None, false)
334 .await
335 }
336
337 pub async fn http_get_instrument_details(
344 &self,
345 symbol: &str,
346 ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
347 let path = format!("/api/v1/instruments/{symbol}");
348 self.send_request::<_, ()>(Method::GET, &path, None, None, false)
349 .await
350 }
351
352 pub async fn http_list_fee_rate_tiers(
359 &self,
360 ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
361 let path = "/api/v1/fee-rate-tiers";
362 self.send_request::<_, ()>(Method::GET, path, None, None, true)
363 .await
364 }
365
366 pub async fn http_list_portfolios(
373 &self,
374 ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
375 let path = "/api/v1/portfolios";
376 self.send_request::<_, ()>(Method::GET, path, None, None, true)
377 .await
378 }
379
380 pub async fn http_get_portfolio(
387 &self,
388 portfolio_id: &str,
389 ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
390 let path = format!("/api/v1/portfolios/{portfolio_id}");
391 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
392 .await
393 }
394
395 pub async fn http_get_portfolio_details(
402 &self,
403 portfolio_id: &str,
404 ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
405 let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
406 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
407 .await
408 }
409
410 pub async fn http_get_portfolio_summary(
417 &self,
418 portfolio_id: &str,
419 ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
420 let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
421 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
422 .await
423 }
424
425 pub async fn http_list_portfolio_balances(
432 &self,
433 portfolio_id: &str,
434 ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
435 let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
436 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
437 .await
438 }
439
440 pub async fn http_get_portfolio_balance(
447 &self,
448 portfolio_id: &str,
449 asset: &str,
450 ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
451 let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
452 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
453 .await
454 }
455
456 pub async fn http_list_portfolio_fills(
463 &self,
464 portfolio_id: &str,
465 params: GetPortfolioFillsParams,
466 ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
467 let path = format!("/api/v1/portfolios/{portfolio_id}/fills");
468 self.send_request(Method::GET, &path, Some(¶ms), None, true)
469 .await
470 }
471
472 pub async fn http_list_portfolio_positions(
479 &self,
480 portfolio_id: &str,
481 ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
482 let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
483 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
484 .await
485 }
486
487 pub async fn http_get_portfolio_position(
494 &self,
495 portfolio_id: &str,
496 symbol: &str,
497 ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
498 let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
499 self.send_request::<_, ()>(Method::GET, &path, None, None, true)
500 .await
501 }
502
503 pub async fn http_list_portfolio_fee_rates(
510 &self,
511 ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
512 let path = "/api/v1/portfolios/fee-rates";
513 self.send_request::<_, ()>(Method::GET, path, None, None, true)
514 .await
515 }
516
517 pub async fn http_create_order(
522 &self,
523 params: CreateOrderParams,
524 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
525 let path = "/api/v1/orders";
526 let body = serde_json::to_vec(¶ms)
527 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
528 self.send_request::<_, ()>(Method::POST, path, None, Some(body), true)
529 .await
530 }
531
532 pub async fn http_get_order(
539 &self,
540 venue_order_id: &str,
541 portfolio_id: &str,
542 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
543 let params = GetOrderParams {
544 portfolio: portfolio_id.to_string(),
545 };
546 let path = format!("/api/v1/orders/{venue_order_id}");
547 self.send_request(Method::GET, &path, Some(¶ms), None, true)
548 .await
549 }
550
551 pub async fn http_list_open_orders(
559 &self,
560 params: GetOrdersParams,
561 ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
562 self.send_request(Method::GET, "/api/v1/orders", Some(¶ms), None, true)
563 .await
564 }
565
566 pub async fn http_cancel_order(
571 &self,
572 client_order_id: &str,
573 portfolio_id: &str,
574 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
575 let params = CancelOrderParams {
576 portfolio: portfolio_id.to_string(),
577 };
578 let path = format!("/api/v1/orders/{client_order_id}");
579 self.send_request(Method::DELETE, &path, Some(¶ms), None, true)
580 .await
581 }
582
583 pub async fn http_cancel_orders(
588 &self,
589 params: CancelOrdersParams,
590 ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
591 self.send_request(Method::DELETE, "/api/v1/orders", Some(¶ms), None, true)
592 .await
593 }
594
595 pub async fn http_modify_order(
602 &self,
603 order_id: &str,
604 params: ModifyOrderParams,
605 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
606 let path = format!("/api/v1/orders/{order_id}");
607 let body = serde_json::to_vec(¶ms)
608 .map_err(|e| CoinbaseIntxHttpError::JsonError(e.to_string()))?;
609 self.send_request::<_, ()>(Method::PUT, &path, None, Some(body), true)
610 .await
611 }
612}
613
614#[derive(Debug, Clone)]
619#[cfg_attr(
620 feature = "python",
621 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
622)]
623pub struct CoinbaseIntxHttpClient {
624 pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
625 pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
626 cache_initialized: bool,
627}
628
629impl Default for CoinbaseIntxHttpClient {
630 fn default() -> Self {
631 Self::new(None, Some(60)).expect("Failed to create default Coinbase INTX HTTP client")
632 }
633}
634
635impl CoinbaseIntxHttpClient {
636 pub fn new(
646 base_url: Option<String>,
647 timeout_secs: Option<u64>,
648 ) -> Result<Self, HttpClientError> {
649 Ok(Self {
650 inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)?),
651 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
652 cache_initialized: false,
653 })
654 }
655
656 pub fn from_env() -> anyhow::Result<Self> {
663 Self::with_credentials(None, None, None, None, None)
664 }
665
666 pub fn with_credentials(
673 api_key: Option<String>,
674 api_secret: Option<String>,
675 api_passphrase: Option<String>,
676 base_url: Option<String>,
677 timeout_secs: Option<u64>,
678 ) -> anyhow::Result<Self> {
679 let api_key = get_or_env_var(api_key, "COINBASE_INTX_API_KEY")?;
680 let api_secret = get_or_env_var(api_secret, "COINBASE_INTX_API_SECRET")?;
681 let api_passphrase = get_or_env_var(api_passphrase, "COINBASE_INTX_API_PASSPHRASE")?;
682 let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
683 Ok(Self {
684 inner: Arc::new(
685 CoinbaseIntxHttpInnerClient::with_credentials(
686 api_key,
687 api_secret,
688 api_passphrase,
689 base_url,
690 timeout_secs,
691 )
692 .context("failed to create Coinbase INTX HTTP client")?,
693 ),
694 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
695 cache_initialized: false,
696 })
697 }
698
699 fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
700 match self
701 .instruments_cache
702 .lock()
703 .expect(MUTEX_POISONED)
704 .get(&symbol)
705 {
706 Some(inst) => Ok(inst.clone()), None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
708 }
709 }
710
711 fn generate_ts_init(&self) -> UnixNanos {
712 get_atomic_clock_realtime().get_time_ns()
713 }
714
715 #[must_use]
717 pub fn base_url(&self) -> &str {
718 self.inner.base_url.as_str()
719 }
720
721 #[must_use]
723 pub fn api_key(&self) -> Option<&str> {
724 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
725 }
726
727 #[must_use]
729 pub fn api_key_masked(&self) -> Option<String> {
730 self.inner.credential.as_ref().map(|c| c.api_key_masked())
731 }
732
733 #[must_use]
737 pub const fn is_initialized(&self) -> bool {
738 self.cache_initialized
739 }
740
741 #[must_use]
747 pub fn get_cached_symbols(&self) -> Vec<String> {
748 self.instruments_cache
749 .lock()
750 .unwrap()
751 .keys()
752 .map(ToString::to_string)
753 .collect()
754 }
755
756 pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
764 for inst in instruments {
765 self.instruments_cache
766 .lock()
767 .unwrap()
768 .insert(inst.raw_symbol().inner(), inst);
769 }
770 self.cache_initialized = true;
771 }
772
773 pub fn add_instrument(&mut self, instrument: InstrumentAny) {
781 self.instruments_cache
782 .lock()
783 .unwrap()
784 .insert(instrument.raw_symbol().inner(), instrument);
785 self.cache_initialized = true;
786 }
787
788 pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
794 let resp = self
795 .inner
796 .http_list_portfolios()
797 .await
798 .map_err(|e| anyhow::anyhow!(e))?;
799
800 Ok(resp)
801 }
802
803 pub async fn request_account_state(
809 &self,
810 account_id: AccountId,
811 ) -> anyhow::Result<AccountState> {
812 let resp = self
813 .inner
814 .http_list_portfolio_balances(account_id.get_issuers_id())
815 .await
816 .map_err(|e| anyhow::anyhow!(e))?;
817
818 let ts_init = self.generate_ts_init();
819 let account_state = parse_account_state(resp, account_id, ts_init)?;
820
821 Ok(account_state)
822 }
823
824 pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
830 let resp = self
831 .inner
832 .http_list_instruments()
833 .await
834 .map_err(|e| anyhow::anyhow!(e))?;
835
836 let ts_init = self.generate_ts_init();
837
838 let mut instruments: Vec<InstrumentAny> = Vec::new();
839 for inst in &resp {
840 let instrument_any = parse_instrument_any(inst, ts_init);
841 if let Some(instrument_any) = instrument_any {
842 instruments.push(instrument_any);
843 }
844 }
845
846 Ok(instruments)
847 }
848
849 pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
855 let resp = self
856 .inner
857 .http_get_instrument_details(symbol.as_str())
858 .await
859 .map_err(|e| anyhow::anyhow!(e))?;
860
861 let ts_init = self.generate_ts_init();
862
863 match parse_instrument_any(&resp, ts_init) {
864 Some(inst) => Ok(inst),
865 None => anyhow::bail!("Unable to parse instrument"),
866 }
867 }
868
869 pub async fn request_order_status_report(
875 &self,
876 account_id: AccountId,
877 venue_order_id: VenueOrderId,
878 ) -> anyhow::Result<OrderStatusReport> {
879 let portfolio_id = account_id.get_issuers_id();
880
881 let resp = self
882 .inner
883 .http_get_order(venue_order_id.as_str(), portfolio_id)
884 .await
885 .map_err(|e| anyhow::anyhow!(e))?;
886
887 let instrument = self.get_instrument_from_cache(resp.symbol)?;
888 let ts_init = self.generate_ts_init();
889
890 let report = parse_order_status_report(
891 resp,
892 account_id,
893 instrument.price_precision(),
894 instrument.size_precision(),
895 ts_init,
896 )?;
897 Ok(report)
898 }
899
900 pub async fn request_order_status_reports(
906 &self,
907 account_id: AccountId,
908 symbol: Symbol,
909 ) -> anyhow::Result<Vec<OrderStatusReport>> {
910 let portfolio_id = account_id.get_issuers_id();
911
912 let mut params = GetOrdersParamsBuilder::default();
913 params.portfolio(portfolio_id);
914 params.instrument(symbol.as_str());
915 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
916
917 let resp = self
918 .inner
919 .http_list_open_orders(params)
920 .await
921 .map_err(|e| anyhow::anyhow!(e))?;
922
923 let ts_init = get_atomic_clock_realtime().get_time_ns();
924
925 let mut reports: Vec<OrderStatusReport> = Vec::new();
926 for order in resp.results {
927 let instrument = self.get_instrument_from_cache(order.symbol)?;
928 let report = parse_order_status_report(
929 order,
930 account_id,
931 instrument.price_precision(),
932 instrument.size_precision(),
933 ts_init,
934 )?;
935 reports.push(report);
936 }
937
938 Ok(reports)
939 }
940
941 pub async fn request_fill_reports(
947 &self,
948 account_id: AccountId,
949 client_order_id: Option<ClientOrderId>,
950 start: Option<DateTime<Utc>>,
951 ) -> anyhow::Result<Vec<FillReport>> {
952 let portfolio_id = account_id.get_issuers_id();
953
954 let mut params = GetPortfolioFillsParamsBuilder::default();
955 if let Some(start) = start {
956 params.time_from(start);
957 }
958 if let Some(client_order_id) = client_order_id {
959 params.client_order_id(client_order_id.to_string());
960 }
961 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
962
963 let resp = self
964 .inner
965 .http_list_portfolio_fills(portfolio_id, params)
966 .await
967 .map_err(|e| anyhow::anyhow!(e))?;
968
969 let ts_init = get_atomic_clock_realtime().get_time_ns();
970
971 let mut reports: Vec<FillReport> = Vec::new();
972 for fill in resp.results {
973 let instrument = self.get_instrument_from_cache(fill.symbol)?;
974 let report = parse_fill_report(
975 fill,
976 account_id,
977 instrument.price_precision(),
978 instrument.size_precision(),
979 ts_init,
980 )?;
981 reports.push(report);
982 }
983
984 Ok(reports)
985 }
986
987 pub async fn request_position_status_report(
993 &self,
994 account_id: AccountId,
995 symbol: Symbol,
996 ) -> anyhow::Result<PositionStatusReport> {
997 let portfolio_id = account_id.get_issuers_id();
998
999 let resp = self
1000 .inner
1001 .http_get_portfolio_position(portfolio_id, symbol.as_str())
1002 .await
1003 .map_err(|e| anyhow::anyhow!(e))?;
1004
1005 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1006 let ts_init = get_atomic_clock_realtime().get_time_ns();
1007
1008 let report =
1009 parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init)?;
1010 Ok(report)
1011 }
1012
1013 pub async fn request_position_status_reports(
1019 &self,
1020 account_id: AccountId,
1021 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1022 let portfolio_id = account_id.get_issuers_id();
1023
1024 let resp = self
1025 .inner
1026 .http_list_portfolio_positions(portfolio_id)
1027 .await
1028 .map_err(|e| anyhow::anyhow!(e))?;
1029
1030 let ts_init = get_atomic_clock_realtime().get_time_ns();
1031
1032 let mut reports: Vec<PositionStatusReport> = Vec::new();
1033 for position in resp {
1034 let instrument = self.get_instrument_from_cache(position.symbol)?;
1035 let report = parse_position_status_report(
1036 position,
1037 account_id,
1038 instrument.size_precision(),
1039 ts_init,
1040 )?;
1041 reports.push(report);
1042 }
1043
1044 Ok(reports)
1045 }
1046
1047 #[allow(clippy::too_many_arguments)]
1053 pub async fn submit_order(
1054 &self,
1055 account_id: AccountId,
1056 client_order_id: ClientOrderId,
1057 symbol: Symbol,
1058 order_side: OrderSide,
1059 order_type: OrderType,
1060 quantity: Quantity,
1061 time_in_force: TimeInForce,
1062 expire_time: Option<DateTime<Utc>>,
1063 price: Option<Price>,
1064 trigger_price: Option<Price>,
1065 post_only: Option<bool>,
1066 reduce_only: Option<bool>,
1067 ) -> anyhow::Result<OrderStatusReport> {
1068 let coinbase_side: CoinbaseIntxSide = order_side.into();
1069 let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
1070 let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
1071
1072 let mut params = CreateOrderParamsBuilder::default();
1073 params.portfolio(account_id.get_issuers_id());
1074 params.client_order_id(client_order_id.as_str());
1075 params.instrument(symbol.as_str());
1076 params.side(coinbase_side);
1077 params.size(quantity.to_string());
1078 params.order_type(coinbase_order_type);
1079 params.tif(coinbase_tif);
1080 if let Some(expire_time) = expire_time {
1081 params.expire_time(expire_time);
1082 }
1083 if let Some(price) = price {
1084 params.price(price.to_string());
1085 }
1086 if let Some(trigger_price) = trigger_price {
1087 params.stop_price(trigger_price.to_string());
1088 }
1089 if let Some(post_only) = post_only {
1090 params.post_only(post_only);
1091 }
1092 if let Some(reduce_only) = reduce_only {
1093 params.close_only(reduce_only);
1094 }
1095 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1096
1097 let resp = self.inner.http_create_order(params).await?;
1098 tracing::debug!("Submitted order: {resp:?}");
1099
1100 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1101 let ts_init = get_atomic_clock_realtime().get_time_ns();
1102 let report = parse_order_status_report(
1103 resp,
1104 account_id,
1105 instrument.price_precision(),
1106 instrument.size_precision(),
1107 ts_init,
1108 )?;
1109 Ok(report)
1110 }
1111
1112 pub async fn cancel_order(
1118 &self,
1119 account_id: AccountId,
1120 client_order_id: ClientOrderId,
1121 ) -> anyhow::Result<OrderStatusReport> {
1122 let portfolio_id = account_id.get_issuers_id();
1123
1124 let resp = self
1125 .inner
1126 .http_cancel_order(client_order_id.as_str(), portfolio_id)
1127 .await?;
1128 tracing::debug!("Canceled order: {resp:?}");
1129
1130 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1131 let ts_init = get_atomic_clock_realtime().get_time_ns();
1132
1133 let report = parse_order_status_report(
1134 resp,
1135 account_id,
1136 instrument.price_precision(),
1137 instrument.size_precision(),
1138 ts_init,
1139 )?;
1140 Ok(report)
1141 }
1142
1143 pub async fn cancel_orders(
1149 &self,
1150 account_id: AccountId,
1151 symbol: Symbol,
1152 order_side: Option<OrderSide>,
1153 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1154 let mut params = CancelOrdersParamsBuilder::default();
1155 params.portfolio(account_id.get_issuers_id());
1156 params.instrument(symbol.as_str());
1157 if let Some(side) = order_side {
1158 let side: CoinbaseIntxSide = side.into();
1159 params.side(side);
1160 }
1161 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1162
1163 let resp = self.inner.http_cancel_orders(params).await?;
1164
1165 let instrument = self.get_instrument_from_cache(symbol.inner())?;
1166 let ts_init = get_atomic_clock_realtime().get_time_ns();
1167
1168 let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
1169 for order in resp {
1170 tracing::debug!("Canceled order: {order:?}");
1171 let report = parse_order_status_report(
1172 order,
1173 account_id,
1174 instrument.price_precision(),
1175 instrument.size_precision(),
1176 ts_init,
1177 )?;
1178 reports.push(report);
1179 }
1180
1181 Ok(reports)
1182 }
1183
1184 #[allow(clippy::too_many_arguments)]
1190 pub async fn modify_order(
1191 &self,
1192 account_id: AccountId,
1193 client_order_id: ClientOrderId,
1194 new_client_order_id: ClientOrderId,
1195 price: Option<Price>,
1196 trigger_price: Option<Price>,
1197 quantity: Option<Quantity>,
1198 ) -> anyhow::Result<OrderStatusReport> {
1199 let mut params = ModifyOrderParamsBuilder::default();
1200 params.portfolio(account_id.get_issuers_id());
1201 params.client_order_id(new_client_order_id.as_str());
1202 if let Some(price) = price {
1203 params.price(price.to_string());
1204 }
1205 if let Some(trigger_price) = trigger_price {
1206 params.price(trigger_price.to_string());
1207 }
1208 if let Some(quantity) = quantity {
1209 params.size(quantity.to_string());
1210 }
1211 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1212
1213 let resp = self
1214 .inner
1215 .http_modify_order(client_order_id.as_str(), params)
1216 .await?;
1217 tracing::debug!("Modified order {}", resp.client_order_id);
1218
1219 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1220 let ts_init = get_atomic_clock_realtime().get_time_ns();
1221 let report = parse_order_status_report(
1222 resp,
1223 account_id,
1224 instrument.price_precision(),
1225 instrument.size_precision(),
1226 ts_init,
1227 )?;
1228 Ok(report)
1229 }
1230}