1use std::{
37 collections::HashMap,
38 fmt::Debug,
39 num::NonZeroU32,
40 sync::{Arc, LazyLock, Mutex},
41};
42
43use ahash::AHashSet;
44use chrono::{DateTime, Utc};
45use nautilus_core::{
46 UnixNanos, consts::NAUTILUS_USER_AGENT, env::get_env_var, time::get_atomic_clock_realtime,
47};
48use nautilus_model::{
49 data::{Bar, BarType, IndexPriceUpdate, MarkPriceUpdate, TradeTick},
50 enums::{AggregationSource, BarAggregation},
51 events::AccountState,
52 identifiers::{AccountId, InstrumentId},
53 instruments::{Instrument, InstrumentAny},
54 reports::{FillReport, OrderStatusReport, PositionStatusReport},
55};
56use nautilus_network::{http::HttpClient, ratelimiter::quota::Quota};
57use reqwest::{Method, StatusCode, header::USER_AGENT};
58use serde::{Deserialize, Serialize, de::DeserializeOwned};
59use ustr::Ustr;
60
61use super::{
62 error::OKXHttpError,
63 models::{
64 OKXAccount, OKXIndexTicker, OKXMarkPrice, OKXOrderHistory, OKXPosition, OKXPositionHistory,
65 OKXPositionTier, OKXTransactionDetail,
66 },
67 query::{
68 GetCandlesticksParams, GetCandlesticksParamsBuilder, GetIndexTickerParams,
69 GetIndexTickerParamsBuilder, GetInstrumentsParams, GetInstrumentsParamsBuilder,
70 GetMarkPriceParams, GetMarkPriceParamsBuilder, GetOrderHistoryParams,
71 GetOrderHistoryParamsBuilder, GetOrderListParams, GetOrderListParamsBuilder,
72 GetPositionTiersParams, GetPositionsHistoryParams, GetPositionsParams,
73 GetPositionsParamsBuilder, GetTradesParams, GetTradesParamsBuilder,
74 GetTransactionDetailsParams, GetTransactionDetailsParamsBuilder, SetPositionModeParams,
75 SetPositionModeParamsBuilder,
76 },
77};
78use crate::{
79 common::{
80 consts::OKX_HTTP_URL,
81 credential::Credential,
82 enums::{OKXInstrumentType, OKXPositionMode},
83 models::OKXInstrument,
84 parse::{
85 okx_instrument_type, parse_account_state, parse_candlestick, parse_fill_report,
86 parse_index_price_update, parse_instrument_any, parse_mark_price_update,
87 parse_order_status_report, parse_position_status_report, parse_trade_tick,
88 },
89 },
90 http::{
91 models::{OKXCandlestick, OKXTrade},
92 query::{GetOrderParams, GetPendingOrdersParams},
93 },
94};
95
96const OKX_SUCCESS_CODE: &str = "0";
97
98pub static OKX_REST_QUOTA: LazyLock<Quota> =
107 LazyLock::new(|| Quota::per_second(NonZeroU32::new(250).unwrap()));
108
109#[derive(Debug, Serialize, Deserialize)]
111pub struct OKXResponse<T> {
112 pub code: String,
114 pub msg: String,
116 pub data: Vec<T>,
118}
119
120pub struct OKXHttpInnerClient {
126 base_url: String,
127 client: HttpClient,
128 credential: Option<Credential>,
129}
130
131impl Default for OKXHttpInnerClient {
132 fn default() -> Self {
133 Self::new(None, Some(60))
134 }
135}
136
137impl Debug for OKXHttpInnerClient {
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 let credential = self.credential.as_ref().map(|_| "<redacted>");
140 f.debug_struct(stringify!(OKXHttpInnerClient))
141 .field("base_url", &self.base_url)
142 .field("credential", &credential)
143 .finish_non_exhaustive()
144 }
145}
146
147impl OKXHttpInnerClient {
148 pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
154 Self {
155 base_url: base_url.unwrap_or(OKX_HTTP_URL.to_string()),
156 client: HttpClient::new(
157 Self::default_headers(),
158 vec![],
159 vec![],
160 Some(*OKX_REST_QUOTA),
161 timeout_secs,
162 ),
163 credential: None,
164 }
165 }
166
167 pub fn with_credentials(
170 api_key: String,
171 api_secret: String,
172 api_passphrase: String,
173 base_url: String,
174 timeout_secs: Option<u64>,
175 ) -> Self {
176 Self {
177 base_url,
178 client: HttpClient::new(
179 Self::default_headers(),
180 vec![],
181 vec![],
182 Some(*OKX_REST_QUOTA),
183 timeout_secs,
184 ),
185 credential: Some(Credential::new(api_key, api_secret, api_passphrase)),
186 }
187 }
188
189 fn default_headers() -> HashMap<String, String> {
191 HashMap::from([(USER_AGENT.to_string(), NAUTILUS_USER_AGENT.to_string())])
192 }
193
194 fn build_path<S: Serialize>(base: &str, params: &S) -> Result<String, OKXHttpError> {
200 let query = serde_urlencoded::to_string(params)
201 .map_err(|e| OKXHttpError::JsonError(e.to_string()))?;
202 if query.is_empty() {
203 Ok(base.to_owned())
204 } else {
205 Ok(format!("{base}?{query}"))
206 }
207 }
208
209 fn sign_request(
216 &self,
217 method: &Method,
218 path: &str,
219 body: Option<&[u8]>,
220 ) -> Result<HashMap<String, String>, OKXHttpError> {
221 let credential = match self.credential.as_ref() {
222 Some(c) => c,
223 None => return Err(OKXHttpError::MissingCredentials),
224 };
225
226 let body_str = body
227 .and_then(|b| String::from_utf8(b.to_vec()).ok())
228 .unwrap_or_default();
229
230 tracing::debug!("{method} {path}");
231
232 let api_key = credential.api_key.clone().to_string();
233 let api_passphrase = credential.api_passphrase.clone();
234 let timestamp = Utc::now().format("%Y-%m-%dT%H:%M:%S.%3fZ").to_string();
235 let signature = credential.sign(×tamp, method.as_str(), path, &body_str);
236
237 let mut headers = HashMap::new();
238 headers.insert("OK-ACCESS-KEY".to_string(), api_key);
239 headers.insert("OK-ACCESS-PASSPHRASE".to_string(), api_passphrase);
240 headers.insert("OK-ACCESS-TIMESTAMP".to_string(), timestamp);
241 headers.insert("OK-ACCESS-SIGN".to_string(), signature);
242
243 Ok(headers)
244 }
245
246 async fn send_request<T: DeserializeOwned>(
261 &self,
262 method: Method,
263 path: &str,
264 body: Option<Vec<u8>>,
265 authenticate: bool,
266 ) -> Result<Vec<T>, OKXHttpError> {
267 let url = format!("{}{path}", self.base_url);
268
269 let mut headers = if authenticate {
270 self.sign_request(&method, path, body.as_deref())?
271 } else {
272 HashMap::new()
273 };
274
275 if body.is_some() {
277 headers.insert("Content-Type".to_string(), "application/json".to_string());
278 }
279
280 let resp = self
281 .client
282 .request(method.clone(), url, Some(headers), body, None, None)
283 .await?;
284
285 tracing::trace!("Response: {resp:?}");
286
287 if resp.status.is_success() {
288 let okx_response: OKXResponse<T> = serde_json::from_slice(&resp.body).map_err(|e| {
289 tracing::error!("Failed to deserialize OKXResponse: {e}");
290 OKXHttpError::JsonError(e.to_string())
291 })?;
292
293 if okx_response.code != OKX_SUCCESS_CODE {
294 return Err(OKXHttpError::OkxError {
295 error_code: okx_response.code,
296 message: okx_response.msg,
297 });
298 }
299
300 Ok(okx_response.data)
301 } else {
302 let error_body = String::from_utf8_lossy(&resp.body);
303 tracing::error!(
304 "HTTP error {} with body: {error_body}",
305 resp.status.as_str()
306 );
307
308 if let Ok(parsed_error) = serde_json::from_slice::<OKXResponse<T>>(&resp.body) {
309 return Err(OKXHttpError::OkxError {
310 error_code: parsed_error.code,
311 message: parsed_error.msg,
312 });
313 }
314
315 Err(OKXHttpError::UnexpectedStatus {
316 status: StatusCode::from_u16(resp.status.as_u16()).unwrap(),
317 body: error_body.to_string(),
318 })
319 }
320 }
321
322 pub async fn http_set_position_mode(
333 &self,
334 params: SetPositionModeParams,
335 ) -> Result<Vec<serde_json::Value>, OKXHttpError> {
336 let path = "/api/v5/account/set-position-mode";
337 let body = serde_json::to_vec(¶ms)?;
338 self.send_request(Method::POST, path, Some(body), true)
339 .await
340 }
341
342 pub async fn http_get_position_tiers(
353 &self,
354 params: GetPositionTiersParams,
355 ) -> Result<Vec<OKXPositionTier>, OKXHttpError> {
356 let path = Self::build_path("/api/v5/public/position-tiers", ¶ms)?;
357 self.send_request(Method::GET, &path, None, false).await
358 }
359
360 pub async fn http_get_instruments(
371 &self,
372 params: GetInstrumentsParams,
373 ) -> Result<Vec<OKXInstrument>, OKXHttpError> {
374 let path = Self::build_path("/api/v5/public/instruments", ¶ms)?;
375 self.send_request(Method::GET, &path, None, false).await
376 }
377
378 pub async fn http_get_mark_price(
392 &self,
393 params: GetMarkPriceParams,
394 ) -> Result<Vec<OKXMarkPrice>, OKXHttpError> {
395 let path = Self::build_path("/api/v5/public/mark-price", ¶ms)?;
396 self.send_request(Method::GET, &path, None, false).await
397 }
398
399 pub async fn http_get_index_ticker(
405 &self,
406 params: GetIndexTickerParams,
407 ) -> Result<Vec<OKXIndexTicker>, OKXHttpError> {
408 let path = Self::build_path("/api/v5/market/index-tickers", ¶ms)?;
409 self.send_request(Method::GET, &path, None, false).await
410 }
411
412 pub async fn http_get_trades(
418 &self,
419 params: GetTradesParams,
420 ) -> Result<Vec<OKXTrade>, OKXHttpError> {
421 let path = Self::build_path("/api/v5/market/history-trades", ¶ms)?;
422 self.send_request(Method::GET, &path, None, false).await
423 }
424
425 pub async fn http_get_candlesticks(
431 &self,
432 params: GetCandlesticksParams,
433 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
434 let path = Self::build_path("/api/v5/market/candles", ¶ms)?;
435 self.send_request(Method::GET, &path, None, false).await
436 }
437
438 pub async fn http_get_candlesticks_history(
444 &self,
445 params: GetCandlesticksParams,
446 ) -> Result<Vec<OKXCandlestick>, OKXHttpError> {
447 let path = Self::build_path("/api/v5/market/history-candles", ¶ms)?;
448 self.send_request(Method::GET, &path, None, false).await
449 }
450
451 pub async fn http_get_pending_orders(
457 &self,
458 params: GetPendingOrdersParams,
459 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
460 let path = Self::build_path("/api/v5/trade/orders-pending", ¶ms)?;
461 self.send_request(Method::GET, &path, None, true).await
462 }
463
464 pub async fn http_get_order(
470 &self,
471 params: GetOrderParams,
472 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
473 let path = Self::build_path("/api/v5/trade/order", ¶ms)?;
474 self.send_request(Method::GET, &path, None, true).await
475 }
476
477 pub async fn http_get_balance(&self) -> Result<Vec<OKXAccount>, OKXHttpError> {
484 let path = "/api/v5/account/balance";
485 self.send_request(Method::GET, path, None, true).await
486 }
487
488 pub async fn http_get_order_history(
494 &self,
495 params: GetOrderHistoryParams,
496 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
497 let path = Self::build_path("/api/v5/trade/orders-history", ¶ms)?;
498 self.send_request(Method::GET, &path, None, true).await
499 }
500
501 pub async fn http_get_order_list(
507 &self,
508 params: GetOrderListParams,
509 ) -> Result<Vec<OKXOrderHistory>, OKXHttpError> {
510 let path = Self::build_path("/api/v5/trade/orders-pending", ¶ms)?;
511 self.send_request(Method::GET, &path, None, true).await
512 }
513
514 pub async fn http_get_positions(
522 &self,
523 params: GetPositionsParams,
524 ) -> Result<Vec<OKXPosition>, OKXHttpError> {
525 let path = Self::build_path("/api/v5/account/positions", ¶ms)?;
526 self.send_request(Method::GET, &path, None, true).await
527 }
528
529 pub async fn http_get_position_history(
535 &self,
536 params: GetPositionsHistoryParams,
537 ) -> Result<Vec<OKXPositionHistory>, OKXHttpError> {
538 let path = Self::build_path("/api/v5/account/positions-history", ¶ms)?;
539 self.send_request(Method::GET, &path, None, true).await
540 }
541
542 pub async fn http_get_transaction_details(
548 &self,
549 params: GetTransactionDetailsParams,
550 ) -> Result<Vec<OKXTransactionDetail>, OKXHttpError> {
551 let path = Self::build_path("/api/v5/trade/fills", ¶ms)?;
552 self.send_request(Method::GET, &path, None, true).await
553 }
554}
555
556#[derive(Clone, Debug)]
561#[cfg_attr(
562 feature = "python",
563 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.adapters")
564)]
565pub struct OKXHttpClient {
566 pub(crate) inner: Arc<OKXHttpInnerClient>,
567 pub(crate) instruments_cache: Arc<Mutex<HashMap<Ustr, InstrumentAny>>>,
568 cache_initialized: bool,
569}
570
571impl Default for OKXHttpClient {
572 fn default() -> Self {
573 Self::new(None, Some(60))
574 }
575}
576
577impl OKXHttpClient {
578 pub fn new(base_url: Option<String>, timeout_secs: Option<u64>) -> Self {
584 Self {
585 inner: Arc::new(OKXHttpInnerClient::new(base_url, timeout_secs)),
586 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
587 cache_initialized: false,
588 }
589 }
590
591 pub fn from_env() -> anyhow::Result<Self> {
594 Self::with_credentials(None, None, None, None, None)
595 }
596
597 pub fn with_credentials(
600 api_key: Option<String>,
601 api_secret: Option<String>,
602 api_passphrase: Option<String>,
603 base_url: Option<String>,
604 timeout_secs: Option<u64>,
605 ) -> anyhow::Result<Self> {
606 let api_key = api_key.unwrap_or(get_env_var("OKX_API_KEY")?);
607 let api_secret = api_secret.unwrap_or(get_env_var("OKX_API_SECRET")?);
608 let api_passphrase = api_passphrase.unwrap_or(get_env_var("OKX_API_PASSPHRASE")?);
609 let base_url = base_url.unwrap_or(OKX_HTTP_URL.to_string());
610
611 Ok(Self {
612 inner: Arc::new(OKXHttpInnerClient::with_credentials(
613 api_key,
614 api_secret,
615 api_passphrase,
616 base_url,
617 timeout_secs,
618 )),
619 instruments_cache: Arc::new(Mutex::new(HashMap::new())),
620 cache_initialized: false,
621 })
622 }
623
624 fn get_instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
630 self.instruments_cache
631 .lock()
632 .expect("`instruments_cache` lock poisoned")
633 .get(&symbol)
634 .cloned()
635 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
636 }
637
638 async fn instrument_or_fetch(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
639 if let Ok(inst) = self.get_instrument_from_cache(symbol) {
640 return Ok(inst);
641 }
642
643 for group in [
644 OKXInstrumentType::Spot,
645 OKXInstrumentType::Margin,
646 OKXInstrumentType::Futures,
647 ] {
648 if let Ok(instruments) = self.request_instruments(group).await {
649 let mut guard = self.instruments_cache.lock().unwrap();
650 for inst in instruments {
651 guard.insert(inst.raw_symbol().inner(), inst);
652 }
653 drop(guard);
654
655 if let Ok(inst) = self.get_instrument_from_cache(symbol) {
656 return Ok(inst);
657 }
658 }
659 }
660
661 anyhow::bail!("Instrument {symbol} not in cache and fetch failed");
662 }
663
664 pub fn base_url(&self) -> &str {
666 self.inner.base_url.as_str()
667 }
668
669 pub fn api_key(&self) -> Option<&str> {
671 self.inner.credential.as_ref().map(|c| c.api_key.as_str())
672 }
673
674 #[must_use]
678 pub const fn is_initialized(&self) -> bool {
679 self.cache_initialized
680 }
681
682 fn generate_ts_init(&self) -> UnixNanos {
684 get_atomic_clock_realtime().get_time_ns()
685 }
686
687 #[must_use]
689 pub fn get_cached_symbols(&self) -> Vec<String> {
697 self.instruments_cache
698 .lock()
699 .unwrap()
700 .keys()
701 .map(std::string::ToString::to_string)
702 .collect()
703 }
704
705 pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
714 for inst in instruments {
715 self.instruments_cache
716 .lock()
717 .unwrap()
718 .insert(inst.raw_symbol().inner(), inst);
719 }
720 self.cache_initialized = true;
721 }
722
723 pub fn add_instrument(&mut self, instrument: InstrumentAny) {
732 self.instruments_cache
733 .lock()
734 .unwrap()
735 .insert(instrument.raw_symbol().inner(), instrument);
736 self.cache_initialized = true;
737 }
738
739 pub async fn request_account_state(
745 &self,
746 account_id: AccountId,
747 ) -> anyhow::Result<AccountState> {
748 let resp = self
749 .inner
750 .http_get_balance()
751 .await
752 .map_err(|e| anyhow::anyhow!(e))?;
753
754 let ts_init = self.generate_ts_init();
755 let raw = resp
756 .first()
757 .ok_or_else(|| anyhow::anyhow!("No account state returned from OKX"))?;
758 let account_state = parse_account_state(raw, account_id, ts_init)?;
759
760 Ok(account_state)
761 }
762
763 pub async fn set_position_mode(&self, position_mode: OKXPositionMode) -> anyhow::Result<()> {
776 let mut params = SetPositionModeParamsBuilder::default();
777 params.pos_mode(position_mode);
778 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
779
780 match self.inner.http_set_position_mode(params).await {
781 Ok(_) => Ok(()),
782 Err(e) => {
783 if let crate::http::error::OKXHttpError::OkxError {
785 error_code,
786 message,
787 } = &e
788 && error_code == "50115"
789 {
790 tracing::warn!(
791 "Account does not support position mode setting (derivatives trading not enabled): {message}"
792 );
793 return Ok(()); }
795 anyhow::bail!(e)
796 }
797 }
798 }
799
800 pub async fn request_instruments(
806 &self,
807 instrument_type: OKXInstrumentType,
808 ) -> anyhow::Result<Vec<InstrumentAny>> {
809 let mut params = GetInstrumentsParamsBuilder::default();
810 params.inst_type(instrument_type);
811 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
812
813 let resp = self
814 .inner
815 .http_get_instruments(params)
816 .await
817 .map_err(|e| anyhow::anyhow!(e))?;
818
819 let ts_init = self.generate_ts_init();
820
821 let mut instruments: Vec<InstrumentAny> = Vec::new();
822 for inst in &resp {
823 if let Some(instrument_any) = parse_instrument_any(inst, ts_init)? {
824 instruments.push(instrument_any);
825 }
826 }
827
828 Ok(instruments)
829 }
830
831 pub async fn request_mark_price(
837 &self,
838 instrument_id: InstrumentId,
839 ) -> anyhow::Result<MarkPriceUpdate> {
840 let mut params = GetMarkPriceParamsBuilder::default();
841 params.inst_id(instrument_id.symbol.inner());
842 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
843
844 let resp = self
845 .inner
846 .http_get_mark_price(params)
847 .await
848 .map_err(|e| anyhow::anyhow!(e))?;
849
850 let raw = resp
851 .first()
852 .ok_or_else(|| anyhow::anyhow!("No mark price returned from OKX"))?;
853 let inst = self
854 .instrument_or_fetch(instrument_id.symbol.inner())
855 .await?;
856 let ts_init = self.generate_ts_init();
857
858 let mark_price =
859 parse_mark_price_update(raw, instrument_id, inst.price_precision(), ts_init)
860 .map_err(|e| anyhow::anyhow!(e))?;
861 Ok(mark_price)
862 }
863
864 pub async fn request_index_price(
870 &self,
871 instrument_id: InstrumentId,
872 ) -> anyhow::Result<IndexPriceUpdate> {
873 let mut params = GetIndexTickerParamsBuilder::default();
874 params.inst_id(instrument_id.symbol.inner());
875 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
876
877 let resp = self
878 .inner
879 .http_get_index_ticker(params)
880 .await
881 .map_err(|e| anyhow::anyhow!(e))?;
882
883 let raw = resp
884 .first()
885 .ok_or_else(|| anyhow::anyhow!("No index price returned from OKX"))?;
886 let inst = self
887 .instrument_or_fetch(instrument_id.symbol.inner())
888 .await?;
889 let ts_init = self.generate_ts_init();
890
891 let index_price =
892 parse_index_price_update(raw, instrument_id, inst.price_precision(), ts_init)
893 .map_err(|e| anyhow::anyhow!(e))?;
894 Ok(index_price)
895 }
896
897 pub async fn request_trades(
903 &self,
904 instrument_id: InstrumentId,
905 start: Option<DateTime<Utc>>,
906 end: Option<DateTime<Utc>>,
907 limit: Option<u32>,
908 ) -> anyhow::Result<Vec<TradeTick>> {
909 let mut params = GetTradesParamsBuilder::default();
910
911 params.inst_id(instrument_id.symbol.inner());
912 if let Some(s) = start {
913 params.before(s.timestamp_millis().to_string());
914 }
915 if let Some(e) = end {
916 params.after(e.timestamp_millis().to_string());
917 }
918 if let Some(l) = limit {
919 params.limit(l);
920 }
921
922 let params = params.build().map_err(anyhow::Error::new)?;
923
924 let raw_trades = self
926 .inner
927 .http_get_trades(params)
928 .await
929 .map_err(anyhow::Error::new)?;
930
931 let ts_init = self.generate_ts_init();
932 let inst = self
933 .instrument_or_fetch(instrument_id.symbol.inner())
934 .await?;
935
936 let mut trades = Vec::with_capacity(raw_trades.len());
937 for raw in raw_trades {
938 match parse_trade_tick(
939 &raw,
940 instrument_id,
941 inst.price_precision(),
942 inst.size_precision(),
943 ts_init,
944 ) {
945 Ok(trade) => trades.push(trade),
946 Err(e) => tracing::error!("{e}"),
947 }
948 }
949
950 Ok(trades)
951 }
952
953 pub async fn request_bars(
994 &self,
995 bar_type: BarType,
996 start: Option<DateTime<Utc>>,
997 mut end: Option<DateTime<Utc>>,
998 limit: Option<u32>,
999 ) -> anyhow::Result<Vec<Bar>> {
1000 const HISTORY_SPLIT_DAYS: i64 = 100;
1001 const MAX_PAGES_SOFT: usize = 500;
1002
1003 let limit = if limit == Some(0) { None } else { limit };
1004
1005 anyhow::ensure!(
1006 bar_type.aggregation_source() == AggregationSource::External,
1007 "Only EXTERNAL aggregation is supported"
1008 );
1009 if let (Some(s), Some(e)) = (start, end) {
1010 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1011 }
1012
1013 let now = Utc::now();
1014 if let Some(s) = start
1015 && s > now
1016 {
1017 return Ok(Vec::new());
1018 }
1019 if let Some(e) = end
1020 && e > now
1021 {
1022 end = Some(now);
1023 }
1024
1025 let spec = bar_type.spec();
1026 let step = spec.step.get();
1027 let bar_param = match spec.aggregation {
1028 BarAggregation::Second => format!("{step}s"),
1029 BarAggregation::Minute => format!("{step}m"),
1030 BarAggregation::Hour => format!("{step}H"),
1031 BarAggregation::Day => format!("{step}D"),
1032 BarAggregation::Week => format!("{step}W"),
1033 BarAggregation::Month => format!("{step}M"),
1034 a => anyhow::bail!("OKX does not support {a:?} aggregation"),
1035 };
1036
1037 let slot_ms: i64 = match spec.aggregation {
1038 BarAggregation::Second => (step as i64) * 1_000,
1039 BarAggregation::Minute => (step as i64) * 60_000,
1040 BarAggregation::Hour => (step as i64) * 3_600_000,
1041 BarAggregation::Day => (step as i64) * 86_400_000,
1042 BarAggregation::Week => (step as i64) * 7 * 86_400_000,
1043 BarAggregation::Month => (step as i64) * 30 * 86_400_000,
1044 _ => unreachable!("Unsupported aggregation should have been caught above"),
1045 };
1046 let slot_ns: i64 = slot_ms * 1_000_000;
1047
1048 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
1049 enum Mode {
1050 Latest,
1051 Backward,
1052 Range,
1053 }
1054
1055 let mode = match (start, end) {
1056 (None, None) => Mode::Latest,
1057 (Some(_), None) => Mode::Backward, (None, Some(_)) => Mode::Backward,
1059 (Some(_), Some(_)) => Mode::Range,
1060 };
1061
1062 let start_ns = start.and_then(|s| s.timestamp_nanos_opt());
1063 let end_ns = end.and_then(|e| e.timestamp_nanos_opt());
1064
1065 let start_ms = start.map(|s| {
1067 let ms = s.timestamp_millis();
1068 if slot_ms > 0 {
1069 (ms / slot_ms) * slot_ms } else {
1071 ms
1072 }
1073 });
1074 let end_ms = end.map(|e| {
1075 let ms = e.timestamp_millis();
1076 if slot_ms > 0 {
1077 ((ms + slot_ms - 1) / slot_ms) * slot_ms } else {
1079 ms
1080 }
1081 });
1082 let now_ms = now.timestamp_millis();
1083
1084 let symbol = bar_type.instrument_id().symbol;
1085 let inst = self.instrument_or_fetch(symbol.inner()).await?;
1086
1087 let mut out: Vec<Bar> = Vec::new();
1088 let mut pages = 0usize;
1089
1090 let mut after_ms: Option<i64> = None;
1095 let mut before_ms: Option<i64> = match mode {
1096 Mode::Backward => end_ms.map(|v| v.saturating_sub(1)),
1097 Mode::Range => {
1098 Some(end_ms.unwrap_or(now_ms))
1101 }
1102 Mode::Latest => None,
1103 };
1104
1105 let mut forward_prepend_mode = matches!(mode, Mode::Range);
1107
1108 if matches!(mode, Mode::Backward | Mode::Range)
1112 && let Some(b) = before_ms
1113 {
1114 let buffer_ms = slot_ms.max(60_000); if b >= now_ms.saturating_sub(buffer_ms) {
1120 before_ms = Some(now_ms.saturating_sub(buffer_ms));
1121 }
1122 }
1123
1124 let mut have_latest_first_page = false;
1125 let mut progressless_loops = 0u8;
1126
1127 loop {
1128 if let Some(lim) = limit
1129 && lim > 0
1130 && out.len() >= lim as usize
1131 {
1132 break;
1133 }
1134 if pages >= MAX_PAGES_SOFT {
1135 break;
1136 }
1137
1138 let pivot_ms = if let Some(a) = after_ms {
1139 a
1140 } else if let Some(b) = before_ms {
1141 b
1142 } else {
1143 now_ms
1144 };
1145 let age_ms = now_ms.saturating_sub(pivot_ms);
1151 let age_hours = age_ms / (60 * 60 * 1000);
1152 let using_history = age_hours > 1; let page_ceiling = if using_history { 100 } else { 300 };
1155 let remaining = limit
1156 .filter(|&l| l > 0) .map(|l| (l as usize).saturating_sub(out.len()))
1158 .unwrap_or(page_ceiling);
1159 let page_cap = remaining.min(page_ceiling);
1160
1161 let mut p = GetCandlesticksParamsBuilder::default();
1162 p.inst_id(symbol.as_str())
1163 .bar(&bar_param)
1164 .limit(page_cap as u32);
1165
1166 let mut req_used_before = false;
1168
1169 match mode {
1170 Mode::Latest => {
1171 if have_latest_first_page && let Some(b) = before_ms {
1172 p.before_ms(b);
1173 req_used_before = true;
1174 }
1175 }
1176 Mode::Backward => {
1177 if let Some(b) = before_ms {
1178 p.before_ms(b);
1179 req_used_before = true;
1180 }
1181 }
1182 Mode::Range => {
1183 if pages == 0 && !using_history {
1186 } else if forward_prepend_mode {
1189 if let Some(b) = before_ms {
1190 p.before_ms(b);
1191 req_used_before = true;
1192 }
1193 } else if let Some(a) = after_ms {
1194 p.after_ms(a);
1195 }
1196 }
1197 }
1198
1199 let params = p.build().map_err(anyhow::Error::new)?;
1200
1201 let mut raw = if using_history {
1202 self.inner
1203 .http_get_candlesticks_history(params.clone())
1204 .await
1205 .map_err(anyhow::Error::new)?
1206 } else {
1207 self.inner
1208 .http_get_candlesticks(params.clone())
1209 .await
1210 .map_err(anyhow::Error::new)?
1211 };
1212
1213 if raw.is_empty() {
1215 if matches!(mode, Mode::Latest)
1217 && have_latest_first_page
1218 && !using_history
1219 && let Some(b) = before_ms
1220 {
1221 let mut p2 = GetCandlesticksParamsBuilder::default();
1222 p2.inst_id(symbol.as_str())
1223 .bar(&bar_param)
1224 .limit(page_cap as u32);
1225 p2.before_ms(b);
1226 let params2 = p2.build().map_err(anyhow::Error::new)?;
1227 let raw2 = self
1228 .inner
1229 .http_get_candlesticks_history(params2)
1230 .await
1231 .map_err(anyhow::Error::new)?;
1232 if !raw2.is_empty() {
1233 raw = raw2;
1234 } else {
1235 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1237 before_ms = Some(b.saturating_sub(jump));
1238 progressless_loops = progressless_loops.saturating_add(1);
1239 if progressless_loops >= 3 {
1240 break;
1241 }
1242 continue;
1243 }
1244 }
1245
1246 if raw.is_empty() && matches!(mode, Mode::Range) && pages > 0 {
1250 let backstep_ms = (page_cap as i64).saturating_mul(slot_ms.max(1));
1251 let pivot_back = after_ms.unwrap_or(now_ms).saturating_sub(backstep_ms);
1252
1253 let mut p2 = GetCandlesticksParamsBuilder::default();
1254 p2.inst_id(symbol.as_str())
1255 .bar(&bar_param)
1256 .limit(page_cap as u32)
1257 .before_ms(pivot_back);
1258 let params2 = p2.build().map_err(anyhow::Error::new)?;
1259 let raw2 = if (now_ms.saturating_sub(pivot_back)) / (24 * 60 * 60 * 1000)
1260 > HISTORY_SPLIT_DAYS
1261 {
1262 self.inner.http_get_candlesticks_history(params2).await
1263 } else {
1264 self.inner.http_get_candlesticks(params2).await
1265 }
1266 .map_err(anyhow::Error::new)?;
1267 if raw2.is_empty() {
1268 break;
1269 } else {
1270 raw = raw2;
1271 forward_prepend_mode = true;
1272 req_used_before = true;
1273 }
1274 }
1275
1276 if raw.is_empty()
1278 && matches!(mode, Mode::Latest)
1279 && !have_latest_first_page
1280 && !using_history
1281 {
1282 let jump_days_ms = (HISTORY_SPLIT_DAYS + 1) * 86_400_000;
1283 before_ms = Some(now_ms.saturating_sub(jump_days_ms));
1284 have_latest_first_page = true;
1285 continue;
1286 }
1287
1288 if raw.is_empty() {
1290 break;
1291 }
1292 }
1293 pages += 1;
1296
1297 let ts_init = self.generate_ts_init();
1299 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1300 for r in &raw {
1301 page.push(parse_candlestick(
1302 r,
1303 bar_type,
1304 inst.price_precision(),
1305 inst.size_precision(),
1306 ts_init,
1307 )?);
1308 }
1309 page.reverse();
1310
1311 let page_oldest_ms = page.first().map(|b| b.ts_event.as_i64() / 1_000_000);
1312 let page_newest_ms = page.last().map(|b| b.ts_event.as_i64() / 1_000_000);
1313
1314 let mut filtered: Vec<Bar> = if matches!(mode, Mode::Range)
1318 && out.is_empty()
1319 && pages < 2
1320 {
1321 let tolerance_ns = slot_ns * 2; if !page.is_empty() {
1328 tracing::debug!(
1329 "Range mode bootstrap page: {} bars from {} to {}, filtering with start={:?} end={:?}",
1330 page.len(),
1331 page.first().unwrap().ts_event.as_i64() / 1_000_000,
1332 page.last().unwrap().ts_event.as_i64() / 1_000_000,
1333 start_ms,
1334 end_ms
1335 );
1336 }
1337
1338 let result: Vec<Bar> = page
1339 .clone()
1340 .into_iter()
1341 .filter(|b| {
1342 let ts = b.ts_event.as_i64();
1343 let ok_after =
1345 start_ns.is_none_or(|sns| ts >= sns.saturating_sub(tolerance_ns));
1346 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1347 ok_after && ok_before
1348 })
1349 .collect();
1350
1351 result
1352 } else {
1353 page.clone()
1355 .into_iter()
1356 .filter(|b| {
1357 let ts = b.ts_event.as_i64();
1358 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1359 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1360 ok_after && ok_before
1361 })
1362 .collect()
1363 };
1364
1365 if !page.is_empty() && filtered.is_empty() {
1366 if matches!(mode, Mode::Range)
1368 && !forward_prepend_mode
1369 && let (Some(newest_ms), Some(start_ms)) = (page_newest_ms, start_ms)
1370 && newest_ms < start_ms.saturating_sub(slot_ms * 2)
1371 {
1372 break;
1374 }
1375 }
1376
1377 let contribution;
1379
1380 if out.is_empty() {
1381 contribution = filtered.len();
1382 out = filtered;
1383 } else {
1384 match mode {
1385 Mode::Backward | Mode::Latest => {
1386 if let Some(first) = out.first() {
1387 filtered.retain(|b| b.ts_event < first.ts_event);
1388 }
1389 contribution = filtered.len();
1390 if contribution != 0 {
1391 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1392 new_out.extend_from_slice(&filtered);
1393 new_out.extend_from_slice(&out);
1394 out = new_out;
1395 }
1396 }
1397 Mode::Range => {
1398 if forward_prepend_mode || req_used_before {
1399 if let Some(first) = out.first() {
1401 filtered.retain(|b| b.ts_event < first.ts_event);
1402 }
1403 contribution = filtered.len();
1404 if contribution != 0 {
1405 let mut new_out = Vec::with_capacity(out.len() + filtered.len());
1406 new_out.extend_from_slice(&filtered);
1407 new_out.extend_from_slice(&out);
1408 out = new_out;
1409 }
1410 } else {
1411 if let Some(last) = out.last() {
1413 filtered.retain(|b| b.ts_event > last.ts_event);
1414 }
1415 contribution = filtered.len();
1416 out.extend(filtered);
1417 }
1418 }
1419 }
1420 }
1421
1422 if contribution == 0
1424 && matches!(mode, Mode::Latest | Mode::Backward)
1425 && let Some(b) = before_ms
1426 {
1427 let jump = (page_cap as i64).saturating_mul(slot_ms.max(1));
1428 let new_b = b.saturating_sub(jump);
1429 if new_b != b {
1430 before_ms = Some(new_b);
1431 }
1432 }
1433
1434 if contribution == 0 {
1435 progressless_loops = progressless_loops.saturating_add(1);
1436 if progressless_loops >= 3 {
1437 break;
1438 }
1439 } else {
1440 progressless_loops = 0;
1441
1442 match mode {
1444 Mode::Latest | Mode::Backward => {
1445 if let Some(oldest) = page_oldest_ms {
1446 before_ms = Some(oldest.saturating_sub(1));
1447 have_latest_first_page = true;
1448 } else {
1449 break;
1450 }
1451 }
1452 Mode::Range => {
1453 if forward_prepend_mode || req_used_before {
1454 if let Some(oldest) = page_oldest_ms {
1455 let jump_back = slot_ms.max(60_000); before_ms = Some(oldest.saturating_sub(jump_back));
1458 after_ms = None;
1459 } else {
1460 break;
1461 }
1462 } else if let Some(newest) = page_newest_ms {
1463 after_ms = Some(newest.saturating_add(1));
1464 before_ms = None;
1465 } else {
1466 break;
1467 }
1468 }
1469 }
1470 }
1471
1472 if let Some(lim) = limit
1474 && lim > 0
1475 && out.len() >= lim as usize
1476 {
1477 break;
1478 }
1479 if let Some(ens) = end_ns
1480 && let Some(last) = out.last()
1481 && last.ts_event.as_i64() >= ens
1482 {
1483 break;
1484 }
1485 if let Some(sns) = start_ns
1486 && let Some(first) = out.first()
1487 && (matches!(mode, Mode::Backward) || forward_prepend_mode)
1488 && first.ts_event.as_i64() <= sns
1489 {
1490 if matches!(mode, Mode::Range) {
1492 if let Some(ens) = end_ns
1494 && let Some(last) = out.last()
1495 {
1496 let last_ts = last.ts_event.as_i64();
1497 if last_ts < ens {
1498 forward_prepend_mode = false;
1501 after_ms = Some((last_ts / 1_000_000).saturating_add(1));
1502 before_ms = None;
1503 continue;
1504 }
1505 }
1506 }
1507 break;
1508 }
1509
1510 tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
1511 }
1512
1513 if out.is_empty() && matches!(mode, Mode::Range) {
1515 let pivot = end_ms.unwrap_or(now_ms.saturating_sub(1));
1516 let hist = (now_ms.saturating_sub(pivot)) / (24 * 60 * 60 * 1000) > HISTORY_SPLIT_DAYS;
1517 let mut p = GetCandlesticksParamsBuilder::default();
1518 p.inst_id(symbol.as_str())
1519 .bar(&bar_param)
1520 .limit(300)
1521 .before_ms(pivot);
1522 let params = p.build().map_err(anyhow::Error::new)?;
1523 let raw = if hist {
1524 self.inner.http_get_candlesticks_history(params).await
1525 } else {
1526 self.inner.http_get_candlesticks(params).await
1527 }
1528 .map_err(anyhow::Error::new)?;
1529 if !raw.is_empty() {
1530 let ts_init = self.generate_ts_init();
1531 let mut page: Vec<Bar> = Vec::with_capacity(raw.len());
1532 for r in &raw {
1533 page.push(parse_candlestick(
1534 r,
1535 bar_type,
1536 inst.price_precision(),
1537 inst.size_precision(),
1538 ts_init,
1539 )?);
1540 }
1541 page.reverse();
1542 out = page
1543 .into_iter()
1544 .filter(|b| {
1545 let ts = b.ts_event.as_i64();
1546 let ok_after = start_ns.is_none_or(|sns| ts >= sns);
1547 let ok_before = end_ns.is_none_or(|ens| ts <= ens);
1548 ok_after && ok_before
1549 })
1550 .collect();
1551 }
1552 }
1553
1554 if let Some(ens) = end_ns {
1556 while out.last().is_some_and(|b| b.ts_event.as_i64() > ens) {
1557 out.pop();
1558 }
1559 }
1560
1561 if matches!(mode, Mode::Range)
1563 && !forward_prepend_mode
1564 && let Some(sns) = start_ns
1565 {
1566 let lower = sns.saturating_sub(slot_ns);
1567 while out.first().is_some_and(|b| b.ts_event.as_i64() < lower) {
1568 out.remove(0);
1569 }
1570 }
1571
1572 if let Some(lim) = limit
1573 && lim > 0
1574 && out.len() > lim as usize
1575 {
1576 out.truncate(lim as usize);
1577 }
1578
1579 Ok(out)
1580 }
1581
1582 #[allow(clippy::too_many_arguments)]
1589 pub async fn request_order_status_reports(
1590 &self,
1591 account_id: AccountId,
1592 instrument_type: Option<OKXInstrumentType>,
1593 instrument_id: Option<InstrumentId>,
1594 start: Option<DateTime<Utc>>,
1595 end: Option<DateTime<Utc>>,
1596 open_only: bool,
1597 limit: Option<u32>,
1598 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1599 let mut history_params = GetOrderHistoryParamsBuilder::default();
1601
1602 let instrument_type = if let Some(instrument_type) = instrument_type {
1603 instrument_type
1604 } else {
1605 let instrument_id = instrument_id.ok_or_else(|| {
1606 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1607 })?;
1608 let instrument = self
1609 .instrument_or_fetch(instrument_id.symbol.inner())
1610 .await?;
1611 okx_instrument_type(&instrument)?
1612 };
1613
1614 history_params.inst_type(instrument_type);
1615
1616 if let Some(instrument_id) = instrument_id.as_ref() {
1617 history_params.inst_id(instrument_id.symbol.inner().to_string());
1618 }
1619
1620 if let Some(limit) = limit {
1621 history_params.limit(limit);
1622 }
1623
1624 let history_params = history_params.build().map_err(|e| anyhow::anyhow!(e))?;
1625
1626 let mut pending_params = GetOrderListParamsBuilder::default();
1628 pending_params.inst_type(instrument_type);
1629
1630 if let Some(instrument_id) = instrument_id.as_ref() {
1631 pending_params.inst_id(instrument_id.symbol.inner().to_string());
1632 }
1633
1634 if let Some(limit) = limit {
1635 pending_params.limit(limit);
1636 }
1637
1638 let pending_params = pending_params.build().map_err(|e| anyhow::anyhow!(e))?;
1639
1640 let combined_resp = if open_only {
1641 self.inner
1643 .http_get_order_list(pending_params)
1644 .await
1645 .map_err(|e| anyhow::anyhow!(e))?
1646 } else {
1647 let (history_resp, pending_resp) = tokio::try_join!(
1649 self.inner.http_get_order_history(history_params),
1650 self.inner.http_get_order_list(pending_params)
1651 )
1652 .map_err(|e| anyhow::anyhow!(e))?;
1653
1654 let mut combined_resp = history_resp;
1656 combined_resp.extend(pending_resp);
1657 combined_resp
1658 };
1659
1660 let start_ns = start.map(UnixNanos::from);
1662 let end_ns = end.map(UnixNanos::from);
1663
1664 let ts_init = self.generate_ts_init();
1665 let mut reports = Vec::with_capacity(combined_resp.len());
1666
1667 let mut seen = AHashSet::new();
1669
1670 for order in combined_resp {
1671 if seen.contains(&order.cl_ord_id) {
1672 continue; }
1674 seen.insert(order.cl_ord_id);
1675
1676 let inst = self.instrument_or_fetch(order.inst_id).await?;
1677
1678 let report = parse_order_status_report(
1679 &order,
1680 account_id,
1681 inst.id(),
1682 inst.price_precision(),
1683 inst.size_precision(),
1684 ts_init,
1685 );
1686
1687 if let Some(start_ns) = start_ns
1688 && report.ts_last < start_ns
1689 {
1690 continue;
1691 }
1692 if let Some(end_ns) = end_ns
1693 && report.ts_last > end_ns
1694 {
1695 continue;
1696 }
1697
1698 reports.push(report);
1699 }
1700
1701 Ok(reports)
1702 }
1703
1704 pub async fn request_fill_reports(
1710 &self,
1711 account_id: AccountId,
1712 instrument_type: Option<OKXInstrumentType>,
1713 instrument_id: Option<InstrumentId>,
1714 start: Option<DateTime<Utc>>,
1715 end: Option<DateTime<Utc>>,
1716 limit: Option<u32>,
1717 ) -> anyhow::Result<Vec<FillReport>> {
1718 let mut params = GetTransactionDetailsParamsBuilder::default();
1719
1720 let instrument_type = if let Some(instrument_type) = instrument_type {
1721 instrument_type
1722 } else {
1723 let instrument_id = instrument_id.ok_or_else(|| {
1724 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1725 })?;
1726 let instrument = self
1727 .instrument_or_fetch(instrument_id.symbol.inner())
1728 .await?;
1729 okx_instrument_type(&instrument)?
1730 };
1731
1732 params.inst_type(instrument_type);
1733
1734 if let Some(instrument_id) = instrument_id {
1735 let instrument = self
1736 .instrument_or_fetch(instrument_id.symbol.inner())
1737 .await?;
1738 let instrument_type = okx_instrument_type(&instrument)?;
1739 params.inst_type(instrument_type);
1740 params.inst_id(instrument_id.symbol.inner().to_string());
1741 }
1742
1743 if let Some(limit) = limit {
1744 params.limit(limit);
1745 }
1746
1747 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1748
1749 let resp = self
1750 .inner
1751 .http_get_transaction_details(params)
1752 .await
1753 .map_err(|e| anyhow::anyhow!(e))?;
1754
1755 let start_ns = start.map(UnixNanos::from);
1757 let end_ns = end.map(UnixNanos::from);
1758
1759 let ts_init = self.generate_ts_init();
1760 let mut reports = Vec::with_capacity(resp.len());
1761
1762 for detail in resp {
1763 let inst = self.instrument_or_fetch(detail.inst_id).await?;
1764
1765 let report = parse_fill_report(
1766 detail,
1767 account_id,
1768 inst.id(),
1769 inst.price_precision(),
1770 inst.size_precision(),
1771 ts_init,
1772 )?;
1773
1774 if let Some(start_ns) = start_ns
1775 && report.ts_event < start_ns
1776 {
1777 continue;
1778 }
1779
1780 if let Some(end_ns) = end_ns
1781 && report.ts_event > end_ns
1782 {
1783 continue;
1784 }
1785
1786 reports.push(report);
1787 }
1788
1789 Ok(reports)
1790 }
1791
1792 pub async fn request_position_status_reports(
1798 &self,
1799 account_id: AccountId,
1800 instrument_type: Option<OKXInstrumentType>,
1801 instrument_id: Option<InstrumentId>,
1802 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1803 let mut params = GetPositionsParamsBuilder::default();
1804
1805 let instrument_type = if let Some(instrument_type) = instrument_type {
1806 instrument_type
1807 } else {
1808 let instrument_id = instrument_id.ok_or_else(|| {
1809 anyhow::anyhow!("Instrument ID required if `instrument_type` not provided")
1810 })?;
1811 let instrument = self
1812 .instrument_or_fetch(instrument_id.symbol.inner())
1813 .await?;
1814 okx_instrument_type(&instrument)?
1815 };
1816
1817 params.inst_type(instrument_type);
1818
1819 instrument_id
1820 .as_ref()
1821 .map(|i| params.inst_id(i.symbol.inner()));
1822
1823 let params = params.build().map_err(|e| anyhow::anyhow!(e))?;
1824
1825 let resp = self
1826 .inner
1827 .http_get_positions(params)
1828 .await
1829 .map_err(|e| anyhow::anyhow!(e))?;
1830
1831 let ts_init = self.generate_ts_init();
1832 let mut reports = Vec::with_capacity(resp.len());
1833
1834 for position in resp {
1835 let inst = self.instrument_or_fetch(position.inst_id).await?;
1836
1837 let report = parse_position_status_report(
1838 position,
1839 account_id,
1840 inst.id(),
1841 inst.size_precision(),
1842 ts_init,
1843 );
1844 reports.push(report);
1845 }
1846
1847 Ok(reports)
1848 }
1849}