1use std::{
37 collections::HashMap,
38 fmt::Debug,
39 num::NonZeroU32,
40 sync::{Arc, LazyLock, Mutex},
41};
42
43use ahash::AHashSet;
44use chrono::{DateTime, Utc};
45use nautilus_core::{
46 UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
47};
48use nautilus_model::{
49 data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
50 enums::{AggregationSource, BarAggregation},
51 events::AccountState,
52 identifiers::{AccountId, InstrumentId},
53 instruments::{Instrument, InstrumentAny},
54 reports::{FillReport, OrderStatusReport, PositionStatusReport},
55};
56use nautilus_network::{
57 http::HttpClient,
58 ratelimiter::quota::Quota,
59 retry::{RetryConfig, RetryManager},
60};
61use reqwest::{Method, StatusCode, header::USER_AGENT};
62use serde::{Deserialize, Serialize, de::DeserializeOwned};
63use tokio_util::sync::CancellationToken;
64use ustr::Ustr;
65
66use super::{
67 error::OKXHttpError,
68 models::{
69 OKXAccount, OKXIndexTicker, OKXMarkPrice, OKXOrderHistory, OKXPosition, OKXPositionHistory,
70 OKXPositionTier, OKXTransactionDetail,
71 },
72 query::{
73 GetCandlesticksParams, GetCandlesticksParamsBuilder, GetIndexTickerParams,
74 GetIndexTickerParamsBuilder, GetInstrumentsParams, GetInstrumentsParamsBuilder,
75 GetMarkPriceParams, GetMarkPriceParamsBuilder, GetOrderHistoryParams,
76 GetOrderHistoryParamsBuilder, GetOrderListParams, GetOrderListParamsBuilder,
77 GetPositionTiersParams, GetPositionsHistoryParams, GetPositionsParams,
78 GetPositionsParamsBuilder, GetTradesParams, GetTradesParamsBuilder,
79 GetTransactionDetailsParams, GetTransactionDetailsParamsBuilder, SetPositionModeParams,
80 SetPositionModeParamsBuilder,
81 },
82};
83use crate::{
84 common::{
85 consts::{OKX_HTTP_URL, should_retry_error_code},
86 credential::Credential,
87 enums::{OKXInstrumentType, OKXPositionMode},
88 models::OKXInstrument,
89 parse::{
90 okx_instrument_type, parse_account_state, parse_candlestick, parse_fill_report,
91 parse_index_price_update, parse_instrument_any, parse_mark_price_update,
92 parse_order_status_report, parse_position_status_report, parse_trade_tick,
93 },
94 },
95 http::{
96 models::{OKXCandlestick, OKXTrade},
97 query::{GetOrderParams, GetPendingOrdersParams},
98 },
99};
100
/// Value of the `code` field in an OKX response envelope that marks a request as successful.
const OKX_SUCCESS_CODE: &str = "0";

/// Rate-limit quota handed to the underlying `HttpClient`: 250 requests per second.
///
/// NOTE(review): the value 250 is presumably derived from OKX's documented limits — confirm.
pub static OKX_REST_QUOTA: LazyLock<Quota> =
    LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
113
/// Generic envelope that OKX REST response bodies are deserialized into.
#[derive(Debug, Serialize, Deserialize)]
pub struct OKXResponse<T> {
    /// Result code; `send_request` compares it against `"0"` to detect success.
    pub code: String,
    /// Message accompanying the code; surfaced as the error message on failure.
    pub msg: String,
    /// Payload items of type `T`.
    pub data: Vec<T>,
}
124
/// Low-level OKX HTTP client: builds, optionally signs, sends and retries raw REST requests.
pub struct OKXHttpInnerClient {
    /// Prefix prepended to every request path to form the full URL.
    base_url: String,
    /// Underlying HTTP client (constructed with default headers, quota and timeout).
    client: HttpClient,
    /// API credential used for signing; `None` for an unauthenticated client.
    credential: Option<Credential>,
    /// Retry policy executor wrapping every request.
    retry_manager: RetryManager<OKXHttpError>,
    /// Token passed to the retry manager so in-flight requests can be cancelled.
    cancellation_token: CancellationToken,
}
137
138impl Default for OKXHttpInnerClient {
139 fn default() -> Self {
140 Self::new(None, Some(60), None, None, None)
141 .expect("Failed to create default OKXHttpInnerClient")
142 }
143}
144
145impl Debug for OKXHttpInnerClient {
146 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
147 let credential = self.credential.as_ref().map(|_| "<redacted>");
148 f.debug_struct(stringify!(OKXHttpInnerClient))
149 .field("base_url", &self.base_url)
150 .field("credential", &credential)
151 .finish_non_exhaustive()
152 }
153}
154
155impl OKXHttpInnerClient {
/// Cancels the client's cancellation token, which is the token handed to the
/// retry manager for every request.
///
/// NOTE(review): the token is never replaced in the visible code, so requests
/// issued after this call presumably observe an already-cancelled token — confirm.
pub fn cancel_all_requests(&self) {
    self.cancellation_token.cancel();
}
160
/// Returns a reference to the cancellation token used for this client's requests.
pub fn cancellation_token(&self) -> &CancellationToken {
    &self.cancellation_token
}
165
166 pub fn new(
176 base_url: Option<String>,
177 timeout_secs: Option<u64>,
178 max_retries: Option<u32>,
179 retry_delay_ms: Option<u64>,
180 retry_delay_max_ms: Option<u64>,
181 ) -> Result<Self, OKXHttpError> {
182 let retry_config = RetryConfig {
183 max_retries: max_retries.unwrap_or(3),
184 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
185 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
186 backoff_factor: 2.0,
187 jitter_ms: 1000,
188 operation_timeout_ms: Some(60_000),
189 immediate_first: false,
190 max_elapsed_ms: Some(180_000),
191 };
192
193 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
194 OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
195 })?;
196
197 Ok(Self {
198 base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
199 client: HttpClient::new(
200 Self::default_headers(),
201 vec![],
202 vec![],
203 Some(*OKX_REST_QUOTA),
204 timeout_secs,
205 ),
206 credential: None,
207 retry_manager,
208 cancellation_token: CancellationToken::new(),
209 })
210 }
211
212 #[allow(clippy::too_many_arguments)]
219 pub fn with_credentials(
220 api_key: String,
221 api_secret: String,
222 api_passphrase: String,
223 base_url: String,
224 timeout_secs: Option<u64>,
225 max_retries: Option<u32>,
226 retry_delay_ms: Option<u64>,
227 retry_delay_max_ms: Option<u64>,
228 ) -> Result<Self, OKXHttpError> {
229 let retry_config = RetryConfig {
230 max_retries: max_retries.unwrap_or(3),
231 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
232 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
233 backoff_factor: 2.0,
234 jitter_ms: 1000,
235 operation_timeout_ms: Some(60_000),
236 immediate_first: false,
237 max_elapsed_ms: Some(180_000),
238 };
239
240 let retry_manager = RetryManager::new(retry_config).map_err(|e| {
241 OKXHttpError::ValidationError(format!("Failed to create retry manager: {e}"))
242 })?;
243
244 Ok(Self {
245 base_url,
246 client: HttpClient::new(
247 Self::default_headers(),
248 vec![],
249 vec![],
250 Some(*OKX_REST_QUOTA),
251 timeout_secs,
252 ),
253 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
254 retry_manager,
255 cancellation_token: CancellationToken::new(),
256 })
257 }
258
259 fn default_headers() -> HashMap<String, String> {
261 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
262 }
263
264 fn build_path<S: Serialize>(base: &str, params: &S) -> Result<String, OKXHttpError> {
270 let query = serde_urlencoded::to_string(params)
271 .map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
272 if query.is_empty() {
273 Ok(base.to_owned())
274 } else {
275 Ok(format!("{base}?{query}"))
276 }
277 }
278
279 fn sign_request(
286 &self,
287 method: &Method,
288 path: &str,
289 body: Option<&[u8]>,
290 ) -> Result<HashMap<String, String>, OKXHttpError> {
291 let credential = match self.credential.as_ref() {
292 Some(c) => c,
293 None => return Err(OKXHttpError::MissingCredentials),
294 };
295
296 let body_str = body
297 .and_then(|b| String::from_utf8(b.to_vec()).ok())
298 .unwrap_or_default();
299
300 tracing::debug!("{method} {path}");
301
302 let api_key = credential.api_key.clone().to_string();
303 let api_passphrase = credential.api_passphrase.clone();
304 let timestamp = Utc::now().format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string();
305 let signature = credential.sign(×tamp, method.as_str(), path, &body_str);
306
307 let mut headers = HashMap::new();
308 headers.insert("OK-ACCESS-KEY".to_string(), api_key);
309 headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
310 headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
311 headers.insert("OK-ACCESS-SIGN".to_string(), signature);
312
313 Ok(headers)
314 }
315
316 async fn send_request<T: DeserializeOwned>(
332 &self,
333 method: Method,
334 path: &str,
335 body: Option<Vec<u8>>,
336 authenticate: bool,
337 ) -> Result<Vec<T>, OKXHttpError> {
338 let url = format!("{}{path}", self.base_url);
339 let endpoint = path.to_string();
340 let method_clone = method.clone();
341 let body_clone = body.clone();
342
343 let operation = || {
344 let url = url.clone();
345 let method = method_clone.clone();
346 let body = body_clone.clone();
347 let endpoint = endpoint.clone();
348
349 async move {
350 let mut headers = if authenticate {
351 self.sign_request(&method, &endpoint, body.as_deref())?
352 } else {
353 HashMap::new()
354 };
355
356 if body.is_some() {
358 headers.insert("Content-Type".to_string(), "application/json".to_string());
359 }
360
361 let resp = self
362 .client
363 .request(method.clone(), url, Some(headers), body, None, None)
364 .await?;
365
366 tracing::trace!("Response: {resp:?}");
367
368 if resp.status.is_success() {
369 let okx_response: OKXResponse<T> =
370 serde_json::from_slice(&resp.body).map_err(|e| {
371 tracing::error!("Failed to deserialize OKXResponse: {e}");
372 OKXHttpError::JsonError(e.to_string())
373 })?;
374
375 if okx_response.code != OKX_SUCCESS_CODE {
376 return Err(OKXHttpError::OkxError {
377 error_code: okx_response.code,
378 message: okx_response.msg,
379 });
380 }
381
382 Ok(okx_response.data)
383 } else {
384 let error_body = String::from_utf8_lossy(&resp.body);
385 tracing::error!(
386 "HTTP error {} with body: {error_body}",
387 resp.status.as_str()
388 );
389
390 if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
391 return Err(OKXHttpError::OkxError {
392 error_code: parsed_error.code,
393 message: parsed_error.msg,
394 });
395 }
396
397 Err(OKXHttpError::UnexpectedStatus {
398 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
399 body: error_body.to_string(),
400 })
401 }
402 }
403 };
404
405 let should_retry = |error: &OKXHttpError| -> bool {
414 match error {
415 OKXHttpError::HttpClientError(_) => true,
416 OKXHttpError::UnexpectedStatus { status, .. } => {
417 status.as_u16() >= 500 || status.as_u16() == 429
418 }
419 OKXHttpError::OkxError { error_code, .. } => should_retry_error_code(error_code),
420 _ => false,
421 }
422 };
423
424 let create_error = |msg: String| -> OKXHttpError {
425 if msg == "canceled" {
426 OKXHttpError::ValidationError("Request canceled".to_string())
427 } else {
428 OKXHttpError::ValidationError(msg)
429 }
430 };
431
432 self.retry_manager
433 .execute_with_retry_with_cancel(
434 &endpoint,
435 operation,
436 should_retry,
437 create_error,
438 &self.cancellation_token,
439 )
440 .await
441 }
442
443 pub async fn http_set_position_mode(
454 &self,
455 params: SetPositionModeParams,
456 ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
457 let path = "/api/v5/account/set-position-mode";
458 let body = serde_json::to_vec(¶ms)?;
459 self.send_request(Method::POST, path, Some(body), true)
460 .await
461 }
462
463 pub async fn http_get_position_tiers(
474 &self,
475 params: GetPositionTiersParams,
476 ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
477 let path = Self::build_path("/api/v5/public/position-tiers", ¶ms)?;
478 self.send_request(Method::GET, &path, None, false).await
479 }
480
481 pub async fn http_get_instruments(
492 &self,
493 params: GetInstrumentsParams,
494 ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
495 let path = Self::build_path("/api/v5/public/instruments", ¶ms)?;
496 self.send_request(Method::GET, &path, None, false).await
497 }
498
499 pub async fn http_get_mark_price(
513 &self,
514 params: GetMarkPriceParams,
515 ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
516 let path = Self::build_path("/api/v5/public/mark-price", ¶ms)?;
517 self.send_request(Method::GET, &path, None, false).await
518 }
519
520 pub async fn http_get_index_ticker(
526 &self,
527 params: GetIndexTickerParams,
528 ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
529 let path = Self::build_path("/api/v5/market/index-tickers", ¶ms)?;
530 self.send_request(Method::GET, &path, None, false).await
531 }
532
533 pub async fn http_get_trades(
539 &self,
540 params: GetTradesParams,
541 ) -> Result<Vec<OKXTrade>, OKXHttpError> {
542 let path = Self::build_path("/api/v5/market/history-trades", ¶ms)?;
543 self.send_request(Method::GET, &path, None, false).await
544 }
545
546 pub async fn http_get_candlesticks(
552 &self,
553 params: GetCandlesticksParams,
554 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
555 let path = Self::build_path("/api/v5/market/candles", ¶ms)?;
556 self.send_request(Method::GET, &path, None, false).await
557 }
558
559 pub async fn http_get_candlesticks_history(
565 &self,
566 params: GetCandlesticksParams,
567 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
568 let path = Self::build_path("/api/v5/market/history-candles", ¶ms)?;
569 self.send_request(Method::GET, &path, None, false).await
570 }
571
572 pub async fn http_get_pending_orders(
578 &self,
579 params: GetPendingOrdersParams,
580 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
581 let path = Self::build_path("/api/v5/trade/orders-pending", ¶ms)?;
582 self.send_request(Method::GET, &path, None, true).await
583 }
584
585 pub async fn http_get_order(
591 &self,
592 params: GetOrderParams,
593 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
594 let path = Self::build_path("/api/v5/trade/order", ¶ms)?;
595 self.send_request(Method::GET, &path, None, true).await
596 }
597
598 pub async fn http_get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
605 let path = "/api/v5/account/balance";
606 self.send_request(Method::GET, path, None, true).await
607 }
608
609 pub async fn http_get_order_history(
615 &self,
616 params: GetOrderHistoryParams,
617 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
618 let path = Self::build_path("/api/v5/trade/orders-history", ¶ms)?;
619 self.send_request(Method::GET, &path, None, true).await
620 }
621
622 pub async fn http_get_order_list(
628 &self,
629 params: GetOrderListParams,
630 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
631 let path = Self::build_path("/api/v5/trade/orders-pending", ¶ms)?;
632 self.send_request(Method::GET, &path, None, true).await
633 }
634
635 pub async fn http_get_positions(
643 &self,
644 params: GetPositionsParams,
645 ) -> Result<Vec<OKXPosition>, OKXHttpError> {
646 let path = Self::build_path("/api/v5/account/positions", ¶ms)?;
647 self.send_request(Method::GET, &path, None, true).await
648 }
649
650 pub async fn http_get_position_history(
656 &self,
657 params: GetPositionsHistoryParams,
658 ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
659 let path = Self::build_path("/api/v5/account/positions-history", ¶ms)?;
660 self.send_request(Method::GET, &path, None, true).await
661 }
662
663 pub async fn http_get_transaction_details(
669 &self,
670 params: GetTransactionDetailsParams,
671 ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
672 let path = Self::build_path("/api/v5/trade/fills", ¶ms)?;
673 self.send_request(Method::GET, &path, None, true).await
674 }
675}
676
/// High-level OKX HTTP client that wraps [`OKXHttpInnerClient`] and keeps a
/// cache of instruments keyed by raw symbol.
///
/// Cloning shares the inner client and the instrument cache (both are behind
/// `Arc`), but `cache_initialized` is a plain `bool` copied per clone.
#[derive(Clone, Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
)]
pub struct OKXHttpClient {
    /// Shared low-level client that performs the actual HTTP requests.
    pub(crate) inner: Arc<OKXHttpInnerClient>,
    /// Shared instrument cache, keyed by each instrument's raw symbol.
    pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
    /// Set to `true` by `add_instrument` / `add_instruments` on this value.
    cache_initialized: bool,
}
691
692impl Default for OKXHttpClient {
693 fn default() -> Self {
694 Self::new(None, Some(60), None, None, None).expect("Failed to create default OKXHttpClient")
695 }
696}
697
698impl OKXHttpClient {
699 pub fn new(
709 base_url: Option<String>,
710 timeout_secs: Option<u64>,
711 max_retries: Option<u32>,
712 retry_delay_ms: Option<u64>,
713 retry_delay_max_ms: Option<u64>,
714 ) -> anyhow::Result<Self> {
715 Ok(Self {
716 inner: Arc::new(OKXHttpInnerClient::new(
717 base_url,
718 timeout_secs,
719 max_retries,
720 retry_delay_ms,
721 retry_delay_max_ms,
722 )?),
723 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
724 cache_initialized: false,
725 })
726 }
727
/// Creates an authenticated client by calling `with_credentials` with every
/// argument set to `None`, so the API key, secret and passphrase are read
/// from the `OKX_API_KEY`, `OKX_API_SECRET` and `OKX_API_PASSPHRASE`
/// environment variables.
///
/// # Errors
///
/// Returns an error if `with_credentials` fails.
pub fn from_env() -> anyhow::Result<Self> {
    Self::with_credentials(None, None, None, None, None, None, None, None)
}
733
734 #[allow(clippy::too_many_arguments)]
737 pub fn with_credentials(
738 api_key: Option<String>,
739 api_secret: Option<String>,
740 api_passphrase: Option<String>,
741 base_url: Option<String>,
742 timeout_secs: Option<u64>,
743 max_retries: Option<u32>,
744 retry_delay_ms: Option<u64>,
745 retry_delay_max_ms: Option<u64>,
746 ) -> anyhow::Result<Self> {
747 let api_key = api_key.unwrap_or(get_env_var("OKX_API_KEY")?);
748 let api_secret = api_secret.unwrap_or(get_env_var("OKX_API_SECRET")?);
749 let api_passphrase = api_passphrase.unwrap_or(get_env_var("OKX_API_PASSPHRASE")?);
750 let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
751
752 Ok(Self {
753 inner: Arc::new(OKXHttpInnerClient::with_credentials(
754 api_key,
755 api_secret,
756 api_passphrase,
757 base_url,
758 timeout_secs,
759 max_retries,
760 retry_delay_ms,
761 retry_delay_max_ms,
762 )?),
763 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
764 cache_initialized: false,
765 })
766 }
767
768 fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
774 self.instruments_cache
775 .lock()
776 .expect("`instruments_cache` lock poisoned")
777 .get(&symbol)
778 .cloned()
779 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
780 }
781
782 async fn instrument_or_fetch(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
783 if let Ok(inst) = self.get_instrument_from_cache(symbol) {
784 return Ok(inst);
785 }
786
787 for group in [
788 OKXInstrumentType::Spot,
789 OKXInstrumentType::Margin,
790 OKXInstrumentType::Futures,
791 ] {
792 if let Ok(instruments) = self.request_instruments(group).await {
793 let mut guard = self.instruments_cache.lock().unwrap();
794 for inst in instruments {
795 guard.insert(inst.raw_symbol().inner(), inst);
796 }
797 drop(guard);
798
799 if let Ok(inst) = self.get_instrument_from_cache(symbol) {
800 return Ok(inst);
801 }
802 }
803 }
804
805 anyhow::bail!("Instrument {symbol} not in cache and fetch failed");
806 }
807
/// Cancels requests by delegating to the inner client's `cancel_all_requests`.
pub fn cancel_all_requests(&self) {
    self.inner.cancel_all_requests();
}
812
/// Returns the inner client's cancellation token.
pub fn cancellation_token(&self) -> &CancellationToken {
    self.inner.cancellation_token()
}
817
818 pub fn base_url(&self) -> &str {
820 self.inner.base_url.as_str()
821 }
822
823 pub fn api_key(&self) -> Option<&str> {
825 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
826 }
827
/// Returns the `cache_initialized` flag, which `add_instrument` and
/// `add_instruments` set to `true` on this value.
#[must_use]
pub const fn is_initialized(&self) -> bool {
    self.cache_initialized
}
835
836 fn generate_ts_init(&self) -> UnixNanos {
838 get_atomic_clock_realtime().get_time_ns()
839 }
840
841 #[must_use]
843 pub fn get_cached_symbols(&self) -> Vec<String> {
851 self.instruments_cache
852 .lock()
853 .unwrap()
854 .keys()
855 .map(std::string::ToString::to_string)
856 .collect()
857 }
858
859 pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
868 for inst in instruments {
869 self.instruments_cache
870 .lock()
871 .unwrap()
872 .insert(inst.raw_symbol().inner(), inst);
873 }
874 self.cache_initialized = true;
875 }
876
877 pub fn add_instrument(&mut self, instrument: InstrumentAny) {
886 self.instruments_cache
887 .lock()
888 .unwrap()
889 .insert(instrument.raw_symbol().inner(), instrument);
890 self.cache_initialized = true;
891 }
892
893 pub async fn request_account_state(
899 &self,
900 account_id: AccountId,
901 ) -> anyhow::Result<AccountState> {
902 let resp = self
903 .inner
904 .http_get_balance()
905 .await
906 .map_err(|e| anyhow::anyhow!(e))?;
907
908 let ts_init = self.generate_ts_init();
909 let raw = resp
910 .first()
911 .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
912 let account_state = parse_account_state(raw, account_id, ts_init)?;
913
914 Ok(account_state)
915 }
916
917 pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
930 let mut params = SetPositionModeParamsBuilder::default();
931 params.pos_mode(position_mode);
932 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
933
934 match self.inner.http_set_position_mode(params).await {
935 Ok(_) => Ok(()),
936 Err(e) => {
937 if let crate::http::error::OKXHttpError::OkxError {
939 error_code,
940 message,
941 } = &e
942 && error_code == "50115"
943 {
944 tracing::warn!(
945 "Account does not support position mode setting (derivatives trading not enabled): {message}"
946 );
947 return Ok(()); }
949 anyhow::bail!(e)
950 }
951 }
952 }
953
954 pub async fn request_instruments(
960 &self,
961 instrument_type: OKXInstrumentType,
962 ) -> anyhow::Result<Vec<InstrumentAny>> {
963 let mut params = GetInstrumentsParamsBuilder::default();
964 params.inst_type(instrument_type);
965 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
966
967 let resp = self
968 .inner
969 .http_get_instruments(params)
970 .await
971 .map_err(|e| anyhow::anyhow!(e))?;
972
973 let ts_init = self.generate_ts_init();
974
975 let mut instruments: Vec<InstrumentAny> = Vec::new();
976 for inst in &resp {
977 if let Some(instrument_any) = parse_instrument_any(inst, ts_init)? {
978 instruments.push(instrument_any);
979 }
980 }
981
982 Ok(instruments)
983 }
984
985 pub async fn request_mark_price(
991 &self,
992 instrument_id: InstrumentId,
993 ) -> anyhow::Result<MarkPriceUpdate> {
994 let mut params = GetMarkPriceParamsBuilder::default();
995 params.inst_id(instrument_id.symbol.inner());
996 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
997
998 let resp = self
999 .inner
1000 .http_get_mark_price(params)
1001 .await
1002 .map_err(|e| anyhow::anyhow!(e))?;
1003
1004 let raw = resp
1005 .first()
1006 .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
1007 let inst = self
1008 .instrument_or_fetch(instrument_id.symbol.inner())
1009 .await?;
1010 let ts_init = self.generate_ts_init();
1011
1012 let mark_price =
1013 parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1014 .map_err(|e| anyhow::anyhow!(e))?;
1015 Ok(mark_price)
1016 }
1017
1018 pub async fn request_index_price(
1024 &self,
1025 instrument_id: InstrumentId,
1026 ) -> anyhow::Result<IndexPriceUpdate> {
1027 let mut params = GetIndexTickerParamsBuilder::default();
1028 params.inst_id(instrument_id.symbol.inner());
1029 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1030
1031 let resp = self
1032 .inner
1033 .http_get_index_ticker(params)
1034 .await
1035 .map_err(|e| anyhow::anyhow!(e))?;
1036
1037 let raw = resp
1038 .first()
1039 .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
1040 let inst = self
1041 .instrument_or_fetch(instrument_id.symbol.inner())
1042 .await?;
1043 let ts_init = self.generate_ts_init();
1044
1045 let index_price =
1046 parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
1047 .map_err(|e| anyhow::anyhow!(e))?;
1048 Ok(index_price)
1049 }
1050
1051 pub async fn request_trades(
1057 &self,
1058 instrument_id: InstrumentId,
1059 start: Option<DateTime<Utc>>,
1060 end: Option<DateTime<Utc>>,
1061 limit: Option<u32>,
1062 ) -> anyhow::Result<Vec<TradeTick>> {
1063 let mut params = GetTradesParamsBuilder::default();
1064
1065 params.inst_id(instrument_id.symbol.inner());
1066 if let Some(s) = start {
1067 params.before(s.timestamp_millis().to_string());
1068 }
1069 if let Some(e) = end {
1070 params.after(e.timestamp_millis().to_string());
1071 }
1072 if let Some(l) = limit {
1073 params.limit(l);
1074 }
1075
1076 let params = params.build().map_err(anyhow::Error::new)?;
1077
1078 let raw_trades = self
1080 .inner
1081 .http_get_trades(params)
1082 .await
1083 .map_err(anyhow::Error::new)?;
1084
1085 let ts_init = self.generate_ts_init();
1086 let inst = self
1087 .instrument_or_fetch(instrument_id.symbol.inner())
1088 .await?;
1089
1090 let mut trades = Vec::with_capacity(raw_trades.len());
1091 for raw in raw_trades {
1092 match parse_trade_tick(
1093 &raw,
1094 instrument_id,
1095 inst.price_precision(),
1096 inst.size_precision(),
1097 ts_init,
1098 ) {
1099 Ok(trade) => trades.push(trade),
1100 Err(e) => tracing::error!("{e}"),
1101 }
1102 }
1103
1104 Ok(trades)
1105 }
1106
1107 pub async fn request_bars(
1148 &self,
1149 bar_type: BarType,
1150 start: Option<DateTime<Utc>>,
1151 mut end: Option<DateTime<Utc>>,
1152 limit: Option<u32>,
1153 ) -> anyhow::Result<Vec<Bar>> {
1154 const HISTORY_SPLIT_DAYS: i64 = 100;
1155 const MAX_PAGES_SOFT: usize = 500;
1156
1157 let limit = if limit == Some(0) { None } else { limit };
1158
1159 anyhow::ensure!(
1160 bar_type.aggregation_source() == AggregationSource::External,
1161 "Only EXTERNAL aggregation is supported"
1162 );
1163 if let (Some(s), Some(e)) = (start, end) {
1164 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1165 }
1166
1167 let now = Utc::now();
1168 if let Some(s) = start
1169 && s > now
1170 {
1171 return Ok(Vec::new());
1172 }
1173 if let Some(e) = end
1174 && e > now
1175 {
1176 end = Some(now);
1177 }
1178
1179 let spec = bar_type.spec();
1180 let step = spec.step.get();
1181 let bar_param = match spec.aggregation {
1182 BarAggregation::Second => format!("{step}s"),
1183 BarAggregation::Minute => format!("{step}m"),
1184 BarAggregation::Hour => format!("{step}H"),
1185 BarAggregation::Day => format!("{step}D"),
1186 BarAggregation::Week => format!("{step}W"),
1187 BarAggregation::Month => format!("{step}M"),
1188 a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1189 };
1190
1191 let slot_ms: i64 = match spec.aggregation {
1192 BarAggregation::Second => (step as i64) * 1_000,
1193 BarAggregation::Minute => (step as i64) * 60_000,
1194 BarAggregation::Hour => (step as i64) * 3_600_000,
1195 BarAggregation::Day => (step as i64) * 86_400_000,
1196 BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1197 BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1198 _ => unreachable!("Unsupported aggregation should have been caught above"),
1199 };
1200 let slot_ns: i64 = slot_ms * 1_000_000;
1201
1202 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1203 enum Mode {
1204 Latest,
1205 Backward,
1206 Range,
1207 }
1208
1209 let mode = match (start, end) {
1210 (None, None) => Mode::Latest,
1211 (Some(_), None) => Mode::Backward, (None, Some(_)) => Mode::Backward,
1213 (Some(_), Some(_)) => Mode::Range,
1214 };
1215
1216 let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1217 let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1218
1219 let start_ms = start.map(|s| {
1221 let ms = s.timestamp_millis();
1222 if slot_ms > 0 {
1223 (ms / slot_ms) * slot_ms } else {
1225 ms
1226 }
1227 });
1228 let end_ms = end.map(|e| {
1229 let ms = e.timestamp_millis();
1230 if slot_ms > 0 {
1231 ((ms + slot_ms - 1) / slot_ms) * slot_ms } else {
1233 ms
1234 }
1235 });
1236 let now_ms = now.timestamp_millis();
1237
1238 let symbol = bar_type.instrument_id().symbol;
1239 let inst = self.instrument_or_fetch(symbol.inner()).await?;
1240
1241 let mut out: Vec<Bar> = Vec::new();
1242 let mut pages = 0usize;
1243
1244 let mut after_ms: Option<i64> = None;
1249 let mut before_ms: Option<i64> = match mode {
1250 Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
1251 Mode::Range => {
1252 Some(end_ms.unwrap_or(now_ms))
1255 }
1256 Mode::Latest => None,
1257 };
1258
1259 let mut forward_prepend_mode = matches!(mode, Mode::Range);
1261
1262 if matches!(mode, Mode::Backward | Mode::Range)
1266 && let Some(b) = before_ms
1267 {
1268 let buffer_ms = slot_ms.max(60_000); if b >= now_ms.saturating_sub(buffer_ms) {
1274 before_ms = Some(now_ms.saturating_sub(buffer_ms));
1275 }
1276 }
1277
1278 let mut have_latest_first_page = false;
1279 let mut progressless_loops = 0u8;
1280
1281 loop {
1282 if let Some(lim) = limit
1283 && lim > 0
1284 && out.len() >= lim as usize
1285 {
1286 break;
1287 }
1288 if pages >= MAX_PAGES_SOFT {
1289 break;
1290 }
1291
1292 let pivot_ms = if let Some(a) = after_ms {
1293 a
1294 } else if let Some(b) = before_ms {
1295 b
1296 } else {
1297 now_ms
1298 };
1299 let age_ms = now_ms.saturating_sub(pivot_ms);
1305 let age_hours = age_ms / (60 * 60 * 1000);
1306 let using_history = age_hours > 1; let page_ceiling = if using_history { 100 } else { 300 };
1309 let remaining = limit
1310 .filter(|&l| l > 0) .map(|l| (l as usize).saturating_sub(out.len()))
1312 .unwrap_or(page_ceiling);
1313 let page_cap = remaining.min(page_ceiling);
1314
1315 let mut p = GetCandlesticksParamsBuilder::default();
1316 p.inst_id(symbol.as_str())
1317 .bar(&bar_param)
1318 .limit(page_cap as u32);
1319
1320 let mut req_used_before = false;
1322
1323 match mode {
1324 Mode::Latest => {
1325 if have_latest_first_page && let Some(b) = before_ms {
1326 p.before_ms(b);
1327 req_used_before = true;
1328 }
1329 }
1330 Mode::Backward => {
1331 if let Some(b) = before_ms {
1332 p.before_ms(b);
1333 req_used_before = true;
1334 }
1335 }
1336 Mode::Range => {
1337 if pages == 0 && !using_history {
1340 } else if forward_prepend_mode {
1343 if let Some(b) = before_ms {
1344 p.before_ms(b);
1345 req_used_before = true;
1346 }
1347 } else if let Some(a) = after_ms {
1348 p.after_ms(a);
1349 }
1350 }
1351 }
1352
1353 let params = p.build().map_err(anyhow::Error::new)?;
1354
1355 let mut raw = if using_history {
1356 self.inner
1357 .http_get_candlesticks_history(params.clone())
1358 .await
1359 .map_err(anyhow::Error::new)?
1360 } else {
1361 self.inner
1362 .http_get_candlesticks(params.clone())
1363 .await
1364 .map_err(anyhow::Error::new)?
1365 };
1366
1367 if raw.is_empty() {
1369 if matches!(mode, Mode::Latest)
1371 && have_latest_first_page
1372 && !using_history
1373 && let Some(b) = before_ms
1374 {
1375 let mut p2 = GetCandlesticksParamsBuilder::default();
1376 p2.inst_id(symbol.as_str())
1377 .bar(&bar_param)
1378 .limit(page_cap as u32);
1379 p2.before_ms(b);
1380 let params2 = p2.build().map_err(anyhow::Error::new)?;
1381 let raw2 = self
1382 .inner
1383 .http_get_candlesticks_history(params2)
1384 .await
1385 .map_err(anyhow::Error::new)?;
1386 if !raw2.is_empty() {
1387 raw = raw2;
1388 } else {
1389 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1391 before_ms = Some(b.saturating_sub(jump));
1392 progressless_loops = progressless_loops.saturating_add(1);
1393 if progressless_loops >= 3 {
1394 break;
1395 }
1396 continue;
1397 }
1398 }
1399
1400 if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
1404 let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
1405 let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
1406
1407 let mut p2 = GetCandlesticksParamsBuilder::default();
1408 p2.inst_id(symbol.as_str())
1409 .bar(&bar_param)
1410 .limit(page_cap as u32)
1411 .before_ms(pivot_back);
1412 let params2 = p2.build().map_err(anyhow::Error::new)?;
1413 let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
1414 > HISTORY_SPLIT_DAYS
1415 {
1416 self.inner.http_get_candlesticks_history(params2).await
1417 } else {
1418 self.inner.http_get_candlesticks(params2).await
1419 }
1420 .map_err(anyhow::Error::new)?;
1421 if raw2.is_empty() {
1422 break;
1423 } else {
1424 raw = raw2;
1425 forward_prepend_mode = true;
1426 req_used_before = true;
1427 }
1428 }
1429
1430 if raw.is_empty()
1432 && matches!(mode, Mode::Latest)
1433 && !have_latest_first_page
1434 && !using_history
1435 {
1436 let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
1437 before_ms = Some(now_ms.saturating_sub(jump_days_ms));
1438 have_latest_first_page = true;
1439 continue;
1440 }
1441
1442 if raw.is_empty() {
1444 break;
1445 }
1446 }
1447 pages += 1;
1450
1451 let ts_init = self.generate_ts_init();
1453 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1454 for r in &raw {
1455 page.push(parse_candlestick(
1456 r,
1457 bar_type,
1458 inst.price_precision(),
1459 inst.size_precision(),
1460 ts_init,
1461 )?);
1462 }
1463 page.reverse();
1464
1465 let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
1466 let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
1467
1468 let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
1472 && out.is_empty()
1473 && pages < 2
1474 {
1475 let tolerance_ns = slot_ns * 2; if !page.is_empty() {
1482 tracing::debug!(
1483 "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
1484 page.len(),
1485 page.first().unwrap().ts_event.as_i64() / 1_000_000,
1486 page.last().unwrap().ts_event.as_i64() / 1_000_000,
1487 start_ms,
1488 end_ms
1489 );
1490 }
1491
1492 let result: Vec<Bar> = page
1493 .clone()
1494 .into_iter()
1495 .filter(|b| {
1496 let ts = b.ts_event.as_i64();
1497 let ok_after =
1499 start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
1500 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1501 ok_after && ok_before
1502 })
1503 .collect();
1504
1505 result
1506 } else {
1507 page.clone()
1509 .into_iter()
1510 .filter(|b| {
1511 let ts = b.ts_event.as_i64();
1512 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1513 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1514 ok_after && ok_before
1515 })
1516 .collect()
1517 };
1518
1519 if !page.is_empty() && filtered.is_empty() {
1520 if matches!(mode, Mode::Range)
1522 && !forward_prepend_mode
1523 && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
1524 && newest_ms < start_ms.saturating_sub(slot_ms * 2)
1525 {
1526 break;
1528 }
1529 }
1530
1531 let contribution;
1533
1534 if out.is_empty() {
1535 contribution = filtered.len();
1536 out = filtered;
1537 } else {
1538 match mode {
1539 Mode::Backward | Mode::Latest => {
1540 if let Some(first) = out.first() {
1541 filtered.retain(|b| b.ts_event < first.ts_event);
1542 }
1543 contribution = filtered.len();
1544 if contribution != 0 {
1545 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1546 new_out.extend_from_slice(&filtered);
1547 new_out.extend_from_slice(&out);
1548 out = new_out;
1549 }
1550 }
1551 Mode::Range => {
1552 if forward_prepend_mode || req_used_before {
1553 if let Some(first) = out.first() {
1555 filtered.retain(|b| b.ts_event < first.ts_event);
1556 }
1557 contribution = filtered.len();
1558 if contribution != 0 {
1559 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1560 new_out.extend_from_slice(&filtered);
1561 new_out.extend_from_slice(&out);
1562 out = new_out;
1563 }
1564 } else {
1565 if let Some(last) = out.last() {
1567 filtered.retain(|b| b.ts_event > last.ts_event);
1568 }
1569 contribution = filtered.len();
1570 out.extend(filtered);
1571 }
1572 }
1573 }
1574 }
1575
1576 if contribution == 0
1578 && matches!(mode, Mode::Latest | Mode::Backward)
1579 && let Some(b) = before_ms
1580 {
1581 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1582 let new_b = b.saturating_sub(jump);
1583 if new_b != b {
1584 before_ms = Some(new_b);
1585 }
1586 }
1587
1588 if contribution == 0 {
1589 progressless_loops = progressless_loops.saturating_add(1);
1590 if progressless_loops >= 3 {
1591 break;
1592 }
1593 } else {
1594 progressless_loops = 0;
1595
1596 match mode {
1598 Mode::Latest | Mode::Backward => {
1599 if let Some(oldest) = page_oldest_ms {
1600 before_ms = Some(oldest.saturating_sub(1));
1601 have_latest_first_page = true;
1602 } else {
1603 break;
1604 }
1605 }
1606 Mode::Range => {
1607 if forward_prepend_mode || req_used_before {
1608 if let Some(oldest) = page_oldest_ms {
1609 let jump_back = slot_ms.max(60_000); before_ms = Some(oldest.saturating_sub(jump_back));
1612 after_ms = None;
1613 } else {
1614 break;
1615 }
1616 } else if let Some(newest) = page_newest_ms {
1617 after_ms = Some(newest.saturating_add(1));
1618 before_ms = None;
1619 } else {
1620 break;
1621 }
1622 }
1623 }
1624 }
1625
1626 if let Some(lim) = limit
1628 && lim > 0
1629 && out.len() >= lim as usize
1630 {
1631 break;
1632 }
1633 if let Some(ens) = end_ns
1634 && let Some(last) = out.last()
1635 && last.ts_event.as_i64() >= ens
1636 {
1637 break;
1638 }
1639 if let Some(sns) = start_ns
1640 && let Some(first) = out.first()
1641 && (matches!(mode, Mode::Backward) || forward_prepend_mode)
1642 && first.ts_event.as_i64() <= sns
1643 {
1644 if matches!(mode, Mode::Range) {
1646 if let Some(ens) = end_ns
1648 && let Some(last) = out.last()
1649 {
1650 let last_ts = last.ts_event.as_i64();
1651 if last_ts < ens {
1652 forward_prepend_mode = false;
1655 after_ms = Some((last_ts / 1_000_000).saturating_add(1));
1656 before_ms = None;
1657 continue;
1658 }
1659 }
1660 }
1661 break;
1662 }
1663
1664 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1665 }
1666
1667 if out.is_empty() && matches!(mode, Mode::Range) {
1669 let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
1670 let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
1671 let mut p = GetCandlesticksParamsBuilder::default();
1672 p.inst_id(symbol.as_str())
1673 .bar(&bar_param)
1674 .limit(300)
1675 .before_ms(pivot);
1676 let params = p.build().map_err(anyhow::Error::new)?;
1677 let raw = if hist {
1678 self.inner.http_get_candlesticks_history(params).await
1679 } else {
1680 self.inner.http_get_candlesticks(params).await
1681 }
1682 .map_err(anyhow::Error::new)?;
1683 if !raw.is_empty() {
1684 let ts_init = self.generate_ts_init();
1685 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1686 for r in &raw {
1687 page.push(parse_candlestick(
1688 r,
1689 bar_type,
1690 inst.price_precision(),
1691 inst.size_precision(),
1692 ts_init,
1693 )?);
1694 }
1695 page.reverse();
1696 out = page
1697 .into_iter()
1698 .filter(|b| {
1699 let ts = b.ts_event.as_i64();
1700 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1701 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1702 ok_after && ok_before
1703 })
1704 .collect();
1705 }
1706 }
1707
1708 if let Some(ens) = end_ns {
1710 while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
1711 out.pop();
1712 }
1713 }
1714
1715 if matches!(mode, Mode::Range)
1717 && !forward_prepend_mode
1718 && let Some(sns) = start_ns
1719 {
1720 let lower = sns.saturating_sub(slot_ns);
1721 while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
1722 out.remove(0);
1723 }
1724 }
1725
1726 if let Some(lim) = limit
1727 && lim > 0
1728 && out.len() > lim as usize
1729 {
1730 out.truncate(lim as usize);
1731 }
1732
1733 Ok(out)
1734 }
1735
1736 #[allow(clippy::too_many_arguments)]
1743 pub async fn request_order_status_reports(
1744 &self,
1745 account_id: AccountId,
1746 instrument_type: Option<OKXInstrumentType>,
1747 instrument_id: Option<InstrumentId>,
1748 start: Option<DateTime<Utc>>,
1749 end: Option<DateTime<Utc>>,
1750 open_only: bool,
1751 limit: Option<u32>,
1752 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1753 let mut history_params = GetOrderHistoryParamsBuilder::default();
1755
1756 let instrument_type = if let Some(instrument_type) = instrument_type {
1757 instrument_type
1758 } else {
1759 let instrument_id = instrument_id.ok_or_else(|| {
1760 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1761 })?;
1762 let instrument = self
1763 .instrument_or_fetch(instrument_id.symbol.inner())
1764 .await?;
1765 okx_instrument_type(&instrument)?
1766 };
1767
1768 history_params.inst_type(instrument_type);
1769
1770 if let Some(instrument_id) = instrument_id.as_ref() {
1771 history_params.inst_id(instrument_id.symbol.inner().to_string());
1772 }
1773
1774 if let Some(limit) = limit {
1775 history_params.limit(limit);
1776 }
1777
1778 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
1779
1780 let mut pending_params = GetOrderListParamsBuilder::default();
1782 pending_params.inst_type(instrument_type);
1783
1784 if let Some(instrument_id) = instrument_id.as_ref() {
1785 pending_params.inst_id(instrument_id.symbol.inner().to_string());
1786 }
1787
1788 if let Some(limit) = limit {
1789 pending_params.limit(limit);
1790 }
1791
1792 let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
1793
1794 let combined_resp = if open_only {
1795 self.inner
1797 .http_get_order_list(pending_params)
1798 .await
1799 .map_err(|e| anyhow::anyhow!(e))?
1800 } else {
1801 let (history_resp, pending_resp) = tokio::try_join!(
1803 self.inner.http_get_order_history(history_params),
1804 self.inner.http_get_order_list(pending_params)
1805 )
1806 .map_err(|e| anyhow::anyhow!(e))?;
1807
1808 let mut combined_resp = history_resp;
1810 combined_resp.extend(pending_resp);
1811 combined_resp
1812 };
1813
1814 let start_ns = start.map(UnixNanos::from);
1816 let end_ns = end.map(UnixNanos::from);
1817
1818 let ts_init = self.generate_ts_init();
1819 let mut reports = Vec::with_capacity(combined_resp.len());
1820
1821 let mut seen = AHashSet::new();
1823
1824 for order in combined_resp {
1825 if seen.contains(&order.cl_ord_id) {
1826 continue; }
1828 seen.insert(order.cl_ord_id);
1829
1830 let inst = self.instrument_or_fetch(order.inst_id).await?;
1831
1832 let report = parse_order_status_report(
1833 &order,
1834 account_id,
1835 inst.id(),
1836 inst.price_precision(),
1837 inst.size_precision(),
1838 ts_init,
1839 );
1840
1841 if let Some(start_ns) = start_ns
1842 && report.ts_last < start_ns
1843 {
1844 continue;
1845 }
1846 if let Some(end_ns) = end_ns
1847 && report.ts_last > end_ns
1848 {
1849 continue;
1850 }
1851
1852 reports.push(report);
1853 }
1854
1855 Ok(reports)
1856 }
1857
1858 pub async fn request_fill_reports(
1864 &self,
1865 account_id: AccountId,
1866 instrument_type: Option<OKXInstrumentType>,
1867 instrument_id: Option<InstrumentId>,
1868 start: Option<DateTime<Utc>>,
1869 end: Option<DateTime<Utc>>,
1870 limit: Option<u32>,
1871 ) -> anyhow::Result<Vec<FillReport>> {
1872 let mut params = GetTransactionDetailsParamsBuilder::default();
1873
1874 let instrument_type = if let Some(instrument_type) = instrument_type {
1875 instrument_type
1876 } else {
1877 let instrument_id = instrument_id.ok_or_else(|| {
1878 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1879 })?;
1880 let instrument = self
1881 .instrument_or_fetch(instrument_id.symbol.inner())
1882 .await?;
1883 okx_instrument_type(&instrument)?
1884 };
1885
1886 params.inst_type(instrument_type);
1887
1888 if let Some(instrument_id) = instrument_id {
1889 let instrument = self
1890 .instrument_or_fetch(instrument_id.symbol.inner())
1891 .await?;
1892 let instrument_type = okx_instrument_type(&instrument)?;
1893 params.inst_type(instrument_type);
1894 params.inst_id(instrument_id.symbol.inner().to_string());
1895 }
1896
1897 if let Some(limit) = limit {
1898 params.limit(limit);
1899 }
1900
1901 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1902
1903 let resp = self
1904 .inner
1905 .http_get_transaction_details(params)
1906 .await
1907 .map_err(|e| anyhow::anyhow!(e))?;
1908
1909 let start_ns = start.map(UnixNanos::from);
1911 let end_ns = end.map(UnixNanos::from);
1912
1913 let ts_init = self.generate_ts_init();
1914 let mut reports = Vec::with_capacity(resp.len());
1915
1916 for detail in resp {
1917 let inst = self.instrument_or_fetch(detail.inst_id).await?;
1918
1919 let report = parse_fill_report(
1920 detail,
1921 account_id,
1922 inst.id(),
1923 inst.price_precision(),
1924 inst.size_precision(),
1925 ts_init,
1926 )?;
1927
1928 if let Some(start_ns) = start_ns
1929 && report.ts_event < start_ns
1930 {
1931 continue;
1932 }
1933
1934 if let Some(end_ns) = end_ns
1935 && report.ts_event > end_ns
1936 {
1937 continue;
1938 }
1939
1940 reports.push(report);
1941 }
1942
1943 Ok(reports)
1944 }
1945
1946 pub async fn request_position_status_reports(
1952 &self,
1953 account_id: AccountId,
1954 instrument_type: Option<OKXInstrumentType>,
1955 instrument_id: Option<InstrumentId>,
1956 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1957 let mut params = GetPositionsParamsBuilder::default();
1958
1959 let instrument_type = if let Some(instrument_type) = instrument_type {
1960 instrument_type
1961 } else {
1962 let instrument_id = instrument_id.ok_or_else(|| {
1963 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1964 })?;
1965 let instrument = self
1966 .instrument_or_fetch(instrument_id.symbol.inner())
1967 .await?;
1968 okx_instrument_type(&instrument)?
1969 };
1970
1971 params.inst_type(instrument_type);
1972
1973 instrument_id
1974 .as_ref()
1975 .map(|i| params.inst_id(i.symbol.inner()));
1976
1977 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1978
1979 let resp = self
1980 .inner
1981 .http_get_positions(params)
1982 .await
1983 .map_err(|e| anyhow::anyhow!(e))?;
1984
1985 let ts_init = self.generate_ts_init();
1986 let mut reports = Vec::with_capacity(resp.len());
1987
1988 for position in resp {
1989 let inst = self.instrument_or_fetch(position.inst_id).await?;
1990
1991 let report = parse_position_status_report(
1992 position,
1993 account_id,
1994 inst.id(),
1995 inst.size_precision(),
1996 ts_init,
1997 );
1998 reports.push(report);
1999 }
2000
2001 Ok(reports)
2002 }
2003}