1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU64, Ordering},
21};
22
23use chrono::{DateTime, Utc};
24use dashmap::DashMap;
25use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
26use nautilus_model::{
27 data::{Bar, BarType, TradeTick},
28 enums::{AggregationSource, BarAggregation},
29 events::AccountState,
30 identifiers::{AccountId, InstrumentId},
31 instruments::{Instrument, InstrumentAny},
32 orderbook::OrderBook,
33};
34use nautilus_network::{
35 http::{HttpClient, Method},
36 retry::{RetryConfig, RetryManager},
37};
38use serde::{Serialize, de::DeserializeOwned};
39use tokio_util::sync::CancellationToken;
40use ustr::Ustr;
41
42use super::{
43 error::DeribitHttpError,
44 models::{
45 DeribitAccountSummariesResponse, DeribitCurrency, DeribitInstrument, DeribitJsonRpcRequest,
46 DeribitJsonRpcResponse,
47 },
48 query::{GetAccountSummariesParams, GetInstrumentParams, GetInstrumentsParams},
49};
50use crate::{
51 common::{
52 consts::{DERIBIT_API_PATH, JSONRPC_VERSION, should_retry_error_code},
53 credential::Credential,
54 parse::{
55 extract_server_timestamp, parse_account_state, parse_bars,
56 parse_deribit_instrument_any, parse_order_book, parse_trade_tick,
57 },
58 urls::get_http_base_url,
59 },
60 http::{
61 models::{DeribitOrderBook, DeribitTradesResponse, DeribitTradingViewChartData},
62 query::{
63 GetLastTradesByInstrumentAndTimeParams, GetOrderBookParams,
64 GetTradingViewChartDataParams,
65 },
66 },
67};
68
/// Numeric code `0`, named as the "success" code for Deribit responses.
///
/// NOTE(review): nothing in the visible part of this file reads this constant, which is
/// why the `dead_code` lint is silenced here — confirm it is still wanted.
#[allow(dead_code)]
const DERIBIT_SUCCESS_CODE: i64 = 0;
71
72#[derive(Debug)]
77pub struct DeribitRawHttpClient {
78 base_url: String,
79 client: HttpClient,
80 credential: Option<Credential>,
81 retry_manager: RetryManager<DeribitHttpError>,
82 cancellation_token: CancellationToken,
83 request_id: AtomicU64,
84}
85
86impl DeribitRawHttpClient {
87 #[allow(clippy::too_many_arguments)]
93 pub fn new(
94 base_url: Option<String>,
95 is_testnet: bool,
96 timeout_secs: Option<u64>,
97 max_retries: Option<u32>,
98 retry_delay_ms: Option<u64>,
99 retry_delay_max_ms: Option<u64>,
100 proxy_url: Option<String>,
101 ) -> Result<Self, DeribitHttpError> {
102 let base_url = base_url
103 .unwrap_or_else(|| format!("{}{}", get_http_base_url(is_testnet), DERIBIT_API_PATH));
104 let retry_config = RetryConfig {
105 max_retries: max_retries.unwrap_or(3),
106 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
107 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
108 backoff_factor: 2.0,
109 jitter_ms: 1000,
110 operation_timeout_ms: Some(60_000),
111 immediate_first: false,
112 max_elapsed_ms: Some(180_000),
113 };
114
115 let retry_manager = RetryManager::new(retry_config);
116
117 Ok(Self {
118 base_url,
119 client: HttpClient::new(
120 std::collections::HashMap::new(), Vec::new(), Vec::new(), None, timeout_secs,
125 proxy_url,
126 )
127 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
128 credential: None,
129 retry_manager,
130 cancellation_token: CancellationToken::new(),
131 request_id: AtomicU64::new(1),
132 })
133 }
134
135 pub fn cancellation_token(&self) -> &CancellationToken {
137 &self.cancellation_token
138 }
139
140 #[must_use]
142 pub fn is_testnet(&self) -> bool {
143 self.base_url.contains("test")
144 }
145
146 #[allow(clippy::too_many_arguments)]
152 pub fn with_credentials(
153 api_key: String,
154 api_secret: String,
155 base_url: Option<String>,
156 is_testnet: bool,
157 timeout_secs: Option<u64>,
158 max_retries: Option<u32>,
159 retry_delay_ms: Option<u64>,
160 retry_delay_max_ms: Option<u64>,
161 proxy_url: Option<String>,
162 ) -> Result<Self, DeribitHttpError> {
163 let base_url = base_url
164 .unwrap_or_else(|| format!("{}{}", get_http_base_url(is_testnet), DERIBIT_API_PATH));
165 let retry_config = RetryConfig {
166 max_retries: max_retries.unwrap_or(3),
167 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
168 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
169 backoff_factor: 2.0,
170 jitter_ms: 1000,
171 operation_timeout_ms: Some(60_000),
172 immediate_first: false,
173 max_elapsed_ms: Some(180_000),
174 };
175
176 let retry_manager = RetryManager::new(retry_config);
177 let credential = Credential::new(api_key, api_secret);
178
179 Ok(Self {
180 base_url,
181 client: HttpClient::new(
182 std::collections::HashMap::new(),
183 Vec::new(),
184 Vec::new(),
185 None,
186 timeout_secs,
187 proxy_url,
188 )
189 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
190 credential: Some(credential),
191 retry_manager,
192 cancellation_token: CancellationToken::new(),
193 request_id: AtomicU64::new(1),
194 })
195 }
196
197 #[allow(clippy::too_many_arguments)]
209 pub fn new_with_env(
210 api_key: Option<String>,
211 api_secret: Option<String>,
212 is_testnet: bool,
213 timeout_secs: Option<u64>,
214 max_retries: Option<u32>,
215 retry_delay_ms: Option<u64>,
216 retry_delay_max_ms: Option<u64>,
217 proxy_url: Option<String>,
218 ) -> Result<Self, DeribitHttpError> {
219 let (key_env, secret_env) = if is_testnet {
221 ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
222 } else {
223 ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
224 };
225
226 let api_key = nautilus_core::env::get_or_env_var_opt(api_key, key_env);
228 let api_secret = nautilus_core::env::get_or_env_var_opt(api_secret, secret_env);
229
230 if let (Some(key), Some(secret)) = (api_key, api_secret) {
232 Self::with_credentials(
233 key,
234 secret,
235 None,
236 is_testnet,
237 timeout_secs,
238 max_retries,
239 retry_delay_ms,
240 retry_delay_max_ms,
241 proxy_url,
242 )
243 } else {
244 Self::new(
246 None,
247 is_testnet,
248 timeout_secs,
249 max_retries,
250 retry_delay_ms,
251 retry_delay_max_ms,
252 proxy_url,
253 )
254 }
255 }
256
257 async fn send_request<T, P>(
259 &self,
260 method: &str,
261 params: P,
262 authenticate: bool,
263 ) -> Result<DeribitJsonRpcResponse<T>, DeribitHttpError>
264 where
265 T: DeserializeOwned,
266 P: Serialize,
267 {
268 let operation_id = format!("{}#{}", self.base_url, method);
270 let operation = || {
271 let method = method.to_string();
272 let params_clone = serde_json::to_value(¶ms).unwrap();
273
274 async move {
275 let id = self.request_id.fetch_add(1, Ordering::SeqCst);
277 let request = DeribitJsonRpcRequest {
278 jsonrpc: JSONRPC_VERSION,
279 id,
280 method: method.clone(),
281 params: params_clone.clone(),
282 };
283
284 let body = serde_json::to_vec(&request)?;
285
286 let mut headers = std::collections::HashMap::new();
288 headers.insert("Content-Type".to_string(), "application/json".to_string());
289
290 if authenticate {
292 let credentials = self
293 .credential
294 .as_ref()
295 .ok_or(DeribitHttpError::MissingCredentials)?;
296 let auth_headers = credentials.sign_auth_headers("POST", "/api/v2", &body)?;
297 headers.extend(auth_headers);
298 }
299
300 let resp = self
301 .client
302 .request(
303 Method::POST,
304 self.base_url.clone(),
305 None,
306 Some(headers),
307 Some(body),
308 None,
309 None,
310 )
311 .await
312 .map_err(|e| DeribitHttpError::NetworkError(e.to_string()))?;
313
314 let json_value: serde_json::Value = match serde_json::from_slice(&resp.body) {
320 Ok(json) => json,
321 Err(_) => {
322 let error_body = String::from_utf8_lossy(&resp.body);
324 log::error!(
325 "Non-JSON response: method={method}, status={}, body={error_body}",
326 resp.status.as_u16()
327 );
328 return Err(DeribitHttpError::UnexpectedStatus {
329 status: resp.status.as_u16(),
330 body: error_body.to_string(),
331 });
332 }
333 };
334
335 let json_rpc_response: DeribitJsonRpcResponse<T> =
337 serde_json::from_value(json_value.clone()).map_err(|e| {
338 log::error!(
339 "Failed to deserialize Deribit JSON-RPC response: method={method}, status={}, error={e}",
340 resp.status.as_u16()
341 );
342 log::debug!(
343 "Response JSON (first 2000 chars): {}",
344 &json_value
345 .to_string()
346 .chars()
347 .take(2000)
348 .collect::<String>()
349 );
350 DeribitHttpError::JsonError(e.to_string())
351 })?;
352
353 if json_rpc_response.result.is_some() {
355 Ok(json_rpc_response)
356 } else if let Some(error) = &json_rpc_response.error {
357 log::warn!(
359 "Deribit RPC error response: method={method}, http_status={}, error_code={}, error_message={}, error_data={:?}",
360 resp.status.as_u16(),
361 error.code,
362 error.message,
363 error.data
364 );
365
366 Err(DeribitHttpError::from_jsonrpc_error(
368 error.code,
369 error.message.clone(),
370 error.data.clone(),
371 ))
372 } else {
373 log::error!(
374 "Response contains neither result nor error field: method={method}, status={}, request_id={:?}",
375 resp.status.as_u16(),
376 json_rpc_response.id
377 );
378 Err(DeribitHttpError::JsonError(
379 "Response contains neither result nor error".to_string(),
380 ))
381 }
382 }
383 };
384
385 let should_retry = |error: &DeribitHttpError| -> bool {
394 match error {
395 DeribitHttpError::NetworkError(_) => true,
396 DeribitHttpError::UnexpectedStatus { status, .. } => {
397 *status >= 500 || *status == 429
398 }
399 DeribitHttpError::DeribitError { error_code, .. } => {
400 should_retry_error_code(*error_code)
401 }
402 _ => false,
403 }
404 };
405
406 let create_error = |msg: String| -> DeribitHttpError {
407 if msg == "canceled" {
408 DeribitHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
409 } else {
410 DeribitHttpError::NetworkError(msg)
411 }
412 };
413
414 self.retry_manager
415 .execute_with_retry_with_cancel(
416 &operation_id,
417 operation,
418 should_retry,
419 create_error,
420 &self.cancellation_token,
421 )
422 .await
423 }
424
425 pub async fn get_instruments(
431 &self,
432 params: GetInstrumentsParams,
433 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitInstrument>>, DeribitHttpError> {
434 self.send_request("public/get_instruments", params, false)
435 .await
436 }
437
438 pub async fn get_instrument(
444 &self,
445 params: GetInstrumentParams,
446 ) -> Result<DeribitJsonRpcResponse<DeribitInstrument>, DeribitHttpError> {
447 self.send_request("public/get_instrument", params, false)
448 .await
449 }
450
451 pub async fn get_last_trades_by_instrument_and_time(
457 &self,
458 params: GetLastTradesByInstrumentAndTimeParams,
459 ) -> Result<DeribitJsonRpcResponse<DeribitTradesResponse>, DeribitHttpError> {
460 self.send_request(
461 "public/get_last_trades_by_instrument_and_time",
462 params,
463 false,
464 )
465 .await
466 }
467
468 pub async fn get_tradingview_chart_data(
474 &self,
475 params: GetTradingViewChartDataParams,
476 ) -> Result<DeribitJsonRpcResponse<DeribitTradingViewChartData>, DeribitHttpError> {
477 self.send_request("public/get_tradingview_chart_data", params, false)
478 .await
479 }
480
481 pub async fn get_account_summaries(
490 &self,
491 params: GetAccountSummariesParams,
492 ) -> Result<DeribitJsonRpcResponse<DeribitAccountSummariesResponse>, DeribitHttpError> {
493 self.send_request("private/get_account_summaries", params, true)
494 .await
495 }
496
497 pub async fn get_order_book(
503 &self,
504 params: GetOrderBookParams,
505 ) -> Result<DeribitJsonRpcResponse<DeribitOrderBook>, DeribitHttpError> {
506 self.send_request("public/get_order_book", params, false)
507 .await
508 }
509}
510
511#[derive(Debug)]
516#[cfg_attr(
517 feature = "python",
518 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.deribit")
519)]
520pub struct DeribitHttpClient {
521 pub(crate) inner: Arc<DeribitRawHttpClient>,
522 pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
523 cache_initialized: AtomicBool,
524}
525
526impl Clone for DeribitHttpClient {
527 fn clone(&self) -> Self {
528 let cache_initialized = AtomicBool::new(false);
529
530 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
531 if is_initialized {
532 cache_initialized.store(true, Ordering::Release);
533 }
534
535 Self {
536 inner: self.inner.clone(),
537 instruments_cache: self.instruments_cache.clone(),
538 cache_initialized,
539 }
540 }
541}
542
543impl DeribitHttpClient {
544 #[allow(clippy::too_many_arguments)]
554 pub fn new(
555 base_url: Option<String>,
556 is_testnet: bool,
557 timeout_secs: Option<u64>,
558 max_retries: Option<u32>,
559 retry_delay_ms: Option<u64>,
560 retry_delay_max_ms: Option<u64>,
561 proxy_url: Option<String>,
562 ) -> anyhow::Result<Self> {
563 let raw_client = Arc::new(DeribitRawHttpClient::new(
564 base_url,
565 is_testnet,
566 timeout_secs,
567 max_retries,
568 retry_delay_ms,
569 retry_delay_max_ms,
570 proxy_url,
571 )?);
572
573 Ok(Self {
574 inner: raw_client,
575 instruments_cache: Arc::new(DashMap::new()),
576 cache_initialized: AtomicBool::new(false),
577 })
578 }
579
580 #[allow(clippy::too_many_arguments)]
592 pub fn new_with_env(
593 api_key: Option<String>,
594 api_secret: Option<String>,
595 is_testnet: bool,
596 timeout_secs: Option<u64>,
597 max_retries: Option<u32>,
598 retry_delay_ms: Option<u64>,
599 retry_delay_max_ms: Option<u64>,
600 proxy_url: Option<String>,
601 ) -> anyhow::Result<Self> {
602 let raw_client = Arc::new(DeribitRawHttpClient::new_with_env(
603 api_key,
604 api_secret,
605 is_testnet,
606 timeout_secs,
607 max_retries,
608 retry_delay_ms,
609 retry_delay_max_ms,
610 proxy_url,
611 )?);
612
613 Ok(Self {
614 inner: raw_client,
615 instruments_cache: Arc::new(DashMap::new()),
616 cache_initialized: AtomicBool::new(false),
617 })
618 }
619
620 pub async fn request_instruments(
626 &self,
627 currency: DeribitCurrency,
628 kind: Option<super::models::DeribitInstrumentKind>,
629 ) -> anyhow::Result<Vec<InstrumentAny>> {
630 let params = if let Some(k) = kind {
632 GetInstrumentsParams::with_kind(currency, k)
633 } else {
634 GetInstrumentsParams::new(currency)
635 };
636
637 let full_response = self.inner.get_instruments(params).await?;
639 let result = full_response
640 .result
641 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
642 let ts_event = extract_server_timestamp(full_response.us_out)?;
643 let ts_init = self.generate_ts_init();
644
645 let mut instruments = Vec::new();
647 let mut skipped_count = 0;
648 let mut error_count = 0;
649
650 for raw_instrument in result {
651 match parse_deribit_instrument_any(&raw_instrument, ts_init, ts_event) {
652 Ok(Some(instrument)) => {
653 instruments.push(instrument);
654 }
655 Ok(None) => {
656 skipped_count += 1;
658 log::debug!(
659 "Skipped unsupported instrument type: {} (kind: {:?})",
660 raw_instrument.instrument_name,
661 raw_instrument.kind
662 );
663 }
664 Err(e) => {
665 error_count += 1;
666 log::warn!(
667 "Failed to parse instrument {}: {}",
668 raw_instrument.instrument_name,
669 e
670 );
671 }
672 }
673 }
674
675 log::info!(
676 "Parsed {} instruments ({} skipped, {} errors)",
677 instruments.len(),
678 skipped_count,
679 error_count
680 );
681
682 Ok(instruments)
683 }
684
685 pub async fn request_instrument(
697 &self,
698 instrument_id: InstrumentId,
699 ) -> anyhow::Result<InstrumentAny> {
700 let params = GetInstrumentParams {
701 instrument_name: instrument_id.symbol.to_string(),
702 };
703
704 let full_response = self.inner.get_instrument(params).await?;
705 let response = full_response
706 .result
707 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
708 let ts_event = extract_server_timestamp(full_response.us_out)?;
709 let ts_init = self.generate_ts_init();
710
711 match parse_deribit_instrument_any(&response, ts_init, ts_event)? {
712 Some(instrument) => Ok(instrument),
713 None => anyhow::bail!(
714 "Unsupported instrument type: {} (kind: {:?})",
715 response.instrument_name,
716 response.kind
717 ),
718 }
719 }
720
721 pub async fn request_trades(
738 &self,
739 instrument_id: InstrumentId,
740 start: Option<DateTime<Utc>>,
741 end: Option<DateTime<Utc>>,
742 limit: Option<u32>,
743 ) -> anyhow::Result<Vec<TradeTick>> {
744 let (price_precision, size_precision) =
746 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
747 (instrument.price_precision(), instrument.size_precision())
748 } else {
749 log::warn!("Instrument {instrument_id} not in cache, skipping trades request");
750 anyhow::bail!("Instrument {instrument_id} not in cache");
751 };
752
753 let start_timestamp = start.map_or_else(
755 || Utc::now().timestamp_millis() - 3_600_000, |dt| dt.timestamp_millis(),
757 );
758
759 let end_timestamp = end.map_or_else(
760 || Utc::now().timestamp_millis(), |dt| dt.timestamp_millis(),
762 );
763
764 let params = GetLastTradesByInstrumentAndTimeParams::new(
765 instrument_id.symbol.to_string(),
766 start_timestamp,
767 end_timestamp,
768 limit,
769 Some("asc".to_string()), );
771
772 let full_response = self
773 .inner
774 .get_last_trades_by_instrument_and_time(params)
775 .await
776 .map_err(|e| anyhow::anyhow!(e))?;
777
778 let response_data = full_response
779 .result
780 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
781
782 let ts_init = self.generate_ts_init();
783 let mut trades = Vec::with_capacity(response_data.trades.len());
784
785 for raw_trade in &response_data.trades {
786 match parse_trade_tick(
787 raw_trade,
788 instrument_id,
789 price_precision,
790 size_precision,
791 ts_init,
792 ) {
793 Ok(trade) => trades.push(trade),
794 Err(e) => {
795 log::warn!(
796 "Failed to parse trade {} for {}: {}",
797 raw_trade.trade_id,
798 instrument_id,
799 e
800 );
801 }
802 }
803 }
804
805 Ok(trades)
806 }
807
808 pub async fn request_bars(
823 &self,
824 bar_type: BarType,
825 start: Option<DateTime<Utc>>,
826 end: Option<DateTime<Utc>>,
827 _limit: Option<u32>,
828 ) -> anyhow::Result<Vec<Bar>> {
829 anyhow::ensure!(
830 bar_type.aggregation_source() == AggregationSource::External,
831 "Only EXTERNAL aggregation is supported"
832 );
833
834 let now = Utc::now();
835
836 let end_dt = end.unwrap_or(now);
838 let start_dt = start.unwrap_or(end_dt - chrono::Duration::hours(1));
839
840 if let (Some(s), Some(e)) = (start, end) {
841 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
842 }
843
844 let spec = bar_type.spec();
846 let step = spec.step.get();
847 let resolution = match spec.aggregation {
848 BarAggregation::Minute => format!("{step}"),
849 BarAggregation::Hour => format!("{}", step * 60),
850 BarAggregation::Day => "1D".to_string(),
851 a => anyhow::bail!("Deribit does not support {a:?} aggregation"),
852 };
853
854 let supported_resolutions = [
856 "1", "3", "5", "10", "15", "30", "60", "120", "180", "360", "720", "1D",
857 ];
858 if !supported_resolutions.contains(&resolution.as_str()) {
859 anyhow::bail!(
860 "Deribit does not support resolution '{resolution}'. Supported: {supported_resolutions:?}"
861 );
862 }
863
864 let instrument_name = bar_type.instrument_id().symbol.to_string();
865 let start_timestamp = start_dt.timestamp_millis();
866 let end_timestamp = end_dt.timestamp_millis();
867
868 let params = GetTradingViewChartDataParams::new(
869 instrument_name,
870 start_timestamp,
871 end_timestamp,
872 resolution,
873 );
874
875 let full_response = self.inner.get_tradingview_chart_data(params).await?;
876 let chart_data = full_response
877 .result
878 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
879
880 if chart_data.status == "no_data" {
881 log::debug!("No bar data returned for {bar_type}");
882 return Ok(Vec::new());
883 }
884
885 let instrument_id = bar_type.instrument_id();
887 let (price_precision, size_precision) =
888 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
889 (instrument.price_precision(), instrument.size_precision())
890 } else {
891 log::warn!("Instrument {instrument_id} not in cache, skipping bars request");
892 anyhow::bail!("Instrument {instrument_id} not in cache");
893 };
894
895 let ts_init = self.generate_ts_init();
896 let bars = parse_bars(
897 &chart_data,
898 bar_type,
899 price_precision,
900 size_precision,
901 ts_init,
902 )?;
903
904 log::info!("Parsed {} bars for {}", bars.len(), bar_type);
905
906 Ok(bars)
907 }
908
909 pub async fn request_book_snapshot(
924 &self,
925 instrument_id: InstrumentId,
926 depth: Option<u32>,
927 ) -> anyhow::Result<OrderBook> {
928 let (price_precision, size_precision) =
930 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
931 (instrument.price_precision(), instrument.size_precision())
932 } else {
933 log::warn!("Instrument {instrument_id} not in cache, using default precisions");
935 (8u8, 8u8)
936 };
937
938 let params = GetOrderBookParams::new(instrument_id.symbol.to_string(), depth);
939 let full_response = self
940 .inner
941 .get_order_book(params)
942 .await
943 .map_err(|e| anyhow::anyhow!(e))?;
944
945 let order_book_data = full_response
946 .result
947 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
948
949 let ts_init = self.generate_ts_init();
950 let book = parse_order_book(
951 &order_book_data,
952 instrument_id,
953 price_precision,
954 size_precision,
955 ts_init,
956 )?;
957
958 log::info!(
959 "Fetched order book for {} with {} bids and {} asks",
960 instrument_id,
961 order_book_data.bids.len(),
962 order_book_data.asks.len()
963 );
964
965 Ok(book)
966 }
967
968 pub async fn request_account_state(
979 &self,
980 account_id: AccountId,
981 ) -> anyhow::Result<AccountState> {
982 let params = GetAccountSummariesParams::default();
983 let full_response = self
984 .inner
985 .get_account_summaries(params)
986 .await
987 .map_err(|e| anyhow::anyhow!(e))?;
988 let response_data = full_response
989 .result
990 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
991 let ts_init = self.generate_ts_init();
992 let ts_event = extract_server_timestamp(full_response.us_out)?;
993
994 parse_account_state(&response_data.summaries, account_id, ts_init, ts_event)
995 }
996
997 fn generate_ts_init(&self) -> UnixNanos {
999 get_atomic_clock_realtime().get_time_ns()
1000 }
1001
1002 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1004 for inst in instruments {
1005 self.instruments_cache
1006 .insert(inst.raw_symbol().inner(), inst);
1007 }
1008 self.cache_initialized.store(true, Ordering::Release);
1009 }
1010
1011 #[must_use]
1013 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1014 self.instruments_cache
1015 .get(symbol)
1016 .map(|entry| entry.value().clone())
1017 }
1018
1019 #[must_use]
1021 pub fn is_cache_initialized(&self) -> bool {
1022 self.cache_initialized.load(Ordering::Acquire)
1023 }
1024
1025 #[must_use]
1027 pub fn is_testnet(&self) -> bool {
1028 self.inner.is_testnet()
1029 }
1030}