1use std::{
24 collections::HashMap,
25 sync::{Arc, LazyLock, Mutex},
26};
27
28use chrono::{DateTime, Utc};
29use nautilus_core::{UnixNanos, consts::NAUTILUS_USER_AGENT, time::get_atomic_clock_realtime};
30use nautilus_execution::reports::{
31 fill::FillReport, order::OrderStatusReport, position::PositionStatusReport,
32};
33use nautilus_model::{
34 enums::{OrderSide, OrderType, TimeInForce},
35 events::AccountState,
36 identifiers::{AccountId, ClientOrderId, Symbol, VenueOrderId},
37 instruments::{Instrument, InstrumentAny},
38 types::{Price, Quantity},
39};
40use nautilus_network::{http::HttpClient, ratelimiter::quota::Quota};
41use reqwest::{Method, StatusCode, header::USER_AGENT};
42use serde::{Deserialize, Serialize, de::DeserializeOwned};
43use ustr::Ustr;
44
45use super::{
46 error::CoinbaseIntxHttpError,
47 models::{
48 CoinbaseIntxAsset, CoinbaseIntxBalance, CoinbaseIntxFeeTier, CoinbaseIntxFillList,
49 CoinbaseIntxInstrument, CoinbaseIntxOrder, CoinbaseIntxOrderList, CoinbaseIntxPortfolio,
50 CoinbaseIntxPortfolioDetails, CoinbaseIntxPortfolioFeeRates, CoinbaseIntxPortfolioSummary,
51 CoinbaseIntxPosition,
52 },
53 parse::{
54 parse_account_state, parse_fill_report, parse_instrument_any, parse_order_status_report,
55 parse_position_status_report,
56 },
57 query::{
58 CancelOrderParams, CancelOrdersParams, CreateOrderParams, CreateOrderParamsBuilder,
59 GetOrderParams, GetOrdersParams, GetOrdersParamsBuilder, GetPortfolioFillsParams,
60 GetPortfolioFillsParamsBuilder, ModifyOrderParams,
61 },
62};
63use crate::{
64 common::{
65 consts::COINBASE_INTX_REST_URL,
66 credential::{Credential, get_env_var},
67 enums::{CoinbaseIntxOrderType, CoinbaseIntxSide, CoinbaseIntxTimeInForce},
68 },
69 http::{
70 error::ErrorBody,
71 query::{CancelOrdersParamsBuilder, ModifyOrderParamsBuilder},
72 },
73};
74
75#[derive(Debug, Serialize, Deserialize)]
77pub struct CoinbaseIntxResponse<T> {
78 pub code: String,
80 pub msg: String,
82 pub data: Vec<T>,
84}
85
86pub static COINBASE_INTX_REST_QUOTA: LazyLock<Quota> =
88 LazyLock::new(|| Quota::rate_per_second(40).unwrap());
89
90#[derive(Clone)]
96pub struct CoinbaseIntxHttpInnerClient {
97 base_url: String,
98 client: HttpClient,
99 credential: Option<Credential>,
100}
101
102impl Default for CoinbaseIntxHttpInnerClient {
103 fn default() -> Self {
104 Self::new(None, Some(60))
105 }
106}
107
108impl CoinbaseIntxHttpInnerClient {
109 pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
115 Self {
116 base_url: base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string()),
117 client: HttpClient::new(
118 Self::default_headers(),
119 vec![],
120 vec![],
121 Some(*COINBASE_INTX_REST_QUOTA),
122 timeout_secs,
123 ),
124 credential: None,
125 }
126 }
127
128 pub fn with_credentials(
131 api_key: String,
132 api_secret: String,
133 api_passphrase: String,
134 base_url: String,
135 timeout_secs: Option<u64>,
136 ) -> Self {
137 Self {
138 base_url,
139 client: HttpClient::new(
140 Self::default_headers(),
141 vec![],
142 vec![],
143 Some(*COINBASE_INTX_REST_QUOTA),
144 timeout_secs,
145 ),
146 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
147 }
148 }
149
150 fn default_headers() -> HashMap<String, String> {
152 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
153 }
154
155 fn sign_request(
162 &self,
163 method: &Method,
164 path: &str,
165 body: Option<&[u8]>,
166 ) -> Result<HashMap<String, String>, CoinbaseIntxHttpError> {
167 let credential = match self.credential.as_ref() {
168 Some(c) => c,
169 None => return Err(CoinbaseIntxHttpError::MissingCredentials),
170 };
171
172 let api_key = credential.api_key.clone().to_string();
173 let api_passphrase = credential.api_passphrase.clone().to_string();
174 let timestamp = Utc::now().timestamp().to_string();
175 let body_str = body
176 .and_then(|b| String::from_utf8(b.to_vec()).ok())
177 .unwrap_or_default();
178
179 let signature = credential.sign(×tamp, method.as_str(), path, &body_str);
180
181 let mut headers = HashMap::new();
182 headers.insert("Accept".to_string(), "application/json".to_string());
183 headers.insert("CB-ACCESS-KEY".to_string(), api_key);
184 headers.insert("CB-ACCESS-PASSPHRASE".to_string(), api_passphrase);
185 headers.insert("CB-ACCESS-SIGN".to_string(), signature);
186 headers.insert("CB-ACCESS-TIMESTAMP".to_string(), timestamp);
187 headers.insert("Content-Type".to_string(), "application/json".to_string());
188
189 Ok(headers)
190 }
191
192 async fn send_request<T: DeserializeOwned>(
199 &self,
200 method: Method,
201 path: &str,
202 body: Option<Vec<u8>>,
203 authenticate: bool,
204 ) -> Result<T, CoinbaseIntxHttpError> {
205 let url = format!("{}{}", self.base_url, path);
206
207 let headers = if authenticate {
208 Some(self.sign_request(&method, path, body.as_deref())?)
209 } else {
210 None
211 };
212
213 tracing::trace!("Request: {url:?} {body:?}");
214
215 let resp = self
216 .client
217 .request(method.clone(), url, headers, body, None, None)
218 .await?;
219
220 tracing::trace!("Response: {resp:?}");
221
222 if resp.status.is_success() {
223 let coinbase_response: T = serde_json::from_slice(&resp.body).map_err(|e| {
224 tracing::error!("Failed to deserialize CoinbaseResponse: {e}");
225 CoinbaseIntxHttpError::JsonError(e.to_string())
226 })?;
227
228 Ok(coinbase_response)
229 } else {
230 let error_body = String::from_utf8_lossy(&resp.body);
231 tracing::error!(
232 "HTTP error {} with body: {error_body}",
233 resp.status.as_str()
234 );
235
236 if let Ok(parsed_error) = serde_json::from_slice::<CoinbaseIntxResponse<T>>(&resp.body)
237 {
238 return Err(CoinbaseIntxHttpError::CoinbaseError {
239 error_code: parsed_error.code,
240 message: parsed_error.msg,
241 });
242 }
243
244 if let Ok(parsed_error) = serde_json::from_slice::<ErrorBody>(&resp.body) {
245 if let (Some(title), Some(error)) = (parsed_error.title, parsed_error.error) {
246 return Err(CoinbaseIntxHttpError::CoinbaseError {
247 error_code: error,
248 message: title,
249 });
250 }
251 }
252
253 Err(CoinbaseIntxHttpError::UnexpectedStatus {
254 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
255 body: error_body.to_string(),
256 })
257 }
258 }
259
260 pub async fn http_list_assets(&self) -> Result<Vec<CoinbaseIntxAsset>, CoinbaseIntxHttpError> {
264 let path = "/api/v1/assets";
265 self.send_request(Method::GET, path, None, false).await
266 }
267
268 pub async fn http_get_asset_details(
272 &self,
273 asset: &str,
274 ) -> Result<CoinbaseIntxAsset, CoinbaseIntxHttpError> {
275 let path = format!("/api/v1/assets/{asset}");
276 self.send_request(Method::GET, &path, None, false).await
277 }
278
279 pub async fn http_list_instruments(
283 &self,
284 ) -> Result<Vec<CoinbaseIntxInstrument>, CoinbaseIntxHttpError> {
285 let path = "/api/v1/instruments";
286 self.send_request(Method::GET, path, None, false).await
287 }
288
289 pub async fn http_get_instrument_details(
293 &self,
294 symbol: &str,
295 ) -> Result<CoinbaseIntxInstrument, CoinbaseIntxHttpError> {
296 let path = format!("/api/v1/instruments/{symbol}");
297 self.send_request(Method::GET, &path, None, false).await
298 }
299
300 pub async fn http_list_fee_rate_tiers(
304 &self,
305 ) -> Result<Vec<CoinbaseIntxFeeTier>, CoinbaseIntxHttpError> {
306 let path = "/api/v1/fee-rate-tiers";
307 self.send_request(Method::GET, path, None, true).await
308 }
309
310 pub async fn http_list_portfolios(
314 &self,
315 ) -> Result<Vec<CoinbaseIntxPortfolio>, CoinbaseIntxHttpError> {
316 let path = "/api/v1/portfolios";
317 self.send_request(Method::GET, path, None, true).await
318 }
319
320 pub async fn http_get_portfolio(
324 &self,
325 portfolio_id: &str,
326 ) -> Result<CoinbaseIntxPortfolio, CoinbaseIntxHttpError> {
327 let path = format!("/api/v1/portfolios/{portfolio_id}");
328 self.send_request(Method::GET, &path, None, true).await
329 }
330
331 pub async fn http_get_portfolio_details(
335 &self,
336 portfolio_id: &str,
337 ) -> Result<CoinbaseIntxPortfolioDetails, CoinbaseIntxHttpError> {
338 let path = format!("/api/v1/portfolios/{portfolio_id}/detail");
339 self.send_request(Method::GET, &path, None, true).await
340 }
341
342 pub async fn http_get_portfolio_summary(
346 &self,
347 portfolio_id: &str,
348 ) -> Result<CoinbaseIntxPortfolioSummary, CoinbaseIntxHttpError> {
349 let path = format!("/api/v1/portfolios/{portfolio_id}/summary");
350 self.send_request(Method::GET, &path, None, true).await
351 }
352
353 pub async fn http_list_portfolio_balances(
357 &self,
358 portfolio_id: &str,
359 ) -> Result<Vec<CoinbaseIntxBalance>, CoinbaseIntxHttpError> {
360 let path = format!("/api/v1/portfolios/{portfolio_id}/balances");
361 self.send_request(Method::GET, &path, None, true).await
362 }
363
364 pub async fn http_get_portfolio_balance(
368 &self,
369 portfolio_id: &str,
370 asset: &str,
371 ) -> Result<CoinbaseIntxBalance, CoinbaseIntxHttpError> {
372 let path = format!("/api/v1/portfolios/{portfolio_id}/balances/{asset}");
373 self.send_request(Method::GET, &path, None, true).await
374 }
375
376 pub async fn http_list_portfolio_fills(
380 &self,
381 portfolio_id: &str,
382 params: GetPortfolioFillsParams,
383 ) -> Result<CoinbaseIntxFillList, CoinbaseIntxHttpError> {
384 let query = serde_urlencoded::to_string(¶ms).expect("Failed to serialize params");
385 let path = format!("/api/v1/portfolios/{portfolio_id}/fills?{query}");
386 self.send_request(Method::GET, &path, None, true).await
387 }
388
389 pub async fn http_list_portfolio_positions(
393 &self,
394 portfolio_id: &str,
395 ) -> Result<Vec<CoinbaseIntxPosition>, CoinbaseIntxHttpError> {
396 let path = format!("/api/v1/portfolios/{portfolio_id}/positions");
397 self.send_request(Method::GET, &path, None, true).await
398 }
399
400 pub async fn http_get_portfolio_position(
404 &self,
405 portfolio_id: &str,
406 symbol: &str,
407 ) -> Result<CoinbaseIntxPosition, CoinbaseIntxHttpError> {
408 let path = format!("/api/v1/portfolios/{portfolio_id}/positions/{symbol}");
409 self.send_request(Method::GET, &path, None, true).await
410 }
411
412 pub async fn http_list_portfolio_fee_rates(
416 &self,
417 ) -> Result<Vec<CoinbaseIntxPortfolioFeeRates>, CoinbaseIntxHttpError> {
418 let path = "/api/v1/portfolios/fee-rates";
419 self.send_request(Method::GET, path, None, true).await
420 }
421
422 pub async fn http_create_order(
424 &self,
425 params: CreateOrderParams,
426 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
427 let path = "/api/v1/orders";
428 let body = serde_json::to_vec(¶ms).expect("Failed to serialize params");
429 self.send_request(Method::POST, path, Some(body), true)
430 .await
431 }
432
433 pub async fn http_get_order(
437 &self,
438 venue_order_id: &str,
439 portfolio_id: &str,
440 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
441 let params = GetOrderParams {
442 portfolio: portfolio_id.to_string(),
443 };
444 let query = serde_urlencoded::to_string(¶ms).expect("Failed to serialize params");
445 let path = format!("/api/v1/orders/{venue_order_id}?{query}");
446 self.send_request(Method::GET, &path, None, true).await
447 }
448
449 pub async fn http_list_open_orders(
454 &self,
455 params: GetOrdersParams,
456 ) -> Result<CoinbaseIntxOrderList, CoinbaseIntxHttpError> {
457 let query = serde_urlencoded::to_string(¶ms).expect("Failed to serialize params");
458 let path = format!("/api/v1/orders?{}", query);
459 self.send_request(Method::GET, &path, None, true).await
460 }
461
462 pub async fn http_cancel_order(
464 &self,
465 client_order_id: &str,
466 portfolio_id: &str,
467 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
468 let params = CancelOrderParams {
469 portfolio: portfolio_id.to_string(),
470 };
471 let query = serde_urlencoded::to_string(¶ms).expect("Failed to serialize params");
472 let path = format!("/api/v1/orders/{client_order_id}?{query}");
473 self.send_request(Method::DELETE, &path, None, true).await
474 }
475
476 pub async fn http_cancel_orders(
478 &self,
479 params: CancelOrdersParams,
480 ) -> Result<Vec<CoinbaseIntxOrder>, CoinbaseIntxHttpError> {
481 let query = serde_urlencoded::to_string(¶ms).expect("Failed to serialize params");
482 let path = format!("/api/v1/orders?{}", query);
483 self.send_request(Method::DELETE, &path, None, true).await
484 }
485
486 pub async fn http_modify_order(
490 &self,
491 order_id: &str,
492 params: ModifyOrderParams,
493 ) -> Result<CoinbaseIntxOrder, CoinbaseIntxHttpError> {
494 let path = format!("/api/v1/orders/{}", order_id);
495 let body = serde_json::to_vec(¶ms).expect("Failed to serialize params");
496 self.send_request(Method::PUT, &path, Some(body), true)
497 .await
498 }
499}
500
501#[derive(Clone)]
506#[cfg_attr(
507 feature = "python",
508 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
509)]
510pub struct CoinbaseIntxHttpClient {
511 pub(crate) inner: Arc<CoinbaseIntxHttpInnerClient>,
512 pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
513 cache_initialized: bool,
514}
515
516impl Default for CoinbaseIntxHttpClient {
517 fn default() -> Self {
518 Self::new(None, Some(60))
519 }
520}
521
522impl CoinbaseIntxHttpClient {
523 pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
529 Self {
530 inner: Arc::new(CoinbaseIntxHttpInnerClient::new(base_url, timeout_secs)),
531 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
532 cache_initialized: false,
533 }
534 }
535
536 pub fn from_env() -> anyhow::Result<Self> {
539 Self::with_credentials(None, None, None, None, None)
540 }
541
542 pub fn with_credentials(
545 api_key: Option<String>,
546 api_secret: Option<String>,
547 api_passphrase: Option<String>,
548 base_url: Option<String>,
549 timeout_secs: Option<u64>,
550 ) -> anyhow::Result<Self> {
551 let api_key = api_key.unwrap_or(get_env_var("COINBASE_INTX_API_KEY")?);
552 let api_secret = api_secret.unwrap_or(get_env_var("COINBASE_INTX_API_SECRET")?);
553 let api_passphrase = api_passphrase.unwrap_or(get_env_var("COINBASE_INTX_API_PASSPHRASE")?);
554 let base_url = base_url.unwrap_or(COINBASE_INTX_REST_URL.to_string());
555 Ok(Self {
556 inner: Arc::new(CoinbaseIntxHttpInnerClient::with_credentials(
557 api_key,
558 api_secret,
559 api_passphrase,
560 base_url,
561 timeout_secs,
562 )),
563 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
564 cache_initialized: false,
565 })
566 }
567
568 fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
569 match self.instruments_cache.lock().unwrap().get(&symbol) {
570 Some(inst) => Ok(inst.clone()), None => anyhow::bail!("Unable to process request, instrument {symbol} not in cache"),
572 }
573 }
574
575 fn generate_ts_init(&self) -> UnixNanos {
576 get_atomic_clock_realtime().get_time_ns()
577 }
578
579 pub fn base_url(&self) -> &str {
581 self.inner.base_url.as_str()
582 }
583
584 pub fn api_key(&self) -> Option<&str> {
586 self.inner.credential.clone().map(|c| c.api_key.as_str())
587 }
588
589 pub fn is_initialized(&self) -> bool {
593 self.cache_initialized
594 }
595
596 pub fn get_cached_symbols(&self) -> Vec<String> {
598 self.instruments_cache
599 .lock()
600 .unwrap()
601 .keys()
602 .map(|k| k.to_string())
603 .collect()
604 }
605
606 pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
610 for inst in instruments {
611 self.instruments_cache
612 .lock()
613 .unwrap()
614 .insert(inst.raw_symbol().inner(), inst);
615 }
616 self.cache_initialized = true;
617 }
618
619 pub fn add_instrument(&mut self, instrument: InstrumentAny) {
623 self.instruments_cache
624 .lock()
625 .unwrap()
626 .insert(instrument.raw_symbol().inner(), instrument);
627 self.cache_initialized = true;
628 }
629
630 pub async fn list_portfolios(&self) -> anyhow::Result<Vec<CoinbaseIntxPortfolio>> {
632 let resp = self
633 .inner
634 .http_list_portfolios()
635 .await
636 .map_err(|e| anyhow::anyhow!(e))?;
637
638 Ok(resp)
639 }
640
641 pub async fn request_account_state(
643 &self,
644 account_id: AccountId,
645 ) -> anyhow::Result<AccountState> {
646 let resp = self
647 .inner
648 .http_list_portfolio_balances(account_id.get_issuers_id())
649 .await
650 .map_err(|e| anyhow::anyhow!(e))?;
651
652 let ts_init = self.generate_ts_init();
653 let account_state = parse_account_state(resp, account_id, ts_init)?;
654
655 Ok(account_state)
656 }
657
658 pub async fn request_instruments(&self) -> anyhow::Result<Vec<InstrumentAny>> {
660 let resp = self
661 .inner
662 .http_list_instruments()
663 .await
664 .map_err(|e| anyhow::anyhow!(e))?;
665
666 let ts_init = self.generate_ts_init();
667
668 let mut instruments: Vec<InstrumentAny> = Vec::new();
669 for inst in &resp {
670 let instrument_any = parse_instrument_any(inst, ts_init);
671 if let Some(instrument_any) = instrument_any {
672 instruments.push(instrument_any);
673 }
674 }
675
676 Ok(instruments)
677 }
678
679 pub async fn request_instrument(&self, symbol: &Symbol) -> anyhow::Result<InstrumentAny> {
681 let resp = self
682 .inner
683 .http_get_instrument_details(symbol.as_str())
684 .await
685 .map_err(|e| anyhow::anyhow!(e))?;
686
687 let ts_init = self.generate_ts_init();
688
689 match parse_instrument_any(&resp, ts_init) {
690 Some(inst) => Ok(inst),
691 None => anyhow::bail!("Unable to parse instrument"),
692 }
693 }
694
695 pub async fn request_order_status_report(
697 &self,
698 account_id: AccountId,
699 venue_order_id: VenueOrderId,
700 ) -> anyhow::Result<OrderStatusReport> {
701 let portfolio_id = account_id.get_issuers_id();
702
703 let resp = self
704 .inner
705 .http_get_order(venue_order_id.as_str(), portfolio_id)
706 .await
707 .map_err(|e| anyhow::anyhow!(e))?;
708
709 let instrument = self.get_instrument_from_cache(resp.symbol)?;
710 let ts_init = self.generate_ts_init();
711
712 let report = parse_order_status_report(
713 resp,
714 account_id,
715 instrument.price_precision(),
716 instrument.size_precision(),
717 ts_init,
718 );
719 Ok(report)
720 }
721
722 pub async fn request_order_status_reports(
724 &self,
725 account_id: AccountId,
726 symbol: Symbol,
727 ) -> anyhow::Result<Vec<OrderStatusReport>> {
728 let portfolio_id = account_id.get_issuers_id();
729
730 let mut params = GetOrdersParamsBuilder::default();
731 params.portfolio(portfolio_id);
732 params.instrument(symbol.as_str());
733 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
734
735 let resp = self
736 .inner
737 .http_list_open_orders(params)
738 .await
739 .map_err(|e| anyhow::anyhow!(e))?;
740
741 let ts_init = get_atomic_clock_realtime().get_time_ns();
742
743 let mut reports: Vec<OrderStatusReport> = Vec::new();
744 for order in resp.results {
745 let instrument = self.get_instrument_from_cache(order.symbol)?;
746
747 reports.push(parse_order_status_report(
748 order,
749 account_id,
750 instrument.price_precision(),
751 instrument.size_precision(),
752 ts_init,
753 ))
754 }
755
756 Ok(reports)
757 }
758
759 pub async fn request_fill_reports(
761 &self,
762 account_id: AccountId,
763 client_order_id: Option<ClientOrderId>,
764 start: Option<DateTime<Utc>>,
765 ) -> anyhow::Result<Vec<FillReport>> {
766 let portfolio_id = account_id.get_issuers_id();
767
768 let mut params = GetPortfolioFillsParamsBuilder::default();
769 if let Some(start) = start {
770 params.time_from(start);
771 };
772 if let Some(client_order_id) = client_order_id {
773 params.client_order_id(client_order_id.to_string());
774 }
775 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
776
777 let resp = self
778 .inner
779 .http_list_portfolio_fills(portfolio_id, params)
780 .await
781 .map_err(|e| anyhow::anyhow!(e))?;
782
783 let ts_init = get_atomic_clock_realtime().get_time_ns();
784
785 let mut reports: Vec<FillReport> = Vec::new();
786 for fill in resp.results {
787 let instrument = self.get_instrument_from_cache(fill.symbol)?;
788
789 let report = parse_fill_report(
790 fill,
791 account_id,
792 instrument.price_precision(),
793 instrument.size_precision(),
794 ts_init,
795 );
796 reports.push(report);
797 }
798
799 Ok(reports)
800 }
801
802 pub async fn request_position_status_report(
804 &self,
805 account_id: AccountId,
806 symbol: Symbol,
807 ) -> anyhow::Result<PositionStatusReport> {
808 let portfolio_id = account_id.get_issuers_id();
809
810 let resp = self
811 .inner
812 .http_get_portfolio_position(portfolio_id, symbol.as_str())
813 .await
814 .map_err(|e| anyhow::anyhow!(e))?;
815
816 let instrument = self.get_instrument_from_cache(resp.symbol)?;
817 let ts_init = get_atomic_clock_realtime().get_time_ns();
818
819 let report =
820 parse_position_status_report(resp, account_id, instrument.size_precision(), ts_init);
821
822 Ok(report)
823 }
824
825 pub async fn request_position_status_reports(
827 &self,
828 account_id: AccountId,
829 ) -> anyhow::Result<Vec<PositionStatusReport>> {
830 let portfolio_id = account_id.get_issuers_id();
831
832 let resp = self
833 .inner
834 .http_list_portfolio_positions(portfolio_id)
835 .await
836 .map_err(|e| anyhow::anyhow!(e))?;
837
838 let ts_init = get_atomic_clock_realtime().get_time_ns();
839
840 let mut reports: Vec<PositionStatusReport> = Vec::new();
841 for position in resp {
842 let instrument = self.get_instrument_from_cache(position.symbol)?;
843
844 let report = parse_position_status_report(
845 position,
846 account_id,
847 instrument.size_precision(),
848 ts_init,
849 );
850
851 reports.push(report);
852 }
853
854 Ok(reports)
855 }
856
857 #[allow(clippy::too_many_arguments)]
859 pub async fn submit_order(
860 &self,
861 account_id: AccountId,
862 client_order_id: ClientOrderId,
863 symbol: Symbol,
864 order_side: OrderSide,
865 order_type: OrderType,
866 quantity: Quantity,
867 time_in_force: TimeInForce,
868 expire_time: Option<DateTime<Utc>>,
869 price: Option<Price>,
870 trigger_price: Option<Price>,
871 post_only: Option<bool>,
872 reduce_only: Option<bool>,
873 ) -> anyhow::Result<OrderStatusReport> {
874 let coinbase_side: CoinbaseIntxSide = order_side.into();
875 let coinbase_order_type: CoinbaseIntxOrderType = order_type.into();
876 let coinbase_tif: CoinbaseIntxTimeInForce = time_in_force.into();
877
878 let mut params = CreateOrderParamsBuilder::default();
879 params.portfolio(account_id.get_issuers_id());
880 params.client_order_id(client_order_id.as_str());
881 params.instrument(symbol.as_str());
882 params.side(coinbase_side);
883 params.size(quantity.to_string());
884 params.order_type(coinbase_order_type);
885 params.tif(coinbase_tif);
886 if let Some(expire_time) = expire_time {
887 params.expire_time(expire_time);
888 }
889 if let Some(price) = price {
890 params.price(price.to_string());
891 }
892 if let Some(trigger_price) = trigger_price {
893 params.stop_price(trigger_price.to_string());
894 }
895 if let Some(post_only) = post_only {
896 params.post_only(post_only);
897 }
898 if let Some(reduce_only) = reduce_only {
899 params.close_only(reduce_only);
900 }
901 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
902
903 let resp = self.inner.http_create_order(params).await?;
904 tracing::debug!("Submitted order: {resp:?}");
905
906 let instrument = self.get_instrument_from_cache(resp.symbol)?;
907 let ts_init = get_atomic_clock_realtime().get_time_ns();
908 let report = parse_order_status_report(
909 resp,
910 account_id,
911 instrument.price_precision(),
912 instrument.size_precision(),
913 ts_init,
914 );
915 Ok(report)
916 }
917
918 pub async fn cancel_order(
920 &self,
921 account_id: AccountId,
922 client_order_id: ClientOrderId,
923 ) -> anyhow::Result<OrderStatusReport> {
924 let portfolio_id = account_id.get_issuers_id();
925
926 let resp = self
927 .inner
928 .http_cancel_order(client_order_id.as_str(), portfolio_id)
929 .await?;
930 tracing::debug!("Canceled order: {resp:?}");
931
932 let instrument = self.get_instrument_from_cache(resp.symbol)?;
933 let ts_init = get_atomic_clock_realtime().get_time_ns();
934
935 let report = parse_order_status_report(
936 resp,
937 account_id,
938 instrument.price_precision(),
939 instrument.size_precision(),
940 ts_init,
941 );
942 Ok(report)
943 }
944
945 pub async fn cancel_orders(
947 &self,
948 account_id: AccountId,
949 symbol: Symbol,
950 order_side: Option<OrderSide>,
951 ) -> anyhow::Result<Vec<OrderStatusReport>> {
952 let mut params = CancelOrdersParamsBuilder::default();
953 params.portfolio(account_id.get_issuers_id());
954 params.instrument(symbol.as_str());
955 if let Some(side) = order_side {
956 let side: CoinbaseIntxSide = side.into();
957 params.side(side);
958 };
959 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
960
961 let resp = self.inner.http_cancel_orders(params).await?;
962
963 let instrument = self.get_instrument_from_cache(symbol.inner())?;
964 let ts_init = get_atomic_clock_realtime().get_time_ns();
965
966 let mut reports: Vec<OrderStatusReport> = Vec::with_capacity(resp.len());
967 for order in resp {
968 tracing::debug!("Canceled order: {order:?}");
969 let report = parse_order_status_report(
970 order,
971 account_id,
972 instrument.price_precision(),
973 instrument.size_precision(),
974 ts_init,
975 );
976 reports.push(report)
977 }
978
979 Ok(reports)
980 }
981
982 #[allow(clippy::too_many_arguments)]
984 pub async fn modify_order(
985 &self,
986 account_id: AccountId,
987 client_order_id: ClientOrderId,
988 new_client_order_id: ClientOrderId,
989 price: Option<Price>,
990 trigger_price: Option<Price>,
991 quantity: Option<Quantity>,
992 ) -> anyhow::Result<OrderStatusReport> {
993 let mut params = ModifyOrderParamsBuilder::default();
994 params.portfolio(account_id.get_issuers_id());
995 params.client_order_id(new_client_order_id.as_str());
996 if let Some(price) = price {
997 params.price(price.to_string());
998 };
999 if let Some(trigger_price) = trigger_price {
1000 params.price(trigger_price.to_string());
1001 };
1002 if let Some(quantity) = quantity {
1003 params.size(quantity.to_string());
1004 };
1005 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1006
1007 let resp = self
1008 .inner
1009 .http_modify_order(client_order_id.as_str(), params)
1010 .await?;
1011 tracing::debug!("Modified order {}", resp.client_order_id);
1012
1013 let instrument = self.get_instrument_from_cache(resp.symbol)?;
1014 let ts_init = get_atomic_clock_realtime().get_time_ns();
1015 let report = parse_order_status_report(
1016 resp,
1017 account_id,
1018 instrument.price_precision(),
1019 instrument.size_precision(),
1020 ts_init,
1021 );
1022
1023 Ok(report)
1024 }
1025}