1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU64, Ordering},
21};
22
23use chrono::{DateTime, Utc};
24use dashmap::DashMap;
25use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
26use nautilus_model::{
27 data::{Bar, BarType, TradeTick},
28 enums::{AggregationSource, BarAggregation},
29 events::AccountState,
30 identifiers::{AccountId, InstrumentId},
31 instruments::{Instrument, InstrumentAny},
32 orderbook::OrderBook,
33};
34use nautilus_network::{
35 http::{HttpClient, Method},
36 retry::{RetryConfig, RetryManager},
37};
38use serde::{Serialize, de::DeserializeOwned};
39use tokio_util::sync::CancellationToken;
40use ustr::Ustr;
41
42use super::{
43 error::DeribitHttpError,
44 models::{
45 DeribitAccountSummariesResponse, DeribitCurrency, DeribitInstrument, DeribitJsonRpcRequest,
46 DeribitJsonRpcResponse,
47 },
48 query::{GetAccountSummariesParams, GetInstrumentParams, GetInstrumentsParams},
49};
50use crate::{
51 common::{
52 consts::{DERIBIT_API_PATH, JSONRPC_VERSION, should_retry_error_code},
53 credential::Credential,
54 parse::{
55 extract_server_timestamp, parse_account_state, parse_bars,
56 parse_deribit_instrument_any, parse_order_book, parse_trade_tick,
57 },
58 urls::get_http_base_url,
59 },
60 http::{
61 models::{DeribitOrderBook, DeribitTradesResponse, DeribitTradingViewChartData},
62 query::{
63 GetLastTradesByInstrumentAndTimeParams, GetOrderBookParams,
64 GetTradingViewChartDataParams,
65 },
66 },
67};
68
/// Numeric code `0`; presumably the value Deribit uses to signal success.
///
/// NOTE(review): nothing in the visible part of this file reads the constant,
/// which is why the `dead_code` lint is silenced — confirm it is still wanted.
#[allow(dead_code)]
const DERIBIT_SUCCESS_CODE: i64 = 0;
71
72#[derive(Debug)]
77pub struct DeribitRawHttpClient {
78 base_url: String,
79 client: HttpClient,
80 credential: Option<Credential>,
81 retry_manager: RetryManager<DeribitHttpError>,
82 cancellation_token: CancellationToken,
83 request_id: AtomicU64,
84}
85
86impl DeribitRawHttpClient {
87 #[allow(clippy::too_many_arguments)]
93 pub fn new(
94 base_url: Option<String>,
95 is_testnet: bool,
96 timeout_secs: Option<u64>,
97 max_retries: Option<u32>,
98 retry_delay_ms: Option<u64>,
99 retry_delay_max_ms: Option<u64>,
100 proxy_url: Option<String>,
101 ) -> Result<Self, DeribitHttpError> {
102 let base_url = base_url
103 .unwrap_or_else(|| format!("{}{}", get_http_base_url(is_testnet), DERIBIT_API_PATH));
104 let retry_config = RetryConfig {
105 max_retries: max_retries.unwrap_or(3),
106 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
107 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
108 backoff_factor: 2.0,
109 jitter_ms: 1000,
110 operation_timeout_ms: Some(60_000),
111 immediate_first: false,
112 max_elapsed_ms: Some(180_000),
113 };
114
115 let retry_manager = RetryManager::new(retry_config);
116
117 Ok(Self {
118 base_url,
119 client: HttpClient::new(
120 std::collections::HashMap::new(), Vec::new(), Vec::new(), None, timeout_secs,
125 proxy_url,
126 )
127 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
128 credential: None,
129 retry_manager,
130 cancellation_token: CancellationToken::new(),
131 request_id: AtomicU64::new(1),
132 })
133 }
134
135 pub fn cancellation_token(&self) -> &CancellationToken {
137 &self.cancellation_token
138 }
139
140 #[must_use]
142 pub fn is_testnet(&self) -> bool {
143 self.base_url.contains("test")
144 }
145
146 #[allow(clippy::too_many_arguments)]
152 pub fn with_credentials(
153 api_key: String,
154 api_secret: String,
155 base_url: Option<String>,
156 is_testnet: bool,
157 timeout_secs: Option<u64>,
158 max_retries: Option<u32>,
159 retry_delay_ms: Option<u64>,
160 retry_delay_max_ms: Option<u64>,
161 proxy_url: Option<String>,
162 ) -> Result<Self, DeribitHttpError> {
163 let base_url = base_url
164 .unwrap_or_else(|| format!("{}{}", get_http_base_url(is_testnet), DERIBIT_API_PATH));
165 let retry_config = RetryConfig {
166 max_retries: max_retries.unwrap_or(3),
167 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
168 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
169 backoff_factor: 2.0,
170 jitter_ms: 1000,
171 operation_timeout_ms: Some(60_000),
172 immediate_first: false,
173 max_elapsed_ms: Some(180_000),
174 };
175
176 let retry_manager = RetryManager::new(retry_config);
177 let credential = Credential::new(api_key, api_secret);
178
179 Ok(Self {
180 base_url,
181 client: HttpClient::new(
182 std::collections::HashMap::new(),
183 Vec::new(),
184 Vec::new(),
185 None,
186 timeout_secs,
187 proxy_url,
188 )
189 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
190 credential: Some(credential),
191 retry_manager,
192 cancellation_token: CancellationToken::new(),
193 request_id: AtomicU64::new(1),
194 })
195 }
196
197 #[allow(clippy::too_many_arguments)]
209 pub fn new_with_env(
210 api_key: Option<String>,
211 api_secret: Option<String>,
212 is_testnet: bool,
213 timeout_secs: Option<u64>,
214 max_retries: Option<u32>,
215 retry_delay_ms: Option<u64>,
216 retry_delay_max_ms: Option<u64>,
217 proxy_url: Option<String>,
218 ) -> Result<Self, DeribitHttpError> {
219 let (key_env, secret_env) = if is_testnet {
221 ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
222 } else {
223 ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
224 };
225
226 let api_key = nautilus_core::env::get_or_env_var_opt(api_key, key_env);
228 let api_secret = nautilus_core::env::get_or_env_var_opt(api_secret, secret_env);
229
230 if let (Some(key), Some(secret)) = (api_key, api_secret) {
232 Self::with_credentials(
233 key,
234 secret,
235 None,
236 is_testnet,
237 timeout_secs,
238 max_retries,
239 retry_delay_ms,
240 retry_delay_max_ms,
241 proxy_url,
242 )
243 } else {
244 Self::new(
246 None,
247 is_testnet,
248 timeout_secs,
249 max_retries,
250 retry_delay_ms,
251 retry_delay_max_ms,
252 proxy_url,
253 )
254 }
255 }
256
257 async fn send_request<T, P>(
259 &self,
260 method: &str,
261 params: P,
262 authenticate: bool,
263 ) -> Result<DeribitJsonRpcResponse<T>, DeribitHttpError>
264 where
265 T: DeserializeOwned,
266 P: Serialize,
267 {
268 let operation_id = format!("{}#{}", self.base_url, method);
270 let operation = || {
271 let method = method.to_string();
272 let params_clone = serde_json::to_value(¶ms).unwrap();
273
274 async move {
275 let id = self.request_id.fetch_add(1, Ordering::SeqCst);
277 let request = DeribitJsonRpcRequest {
278 jsonrpc: JSONRPC_VERSION,
279 id,
280 method: method.clone(),
281 params: params_clone.clone(),
282 };
283
284 let body = serde_json::to_vec(&request)?;
285
286 let mut headers = std::collections::HashMap::new();
288 headers.insert("Content-Type".to_string(), "application/json".to_string());
289
290 if authenticate {
292 let credentials = self
293 .credential
294 .as_ref()
295 .ok_or(DeribitHttpError::MissingCredentials)?;
296 let auth_headers = credentials.sign_auth_headers("POST", "/api/v2", &body)?;
297 headers.extend(auth_headers);
298 }
299
300 let resp = self
301 .client
302 .request(
303 Method::POST,
304 self.base_url.clone(),
305 None,
306 Some(headers),
307 Some(body),
308 None,
309 None,
310 )
311 .await
312 .map_err(|e| DeribitHttpError::NetworkError(e.to_string()))?;
313
314 let json_value: serde_json::Value = match serde_json::from_slice(&resp.body) {
320 Ok(json) => json,
321 Err(_) => {
322 let error_body = String::from_utf8_lossy(&resp.body);
324 tracing::error!(
325 method = %method,
326 status = resp.status.as_u16(),
327 "Non-JSON response: {error_body}"
328 );
329 return Err(DeribitHttpError::UnexpectedStatus {
330 status: resp.status.as_u16(),
331 body: error_body.to_string(),
332 });
333 }
334 };
335
336 let json_rpc_response: DeribitJsonRpcResponse<T> =
338 serde_json::from_value(json_value.clone()).map_err(|e| {
339 tracing::error!(
340 method = %method,
341 status = resp.status.as_u16(),
342 error = %e,
343 "Failed to deserialize Deribit JSON-RPC response"
344 );
345 tracing::debug!(
346 "Response JSON (first 2000 chars): {}",
347 &json_value
348 .to_string()
349 .chars()
350 .take(2000)
351 .collect::<String>()
352 );
353 DeribitHttpError::JsonError(e.to_string())
354 })?;
355
356 if json_rpc_response.result.is_some() {
358 Ok(json_rpc_response)
359 } else if let Some(error) = &json_rpc_response.error {
360 tracing::warn!(
362 method = %method,
363 http_status = resp.status.as_u16(),
364 error_code = error.code,
365 error_message = %error.message,
366 error_data = ?error.data,
367 "Deribit RPC error response"
368 );
369
370 Err(DeribitHttpError::from_jsonrpc_error(
372 error.code,
373 error.message.clone(),
374 error.data.clone(),
375 ))
376 } else {
377 tracing::error!(
378 method = %method,
379 status = resp.status.as_u16(),
380 request_id = ?json_rpc_response.id,
381 "Response contains neither result nor error field"
382 );
383 Err(DeribitHttpError::JsonError(
384 "Response contains neither result nor error".to_string(),
385 ))
386 }
387 }
388 };
389
390 let should_retry = |error: &DeribitHttpError| -> bool {
399 match error {
400 DeribitHttpError::NetworkError(_) => true,
401 DeribitHttpError::UnexpectedStatus { status, .. } => {
402 *status >= 500 || *status == 429
403 }
404 DeribitHttpError::DeribitError { error_code, .. } => {
405 should_retry_error_code(*error_code)
406 }
407 _ => false,
408 }
409 };
410
411 let create_error = |msg: String| -> DeribitHttpError {
412 if msg == "canceled" {
413 DeribitHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
414 } else {
415 DeribitHttpError::NetworkError(msg)
416 }
417 };
418
419 self.retry_manager
420 .execute_with_retry_with_cancel(
421 &operation_id,
422 operation,
423 should_retry,
424 create_error,
425 &self.cancellation_token,
426 )
427 .await
428 }
429
430 pub async fn get_instruments(
436 &self,
437 params: GetInstrumentsParams,
438 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitInstrument>>, DeribitHttpError> {
439 self.send_request("public/get_instruments", params, false)
440 .await
441 }
442
443 pub async fn get_instrument(
449 &self,
450 params: GetInstrumentParams,
451 ) -> Result<DeribitJsonRpcResponse<DeribitInstrument>, DeribitHttpError> {
452 self.send_request("public/get_instrument", params, false)
453 .await
454 }
455
456 pub async fn get_last_trades_by_instrument_and_time(
462 &self,
463 params: GetLastTradesByInstrumentAndTimeParams,
464 ) -> Result<DeribitJsonRpcResponse<DeribitTradesResponse>, DeribitHttpError> {
465 self.send_request(
466 "public/get_last_trades_by_instrument_and_time",
467 params,
468 false,
469 )
470 .await
471 }
472
473 pub async fn get_tradingview_chart_data(
479 &self,
480 params: GetTradingViewChartDataParams,
481 ) -> Result<DeribitJsonRpcResponse<DeribitTradingViewChartData>, DeribitHttpError> {
482 self.send_request("public/get_tradingview_chart_data", params, false)
483 .await
484 }
485
486 pub async fn get_account_summaries(
495 &self,
496 params: GetAccountSummariesParams,
497 ) -> Result<DeribitJsonRpcResponse<DeribitAccountSummariesResponse>, DeribitHttpError> {
498 self.send_request("private/get_account_summaries", params, true)
499 .await
500 }
501
502 pub async fn get_order_book(
508 &self,
509 params: GetOrderBookParams,
510 ) -> Result<DeribitJsonRpcResponse<DeribitOrderBook>, DeribitHttpError> {
511 self.send_request("public/get_order_book", params, false)
512 .await
513 }
514}
515
516#[derive(Debug)]
521#[cfg_attr(
522 feature = "python",
523 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.deribit")
524)]
525pub struct DeribitHttpClient {
526 pub(crate) inner: Arc<DeribitRawHttpClient>,
527 pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
528 cache_initialized: AtomicBool,
529}
530
531impl Clone for DeribitHttpClient {
532 fn clone(&self) -> Self {
533 let cache_initialized = AtomicBool::new(false);
534
535 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
536 if is_initialized {
537 cache_initialized.store(true, Ordering::Release);
538 }
539
540 Self {
541 inner: self.inner.clone(),
542 instruments_cache: self.instruments_cache.clone(),
543 cache_initialized,
544 }
545 }
546}
547
548impl DeribitHttpClient {
549 #[allow(clippy::too_many_arguments)]
559 pub fn new(
560 base_url: Option<String>,
561 is_testnet: bool,
562 timeout_secs: Option<u64>,
563 max_retries: Option<u32>,
564 retry_delay_ms: Option<u64>,
565 retry_delay_max_ms: Option<u64>,
566 proxy_url: Option<String>,
567 ) -> anyhow::Result<Self> {
568 let raw_client = Arc::new(DeribitRawHttpClient::new(
569 base_url,
570 is_testnet,
571 timeout_secs,
572 max_retries,
573 retry_delay_ms,
574 retry_delay_max_ms,
575 proxy_url,
576 )?);
577
578 Ok(Self {
579 inner: raw_client,
580 instruments_cache: Arc::new(DashMap::new()),
581 cache_initialized: AtomicBool::new(false),
582 })
583 }
584
585 #[allow(clippy::too_many_arguments)]
597 pub fn new_with_env(
598 api_key: Option<String>,
599 api_secret: Option<String>,
600 is_testnet: bool,
601 timeout_secs: Option<u64>,
602 max_retries: Option<u32>,
603 retry_delay_ms: Option<u64>,
604 retry_delay_max_ms: Option<u64>,
605 proxy_url: Option<String>,
606 ) -> anyhow::Result<Self> {
607 let raw_client = Arc::new(DeribitRawHttpClient::new_with_env(
608 api_key,
609 api_secret,
610 is_testnet,
611 timeout_secs,
612 max_retries,
613 retry_delay_ms,
614 retry_delay_max_ms,
615 proxy_url,
616 )?);
617
618 Ok(Self {
619 inner: raw_client,
620 instruments_cache: Arc::new(DashMap::new()),
621 cache_initialized: AtomicBool::new(false),
622 })
623 }
624
625 pub async fn request_instruments(
631 &self,
632 currency: DeribitCurrency,
633 kind: Option<super::models::DeribitInstrumentKind>,
634 ) -> anyhow::Result<Vec<InstrumentAny>> {
635 let params = if let Some(k) = kind {
637 GetInstrumentsParams::with_kind(currency, k)
638 } else {
639 GetInstrumentsParams::new(currency)
640 };
641
642 let full_response = self.inner.get_instruments(params).await?;
644 let result = full_response
645 .result
646 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
647 let ts_event = extract_server_timestamp(full_response.us_out)?;
648 let ts_init = self.generate_ts_init();
649
650 let mut instruments = Vec::new();
652 let mut skipped_count = 0;
653 let mut error_count = 0;
654
655 for raw_instrument in result {
656 match parse_deribit_instrument_any(&raw_instrument, ts_init, ts_event) {
657 Ok(Some(instrument)) => {
658 instruments.push(instrument);
659 }
660 Ok(None) => {
661 skipped_count += 1;
663 tracing::debug!(
664 "Skipped unsupported instrument type: {} (kind: {:?})",
665 raw_instrument.instrument_name,
666 raw_instrument.kind
667 );
668 }
669 Err(e) => {
670 error_count += 1;
671 tracing::warn!(
672 "Failed to parse instrument {}: {}",
673 raw_instrument.instrument_name,
674 e
675 );
676 }
677 }
678 }
679
680 tracing::info!(
681 "Parsed {} instruments ({} skipped, {} errors)",
682 instruments.len(),
683 skipped_count,
684 error_count
685 );
686
687 Ok(instruments)
688 }
689
690 pub async fn request_instrument(
702 &self,
703 instrument_id: InstrumentId,
704 ) -> anyhow::Result<InstrumentAny> {
705 let params = GetInstrumentParams {
706 instrument_name: instrument_id.symbol.to_string(),
707 };
708
709 let full_response = self.inner.get_instrument(params).await?;
710 let response = full_response
711 .result
712 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
713 let ts_event = extract_server_timestamp(full_response.us_out)?;
714 let ts_init = self.generate_ts_init();
715
716 match parse_deribit_instrument_any(&response, ts_init, ts_event)? {
717 Some(instrument) => Ok(instrument),
718 None => anyhow::bail!(
719 "Unsupported instrument type: {} (kind: {:?})",
720 response.instrument_name,
721 response.kind
722 ),
723 }
724 }
725
726 pub async fn request_trades(
743 &self,
744 instrument_id: InstrumentId,
745 start: Option<DateTime<Utc>>,
746 end: Option<DateTime<Utc>>,
747 limit: Option<u32>,
748 ) -> anyhow::Result<Vec<TradeTick>> {
749 let (price_precision, size_precision) =
751 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
752 (instrument.price_precision(), instrument.size_precision())
753 } else {
754 tracing::warn!(
755 "Instrument {} not in cache, skipping trades request",
756 instrument_id
757 );
758 anyhow::bail!("Instrument {instrument_id} not in cache");
759 };
760
761 let start_timestamp = start.map_or_else(
763 || Utc::now().timestamp_millis() - 3_600_000, |dt| dt.timestamp_millis(),
765 );
766
767 let end_timestamp = end.map_or_else(
768 || Utc::now().timestamp_millis(), |dt| dt.timestamp_millis(),
770 );
771
772 let params = GetLastTradesByInstrumentAndTimeParams::new(
773 instrument_id.symbol.to_string(),
774 start_timestamp,
775 end_timestamp,
776 limit,
777 Some("asc".to_string()), );
779
780 let full_response = self
781 .inner
782 .get_last_trades_by_instrument_and_time(params)
783 .await
784 .map_err(|e| anyhow::anyhow!(e))?;
785
786 let response_data = full_response
787 .result
788 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
789
790 let ts_init = self.generate_ts_init();
791 let mut trades = Vec::with_capacity(response_data.trades.len());
792
793 for raw_trade in &response_data.trades {
794 match parse_trade_tick(
795 raw_trade,
796 instrument_id,
797 price_precision,
798 size_precision,
799 ts_init,
800 ) {
801 Ok(trade) => trades.push(trade),
802 Err(e) => {
803 tracing::warn!(
804 "Failed to parse trade {} for {}: {}",
805 raw_trade.trade_id,
806 instrument_id,
807 e
808 );
809 }
810 }
811 }
812
813 Ok(trades)
814 }
815
816 pub async fn request_bars(
831 &self,
832 bar_type: BarType,
833 start: Option<DateTime<Utc>>,
834 end: Option<DateTime<Utc>>,
835 _limit: Option<u32>,
836 ) -> anyhow::Result<Vec<Bar>> {
837 anyhow::ensure!(
838 bar_type.aggregation_source() == AggregationSource::External,
839 "Only EXTERNAL aggregation is supported"
840 );
841
842 let now = Utc::now();
843
844 let end_dt = end.unwrap_or(now);
846 let start_dt = start.unwrap_or(end_dt - chrono::Duration::hours(1));
847
848 if let (Some(s), Some(e)) = (start, end) {
849 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
850 }
851
852 let spec = bar_type.spec();
854 let step = spec.step.get();
855 let resolution = match spec.aggregation {
856 BarAggregation::Minute => format!("{step}"),
857 BarAggregation::Hour => format!("{}", step * 60),
858 BarAggregation::Day => "1D".to_string(),
859 a => anyhow::bail!("Deribit does not support {a:?} aggregation"),
860 };
861
862 let supported_resolutions = [
864 "1", "3", "5", "10", "15", "30", "60", "120", "180", "360", "720", "1D",
865 ];
866 if !supported_resolutions.contains(&resolution.as_str()) {
867 anyhow::bail!(
868 "Deribit does not support resolution '{resolution}'. Supported: {supported_resolutions:?}"
869 );
870 }
871
872 let instrument_name = bar_type.instrument_id().symbol.to_string();
873 let start_timestamp = start_dt.timestamp_millis();
874 let end_timestamp = end_dt.timestamp_millis();
875
876 let params = GetTradingViewChartDataParams::new(
877 instrument_name,
878 start_timestamp,
879 end_timestamp,
880 resolution,
881 );
882
883 let full_response = self.inner.get_tradingview_chart_data(params).await?;
884 let chart_data = full_response
885 .result
886 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
887
888 if chart_data.status == "no_data" {
889 tracing::debug!("No bar data returned for {}", bar_type);
890 return Ok(Vec::new());
891 }
892
893 let instrument_id = bar_type.instrument_id();
895 let (price_precision, size_precision) =
896 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
897 (instrument.price_precision(), instrument.size_precision())
898 } else {
899 tracing::warn!(
900 "Instrument {} not in cache, skipping bars request",
901 instrument_id
902 );
903 anyhow::bail!("Instrument {instrument_id} not in cache");
904 };
905
906 let ts_init = self.generate_ts_init();
907 let bars = parse_bars(
908 &chart_data,
909 bar_type,
910 price_precision,
911 size_precision,
912 ts_init,
913 )?;
914
915 tracing::info!("Parsed {} bars for {}", bars.len(), bar_type);
916
917 Ok(bars)
918 }
919
920 pub async fn request_book_snapshot(
935 &self,
936 instrument_id: InstrumentId,
937 depth: Option<u32>,
938 ) -> anyhow::Result<OrderBook> {
939 let (price_precision, size_precision) =
941 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
942 (instrument.price_precision(), instrument.size_precision())
943 } else {
944 tracing::warn!(
946 "Instrument {} not in cache, using default precisions",
947 instrument_id
948 );
949 (8u8, 8u8)
950 };
951
952 let params = GetOrderBookParams::new(instrument_id.symbol.to_string(), depth);
953 let full_response = self
954 .inner
955 .get_order_book(params)
956 .await
957 .map_err(|e| anyhow::anyhow!(e))?;
958
959 let order_book_data = full_response
960 .result
961 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
962
963 let ts_init = self.generate_ts_init();
964 let book = parse_order_book(
965 &order_book_data,
966 instrument_id,
967 price_precision,
968 size_precision,
969 ts_init,
970 )?;
971
972 tracing::info!(
973 "Fetched order book for {} with {} bids and {} asks",
974 instrument_id,
975 order_book_data.bids.len(),
976 order_book_data.asks.len()
977 );
978
979 Ok(book)
980 }
981
982 pub async fn request_account_state(
993 &self,
994 account_id: AccountId,
995 ) -> anyhow::Result<AccountState> {
996 let params = GetAccountSummariesParams::default();
997 let full_response = self
998 .inner
999 .get_account_summaries(params)
1000 .await
1001 .map_err(|e| anyhow::anyhow!(e))?;
1002 let response_data = full_response
1003 .result
1004 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1005 let ts_init = self.generate_ts_init();
1006 let ts_event = extract_server_timestamp(full_response.us_out)?;
1007
1008 parse_account_state(&response_data.summaries, account_id, ts_init, ts_event)
1009 }
1010
1011 fn generate_ts_init(&self) -> UnixNanos {
1013 get_atomic_clock_realtime().get_time_ns()
1014 }
1015
1016 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1018 for inst in instruments {
1019 self.instruments_cache
1020 .insert(inst.raw_symbol().inner(), inst);
1021 }
1022 self.cache_initialized.store(true, Ordering::Release);
1023 }
1024
1025 #[must_use]
1027 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1028 self.instruments_cache
1029 .get(symbol)
1030 .map(|entry| entry.value().clone())
1031 }
1032
1033 #[must_use]
1035 pub fn is_cache_initialized(&self) -> bool {
1036 self.cache_initialized.load(Ordering::Acquire)
1037 }
1038
1039 #[must_use]
1041 pub fn is_testnet(&self) -> bool {
1042 self.inner.is_testnet()
1043 }
1044}