1use std::{
19 collections::HashMap,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, AtomicU64, Ordering},
23 },
24};
25
26use ahash::AHashSet;
27use chrono::{DateTime, Utc};
28use dashmap::DashMap;
29use nautilus_core::{datetime::nanos_to_millis, nanos::UnixNanos, time::get_atomic_clock_realtime};
30use nautilus_model::{
31 data::{Bar, BarType, TradeTick},
32 enums::{AggregationSource, BarAggregation},
33 events::AccountState,
34 identifiers::{AccountId, InstrumentId},
35 instruments::{Instrument, InstrumentAny},
36 orderbook::OrderBook,
37 reports::{FillReport, OrderStatusReport, PositionStatusReport},
38};
39use nautilus_network::{
40 http::{HttpClient, Method},
41 ratelimiter::quota::Quota,
42 retry::{RetryConfig, RetryManager},
43};
44use serde::{Serialize, de::DeserializeOwned};
45use strum::IntoEnumIterator;
46use tokio_util::sync::CancellationToken;
47use ustr::Ustr;
48
49use super::{
50 error::DeribitHttpError,
51 models::{
52 DeribitAccountSummariesResponse, DeribitCurrency, DeribitInstrument, DeribitJsonRpcRequest,
53 DeribitJsonRpcResponse, DeribitPosition, DeribitProductType, DeribitUserTradesResponse,
54 },
55 query::{
56 GetAccountSummariesParams, GetInstrumentParams, GetInstrumentsParams,
57 GetOpenOrdersByInstrumentParams, GetOpenOrdersParams, GetOrderHistoryByCurrencyParams,
58 GetOrderHistoryByInstrumentParams, GetOrderStateParams, GetPositionsParams,
59 GetUserTradesByCurrencyAndTimeParams, GetUserTradesByInstrumentAndTimeParams,
60 },
61};
62use crate::{
63 common::{
64 consts::{
65 DERIBIT_ACCOUNT_RATE_KEY, DERIBIT_API_PATH, DERIBIT_GLOBAL_RATE_KEY,
66 DERIBIT_HTTP_ACCOUNT_QUOTA, DERIBIT_HTTP_ORDER_QUOTA, DERIBIT_HTTP_REST_QUOTA,
67 DERIBIT_ORDER_RATE_KEY, JSONRPC_VERSION, should_retry_error_code,
68 },
69 credential::Credential,
70 parse::{
71 extract_server_timestamp, parse_account_state, parse_bars,
72 parse_deribit_instrument_any, parse_order_book, parse_trade_tick,
73 },
74 urls::get_http_base_url,
75 },
76 http::{
77 models::{DeribitOrderBook, DeribitTradesResponse, DeribitTradingViewChartData},
78 query::{
79 GetLastTradesByInstrumentAndTimeParams, GetOrderBookParams,
80 GetTradingViewChartDataParams,
81 },
82 },
83 websocket::{
84 messages::{DeribitOrderMsg, DeribitUserTradeMsg},
85 parse::{parse_position_status_report, parse_user_order_msg, parse_user_trade_msg},
86 },
87};
88
/// Page size (`count`) requested per call when `DeribitHttpClient::request_trades`
/// paginates through historical trades.
pub const DERIBIT_HISTORICAL_TRADES_MAX_COUNT: u32 = 1000;
93
/// Low-level Deribit HTTP client that sends JSON-RPC requests and returns the raw
/// JSON-RPC response envelopes.
#[derive(Debug)]
pub struct DeribitRawHttpClient {
    /// URL every JSON-RPC request is POSTed to.
    base_url: String,
    /// Underlying HTTP client, constructed with the rate limiter quotas.
    client: HttpClient,
    /// API credentials; `None` for a client that can only call unauthenticated methods.
    credential: Option<Credential>,
    /// Retry policy applied to every request.
    retry_manager: RetryManager<DeribitHttpError>,
    /// Token passed to the retry manager so in-flight requests can be canceled.
    cancellation_token: CancellationToken,
    /// Counter supplying the JSON-RPC `id` of each request attempt (starts at 1).
    request_id: AtomicU64,
}
107
108impl DeribitRawHttpClient {
109 #[allow(clippy::too_many_arguments)]
115 pub fn new(
116 base_url: Option<String>,
117 is_testnet: bool,
118 timeout_secs: Option<u64>,
119 max_retries: Option<u32>,
120 retry_delay_ms: Option<u64>,
121 retry_delay_max_ms: Option<u64>,
122 proxy_url: Option<String>,
123 ) -> Result<Self, DeribitHttpError> {
124 let base_url = base_url
125 .unwrap_or_else(|| format!("{}{}", get_http_base_url(is_testnet), DERIBIT_API_PATH));
126 let retry_config = RetryConfig {
127 max_retries: max_retries.unwrap_or(3),
128 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
129 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
130 backoff_factor: 2.0,
131 jitter_ms: 1000,
132 operation_timeout_ms: Some(60_000),
133 immediate_first: false,
134 max_elapsed_ms: Some(180_000),
135 };
136
137 let retry_manager = RetryManager::new(retry_config);
138
139 Ok(Self {
140 base_url,
141 client: HttpClient::new(
142 HashMap::new(),
143 Vec::new(),
144 Self::rate_limiter_quotas(),
145 Some(*DERIBIT_HTTP_REST_QUOTA),
146 timeout_secs,
147 proxy_url,
148 )
149 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
150 credential: None,
151 retry_manager,
152 cancellation_token: CancellationToken::new(),
153 request_id: AtomicU64::new(1),
154 })
155 }
156
157 pub fn cancellation_token(&self) -> &CancellationToken {
159 &self.cancellation_token
160 }
161
162 #[must_use]
164 pub fn is_testnet(&self) -> bool {
165 self.base_url.contains("test")
166 }
167
168 fn rate_limiter_quotas() -> Vec<(String, Quota)> {
175 vec![
176 (
177 DERIBIT_GLOBAL_RATE_KEY.to_string(),
178 *DERIBIT_HTTP_REST_QUOTA,
179 ),
180 (
181 DERIBIT_ORDER_RATE_KEY.to_string(),
182 *DERIBIT_HTTP_ORDER_QUOTA,
183 ),
184 (
185 DERIBIT_ACCOUNT_RATE_KEY.to_string(),
186 *DERIBIT_HTTP_ACCOUNT_QUOTA,
187 ),
188 ]
189 }
190
191 fn rate_limit_keys(method: &str) -> Vec<String> {
195 let mut keys = vec![DERIBIT_GLOBAL_RATE_KEY.to_string()];
196
197 if Self::is_order_method(method) {
199 keys.push(DERIBIT_ORDER_RATE_KEY.to_string());
200 } else if Self::is_account_method(method) {
201 keys.push(DERIBIT_ACCOUNT_RATE_KEY.to_string());
202 }
203
204 keys.push(format!("deribit:{method}"));
206
207 keys
208 }
209
210 fn is_order_method(method: &str) -> bool {
212 matches!(
213 method,
214 "private/buy"
215 | "private/sell"
216 | "private/edit"
217 | "private/cancel"
218 | "private/cancel_all"
219 | "private/cancel_all_by_currency"
220 | "private/cancel_all_by_instrument"
221 | "private/cancel_by_label"
222 | "private/close_position"
223 )
224 }
225
226 fn is_account_method(method: &str) -> bool {
228 matches!(
229 method,
230 "private/get_account_summaries"
231 | "private/get_account_summary"
232 | "private/get_positions"
233 | "private/get_position"
234 | "private/get_open_orders_by_currency"
235 | "private/get_open_orders_by_instrument"
236 | "private/get_order_state"
237 | "private/get_user_trades_by_currency"
238 | "private/get_user_trades_by_instrument"
239 )
240 }
241
242 #[allow(clippy::too_many_arguments)]
248 pub fn with_credentials(
249 api_key: String,
250 api_secret: String,
251 base_url: Option<String>,
252 is_testnet: bool,
253 timeout_secs: Option<u64>,
254 max_retries: Option<u32>,
255 retry_delay_ms: Option<u64>,
256 retry_delay_max_ms: Option<u64>,
257 proxy_url: Option<String>,
258 ) -> Result<Self, DeribitHttpError> {
259 let base_url = base_url
260 .unwrap_or_else(|| format!("{}{}", get_http_base_url(is_testnet), DERIBIT_API_PATH));
261 let retry_config = RetryConfig {
262 max_retries: max_retries.unwrap_or(3),
263 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
264 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
265 backoff_factor: 2.0,
266 jitter_ms: 1000,
267 operation_timeout_ms: Some(60_000),
268 immediate_first: false,
269 max_elapsed_ms: Some(180_000),
270 };
271
272 let retry_manager = RetryManager::new(retry_config);
273 let credential = Credential::new(api_key, api_secret);
274
275 Ok(Self {
276 base_url,
277 client: HttpClient::new(
278 HashMap::new(),
279 Vec::new(),
280 Self::rate_limiter_quotas(),
281 Some(*DERIBIT_HTTP_REST_QUOTA),
282 timeout_secs,
283 proxy_url,
284 )
285 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
286 credential: Some(credential),
287 retry_manager,
288 cancellation_token: CancellationToken::new(),
289 request_id: AtomicU64::new(1),
290 })
291 }
292
293 #[allow(clippy::too_many_arguments)]
305 pub fn new_with_env(
306 api_key: Option<String>,
307 api_secret: Option<String>,
308 is_testnet: bool,
309 timeout_secs: Option<u64>,
310 max_retries: Option<u32>,
311 retry_delay_ms: Option<u64>,
312 retry_delay_max_ms: Option<u64>,
313 proxy_url: Option<String>,
314 ) -> Result<Self, DeribitHttpError> {
315 let (key_env, secret_env) = if is_testnet {
317 ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
318 } else {
319 ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
320 };
321
322 let api_key = nautilus_core::env::get_or_env_var_opt(api_key, key_env);
324 let api_secret = nautilus_core::env::get_or_env_var_opt(api_secret, secret_env);
325
326 if let (Some(key), Some(secret)) = (api_key, api_secret) {
328 Self::with_credentials(
329 key,
330 secret,
331 None,
332 is_testnet,
333 timeout_secs,
334 max_retries,
335 retry_delay_ms,
336 retry_delay_max_ms,
337 proxy_url,
338 )
339 } else {
340 Self::new(
342 None,
343 is_testnet,
344 timeout_secs,
345 max_retries,
346 retry_delay_ms,
347 retry_delay_max_ms,
348 proxy_url,
349 )
350 }
351 }
352
353 async fn send_request<T, P>(
355 &self,
356 method: &str,
357 params: P,
358 authenticate: bool,
359 ) -> Result<DeribitJsonRpcResponse<T>, DeribitHttpError>
360 where
361 T: DeserializeOwned,
362 P: Serialize,
363 {
364 let operation_id = format!("{}#{}", self.base_url, method);
366 let params_clone = serde_json::to_value(¶ms)?;
367
368 let operation = || {
369 let method = method.to_string();
370 let params_clone = params_clone.clone();
371
372 async move {
373 let id = self.request_id.fetch_add(1, Ordering::SeqCst);
375 let request = DeribitJsonRpcRequest {
376 jsonrpc: JSONRPC_VERSION,
377 id,
378 method: method.clone(),
379 params: params_clone.clone(),
380 };
381
382 let body = serde_json::to_vec(&request)?;
383
384 let mut headers = HashMap::new();
386 headers.insert("Content-Type".to_string(), "application/json".to_string());
387
388 if authenticate {
390 let credentials = self
391 .credential
392 .as_ref()
393 .ok_or(DeribitHttpError::MissingCredentials)?;
394 let auth_headers = credentials.sign_auth_headers("POST", "/api/v2", &body)?;
395 headers.extend(auth_headers);
396 }
397
398 let rate_limit_keys = Self::rate_limit_keys(&method);
399 let resp = self
400 .client
401 .request(
402 Method::POST,
403 self.base_url.clone(),
404 None,
405 Some(headers),
406 Some(body),
407 None,
408 Some(rate_limit_keys),
409 )
410 .await
411 .map_err(|e| DeribitHttpError::NetworkError(e.to_string()))?;
412
413 let json_value: serde_json::Value = match serde_json::from_slice(&resp.body) {
419 Ok(json) => json,
420 Err(_) => {
421 let error_body = String::from_utf8_lossy(&resp.body);
423 log::error!(
424 "Non-JSON response: method={method}, status={}, body={error_body}",
425 resp.status.as_u16()
426 );
427 return Err(DeribitHttpError::UnexpectedStatus {
428 status: resp.status.as_u16(),
429 body: error_body.to_string(),
430 });
431 }
432 };
433
434 let json_rpc_response: DeribitJsonRpcResponse<T> =
436 serde_json::from_value(json_value.clone()).map_err(|e| {
437 log::error!(
438 "Failed to deserialize Deribit JSON-RPC response: method={method}, status={}, error={e}",
439 resp.status.as_u16()
440 );
441 log::debug!(
442 "Response JSON (first 2000 chars): {}",
443 &json_value
444 .to_string()
445 .chars()
446 .take(2000)
447 .collect::<String>()
448 );
449 DeribitHttpError::JsonError(e.to_string())
450 })?;
451
452 if json_rpc_response.result.is_some() {
454 Ok(json_rpc_response)
455 } else if let Some(error) = &json_rpc_response.error {
456 log::warn!(
458 "Deribit RPC error response: method={method}, http_status={}, error_code={}, error_message={}, error_data={:?}",
459 resp.status.as_u16(),
460 error.code,
461 error.message,
462 error.data
463 );
464
465 Err(DeribitHttpError::from_jsonrpc_error(
467 error.code,
468 error.message.clone(),
469 error.data.clone(),
470 ))
471 } else {
472 log::error!(
473 "Response contains neither result nor error field: method={method}, status={}, request_id={:?}",
474 resp.status.as_u16(),
475 json_rpc_response.id
476 );
477 Err(DeribitHttpError::JsonError(
478 "Response contains neither result nor error".to_string(),
479 ))
480 }
481 }
482 };
483
484 let should_retry = |error: &DeribitHttpError| -> bool {
493 match error {
494 DeribitHttpError::NetworkError(_) => true,
495 DeribitHttpError::UnexpectedStatus { status, .. } => {
496 *status >= 500 || *status == 429
497 }
498 DeribitHttpError::DeribitError { error_code, .. } => {
499 should_retry_error_code(*error_code)
500 }
501 _ => false,
502 }
503 };
504
505 let create_error = |msg: String| -> DeribitHttpError {
506 if msg == "canceled" {
507 DeribitHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
508 } else {
509 DeribitHttpError::NetworkError(msg)
510 }
511 };
512
513 self.retry_manager
514 .execute_with_retry_with_cancel(
515 &operation_id,
516 operation,
517 should_retry,
518 create_error,
519 &self.cancellation_token,
520 )
521 .await
522 }
523
524 pub async fn get_instruments(
530 &self,
531 params: GetInstrumentsParams,
532 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitInstrument>>, DeribitHttpError> {
533 self.send_request("public/get_instruments", params, false)
534 .await
535 }
536
537 pub async fn get_instrument(
543 &self,
544 params: GetInstrumentParams,
545 ) -> Result<DeribitJsonRpcResponse<DeribitInstrument>, DeribitHttpError> {
546 self.send_request("public/get_instrument", params, false)
547 .await
548 }
549
550 pub async fn get_last_trades_by_instrument_and_time(
556 &self,
557 params: GetLastTradesByInstrumentAndTimeParams,
558 ) -> Result<DeribitJsonRpcResponse<DeribitTradesResponse>, DeribitHttpError> {
559 self.send_request(
560 "public/get_last_trades_by_instrument_and_time",
561 params,
562 false,
563 )
564 .await
565 }
566
567 pub async fn get_tradingview_chart_data(
573 &self,
574 params: GetTradingViewChartDataParams,
575 ) -> Result<DeribitJsonRpcResponse<DeribitTradingViewChartData>, DeribitHttpError> {
576 self.send_request("public/get_tradingview_chart_data", params, false)
577 .await
578 }
579
580 pub async fn get_account_summaries(
589 &self,
590 params: GetAccountSummariesParams,
591 ) -> Result<DeribitJsonRpcResponse<DeribitAccountSummariesResponse>, DeribitHttpError> {
592 self.send_request("private/get_account_summaries", params, true)
593 .await
594 }
595
596 pub async fn get_order_book(
602 &self,
603 params: GetOrderBookParams,
604 ) -> Result<DeribitJsonRpcResponse<DeribitOrderBook>, DeribitHttpError> {
605 self.send_request("public/get_order_book", params, false)
606 .await
607 }
608
609 pub async fn get_order_state(
618 &self,
619 params: GetOrderStateParams,
620 ) -> Result<DeribitJsonRpcResponse<DeribitOrderMsg>, DeribitHttpError> {
621 self.send_request("private/get_order_state", params, true)
622 .await
623 }
624
625 pub async fn get_open_orders(
634 &self,
635 params: GetOpenOrdersParams,
636 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
637 self.send_request("private/get_open_orders", params, true)
638 .await
639 }
640
641 pub async fn get_open_orders_by_instrument(
650 &self,
651 params: GetOpenOrdersByInstrumentParams,
652 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
653 self.send_request("private/get_open_orders_by_instrument", params, true)
654 .await
655 }
656
657 pub async fn get_order_history_by_instrument(
666 &self,
667 params: GetOrderHistoryByInstrumentParams,
668 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
669 self.send_request("private/get_order_history_by_instrument", params, true)
670 .await
671 }
672
673 pub async fn get_order_history_by_currency(
682 &self,
683 params: GetOrderHistoryByCurrencyParams,
684 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitOrderMsg>>, DeribitHttpError> {
685 self.send_request("private/get_order_history_by_currency", params, true)
686 .await
687 }
688
689 pub async fn get_user_trades_by_instrument_and_time(
698 &self,
699 params: GetUserTradesByInstrumentAndTimeParams,
700 ) -> Result<DeribitJsonRpcResponse<DeribitUserTradesResponse>, DeribitHttpError> {
701 self.send_request(
702 "private/get_user_trades_by_instrument_and_time",
703 params,
704 true,
705 )
706 .await
707 }
708
709 pub async fn get_user_trades_by_currency_and_time(
718 &self,
719 params: GetUserTradesByCurrencyAndTimeParams,
720 ) -> Result<DeribitJsonRpcResponse<DeribitUserTradesResponse>, DeribitHttpError> {
721 self.send_request("private/get_user_trades_by_currency_and_time", params, true)
722 .await
723 }
724
725 pub async fn get_positions(
734 &self,
735 params: GetPositionsParams,
736 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitPosition>>, DeribitHttpError> {
737 self.send_request("private/get_positions", params, true)
738 .await
739 }
740}
741
/// Domain-level Deribit HTTP client.
///
/// Wraps a shared [`DeribitRawHttpClient`] and an instrument cache, and converts raw
/// Deribit responses into Nautilus domain types.
#[derive(Debug)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.deribit")
)]
pub struct DeribitHttpClient {
    /// Shared raw client used for all requests.
    pub(crate) inner: Arc<DeribitRawHttpClient>,
    /// Instruments keyed by raw symbol; shared between clones of this client.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set once `cache_instruments` has been called on this instance.
    cache_initialized: AtomicBool,
}
756
757impl Clone for DeribitHttpClient {
758 fn clone(&self) -> Self {
759 let cache_initialized = AtomicBool::new(false);
760
761 let is_initialized = self.cache_initialized.load(Ordering::Acquire);
762 if is_initialized {
763 cache_initialized.store(true, Ordering::Release);
764 }
765
766 Self {
767 inner: self.inner.clone(),
768 instruments_cache: self.instruments_cache.clone(),
769 cache_initialized,
770 }
771 }
772}
773
774impl DeribitHttpClient {
775 #[allow(clippy::too_many_arguments)]
785 pub fn new(
786 base_url: Option<String>,
787 is_testnet: bool,
788 timeout_secs: Option<u64>,
789 max_retries: Option<u32>,
790 retry_delay_ms: Option<u64>,
791 retry_delay_max_ms: Option<u64>,
792 proxy_url: Option<String>,
793 ) -> anyhow::Result<Self> {
794 let raw_client = Arc::new(DeribitRawHttpClient::new(
795 base_url,
796 is_testnet,
797 timeout_secs,
798 max_retries,
799 retry_delay_ms,
800 retry_delay_max_ms,
801 proxy_url,
802 )?);
803
804 Ok(Self {
805 inner: raw_client,
806 instruments_cache: Arc::new(DashMap::new()),
807 cache_initialized: AtomicBool::new(false),
808 })
809 }
810
811 #[allow(clippy::too_many_arguments)]
823 pub fn new_with_env(
824 api_key: Option<String>,
825 api_secret: Option<String>,
826 is_testnet: bool,
827 timeout_secs: Option<u64>,
828 max_retries: Option<u32>,
829 retry_delay_ms: Option<u64>,
830 retry_delay_max_ms: Option<u64>,
831 proxy_url: Option<String>,
832 ) -> anyhow::Result<Self> {
833 let raw_client = Arc::new(DeribitRawHttpClient::new_with_env(
834 api_key,
835 api_secret,
836 is_testnet,
837 timeout_secs,
838 max_retries,
839 retry_delay_ms,
840 retry_delay_max_ms,
841 proxy_url,
842 )?);
843
844 Ok(Self {
845 inner: raw_client,
846 instruments_cache: Arc::new(DashMap::new()),
847 cache_initialized: AtomicBool::new(false),
848 })
849 }
850
851 pub async fn request_instruments(
857 &self,
858 currency: DeribitCurrency,
859 product_type: Option<DeribitProductType>,
860 ) -> anyhow::Result<Vec<InstrumentAny>> {
861 let params = if let Some(pt) = product_type {
863 GetInstrumentsParams::with_kind(currency, pt)
864 } else {
865 GetInstrumentsParams::new(currency)
866 };
867
868 let full_response = self.inner.get_instruments(params).await?;
870 let result = full_response
871 .result
872 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
873 let ts_event = extract_server_timestamp(full_response.us_out)?;
874 let ts_init = self.generate_ts_init();
875
876 let mut instruments = Vec::new();
878 let mut skipped_count = 0;
879 let mut error_count = 0;
880
881 for raw_instrument in result {
882 match parse_deribit_instrument_any(&raw_instrument, ts_init, ts_event) {
883 Ok(Some(instrument)) => {
884 instruments.push(instrument);
885 }
886 Ok(None) => {
887 skipped_count += 1;
889 log::debug!(
890 "Skipped unsupported instrument type: {} (kind: {:?})",
891 raw_instrument.instrument_name,
892 raw_instrument.kind
893 );
894 }
895 Err(e) => {
896 error_count += 1;
897 log::warn!(
898 "Failed to parse instrument {}: {}",
899 raw_instrument.instrument_name,
900 e
901 );
902 }
903 }
904 }
905
906 log::info!(
907 "Parsed {} instruments ({} skipped, {} errors)",
908 instruments.len(),
909 skipped_count,
910 error_count
911 );
912
913 Ok(instruments)
914 }
915
916 pub async fn request_instrument(
928 &self,
929 instrument_id: InstrumentId,
930 ) -> anyhow::Result<InstrumentAny> {
931 let params = GetInstrumentParams {
932 instrument_name: instrument_id.symbol.to_string(),
933 };
934
935 let full_response = self.inner.get_instrument(params).await?;
936 let response = full_response
937 .result
938 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
939 let ts_event = extract_server_timestamp(full_response.us_out)?;
940 let ts_init = self.generate_ts_init();
941
942 match parse_deribit_instrument_any(&response, ts_init, ts_event)? {
943 Some(instrument) => Ok(instrument),
944 None => anyhow::bail!(
945 "Unsupported instrument type: {} (kind: {:?})",
946 response.instrument_name,
947 response.kind
948 ),
949 }
950 }
951
952 pub async fn request_trades(
975 &self,
976 instrument_id: InstrumentId,
977 start: Option<DateTime<Utc>>,
978 end: Option<DateTime<Utc>>,
979 limit: Option<u32>,
980 ) -> anyhow::Result<Vec<TradeTick>> {
981 let (price_precision, size_precision) =
983 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
984 (instrument.price_precision(), instrument.size_precision())
985 } else {
986 log::warn!("Instrument {instrument_id} not in cache, skipping trades request");
987 anyhow::bail!("Instrument {instrument_id} not in cache");
988 };
989
990 let now = Utc::now();
992 let end_dt = end.unwrap_or(now);
993 let start_dt = start.unwrap_or(end_dt - chrono::Duration::hours(1));
994
995 if let (Some(s), Some(e)) = (start, end) {
996 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
997 }
998
999 let mut current_start_timestamp = start_dt.timestamp_millis();
1000 let end_timestamp = end_dt.timestamp_millis();
1001 let ts_init = self.generate_ts_init();
1002 let mut all_trades = Vec::new();
1003 let mut has_more = true;
1004
1005 while has_more {
1007 let params = GetLastTradesByInstrumentAndTimeParams::new(
1008 instrument_id.symbol.to_string(),
1009 current_start_timestamp,
1010 end_timestamp,
1011 Some(DERIBIT_HISTORICAL_TRADES_MAX_COUNT),
1012 Some("asc".to_string()), );
1014
1015 let full_response = self
1016 .inner
1017 .get_last_trades_by_instrument_and_time(params)
1018 .await
1019 .map_err(|e| anyhow::anyhow!(e))?;
1020
1021 let response_data = full_response
1022 .result
1023 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1024
1025 has_more = response_data.has_more;
1026
1027 if response_data.trades.is_empty() {
1028 break;
1029 }
1030
1031 let mut last_timestamp = current_start_timestamp;
1033
1034 for raw_trade in &response_data.trades {
1035 match parse_trade_tick(
1036 raw_trade,
1037 instrument_id,
1038 price_precision,
1039 size_precision,
1040 ts_init,
1041 ) {
1042 Ok(trade) => {
1043 last_timestamp = raw_trade.timestamp;
1044 all_trades.push(trade);
1045
1046 if let Some(max) = limit
1048 && all_trades.len() >= max as usize
1049 {
1050 return Ok(all_trades);
1051 }
1052 }
1053 Err(e) => {
1054 log::warn!(
1055 "Failed to parse trade {} for {}: {}",
1056 raw_trade.trade_id,
1057 instrument_id,
1058 e
1059 );
1060 }
1061 }
1062 }
1063
1064 current_start_timestamp = last_timestamp + 1;
1067
1068 if current_start_timestamp >= end_timestamp {
1070 break;
1071 }
1072 }
1073
1074 log::info!(
1075 "Fetched {} historical trades for {} from {} to {}",
1076 all_trades.len(),
1077 instrument_id,
1078 start_dt,
1079 end_dt
1080 );
1081
1082 Ok(all_trades)
1083 }
1084
1085 pub async fn request_bars(
1100 &self,
1101 bar_type: BarType,
1102 start: Option<DateTime<Utc>>,
1103 end: Option<DateTime<Utc>>,
1104 _limit: Option<u32>,
1105 ) -> anyhow::Result<Vec<Bar>> {
1106 anyhow::ensure!(
1107 bar_type.aggregation_source() == AggregationSource::External,
1108 "Only EXTERNAL aggregation is supported"
1109 );
1110
1111 let now = Utc::now();
1112
1113 let end_dt = end.unwrap_or(now);
1115 let start_dt = start.unwrap_or(end_dt - chrono::Duration::hours(1));
1116
1117 if let (Some(s), Some(e)) = (start, end) {
1118 anyhow::ensure!(s < e, "Invalid time range: start={s:?} end={e:?}");
1119 }
1120
1121 let spec = bar_type.spec();
1123 let step = spec.step.get();
1124 let resolution = match spec.aggregation {
1125 BarAggregation::Minute => format!("{step}"),
1126 BarAggregation::Hour => format!("{}", step * 60),
1127 BarAggregation::Day => "1D".to_string(),
1128 a => anyhow::bail!("Deribit does not support {a:?} aggregation"),
1129 };
1130
1131 let supported_resolutions = [
1133 "1", "3", "5", "10", "15", "30", "60", "120", "180", "360", "720", "1D",
1134 ];
1135 if !supported_resolutions.contains(&resolution.as_str()) {
1136 anyhow::bail!(
1137 "Deribit does not support resolution '{resolution}'. Supported: {supported_resolutions:?}"
1138 );
1139 }
1140
1141 let instrument_name = bar_type.instrument_id().symbol.to_string();
1142 let start_timestamp = start_dt.timestamp_millis();
1143 let end_timestamp = end_dt.timestamp_millis();
1144
1145 let params = GetTradingViewChartDataParams::new(
1146 instrument_name,
1147 start_timestamp,
1148 end_timestamp,
1149 resolution,
1150 );
1151
1152 let full_response = self.inner.get_tradingview_chart_data(params).await?;
1153 let chart_data = full_response
1154 .result
1155 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1156
1157 if chart_data.status == "no_data" {
1158 log::debug!("No bar data returned for {bar_type}");
1159 return Ok(Vec::new());
1160 }
1161
1162 let instrument_id = bar_type.instrument_id();
1164 let (price_precision, size_precision) =
1165 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
1166 (instrument.price_precision(), instrument.size_precision())
1167 } else {
1168 log::warn!("Instrument {instrument_id} not in cache, skipping bars request");
1169 anyhow::bail!("Instrument {instrument_id} not in cache");
1170 };
1171
1172 let ts_init = self.generate_ts_init();
1173 let bars = parse_bars(
1174 &chart_data,
1175 bar_type,
1176 price_precision,
1177 size_precision,
1178 ts_init,
1179 )?;
1180
1181 log::info!("Parsed {} bars for {}", bars.len(), bar_type);
1182
1183 Ok(bars)
1184 }
1185
1186 pub async fn request_book_snapshot(
1201 &self,
1202 instrument_id: InstrumentId,
1203 depth: Option<u32>,
1204 ) -> anyhow::Result<OrderBook> {
1205 let (price_precision, size_precision) =
1207 if let Some(instrument) = self.get_instrument(&instrument_id.symbol.inner()) {
1208 (instrument.price_precision(), instrument.size_precision())
1209 } else {
1210 log::warn!("Instrument {instrument_id} not in cache, using default precisions");
1212 (8u8, 8u8)
1213 };
1214
1215 let params = GetOrderBookParams::new(instrument_id.symbol.to_string(), depth);
1216 let full_response = self
1217 .inner
1218 .get_order_book(params)
1219 .await
1220 .map_err(|e| anyhow::anyhow!(e))?;
1221
1222 let order_book_data = full_response
1223 .result
1224 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1225
1226 let ts_init = self.generate_ts_init();
1227 let book = parse_order_book(
1228 &order_book_data,
1229 instrument_id,
1230 price_precision,
1231 size_precision,
1232 ts_init,
1233 )?;
1234
1235 log::info!(
1236 "Fetched order book for {} with {} bids and {} asks",
1237 instrument_id,
1238 order_book_data.bids.len(),
1239 order_book_data.asks.len()
1240 );
1241
1242 Ok(book)
1243 }
1244
1245 pub async fn request_account_state(
1256 &self,
1257 account_id: AccountId,
1258 ) -> anyhow::Result<AccountState> {
1259 let params = GetAccountSummariesParams::default();
1260 let full_response = self
1261 .inner
1262 .get_account_summaries(params)
1263 .await
1264 .map_err(|e| anyhow::anyhow!(e))?;
1265 let response_data = full_response
1266 .result
1267 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
1268 let ts_init = self.generate_ts_init();
1269 let ts_event = extract_server_timestamp(full_response.us_out)?;
1270
1271 parse_account_state(&response_data.summaries, account_id, ts_init, ts_event)
1272 }
1273
1274 fn generate_ts_init(&self) -> UnixNanos {
1276 get_atomic_clock_realtime().get_time_ns()
1277 }
1278
1279 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
1281 for inst in instruments {
1282 self.instruments_cache
1283 .insert(inst.raw_symbol().inner(), inst);
1284 }
1285 self.cache_initialized.store(true, Ordering::Release);
1286 }
1287
1288 #[must_use]
1290 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
1291 self.instruments_cache
1292 .get(symbol)
1293 .map(|entry| entry.value().clone())
1294 }
1295
1296 #[must_use]
1298 pub fn is_cache_initialized(&self) -> bool {
1299 self.cache_initialized.load(Ordering::Acquire)
1300 }
1301
1302 #[must_use]
1304 pub fn is_testnet(&self) -> bool {
1305 self.inner.is_testnet()
1306 }
1307
1308 pub async fn request_order_status_reports(
1321 &self,
1322 account_id: AccountId,
1323 instrument_id: Option<InstrumentId>,
1324 start: Option<UnixNanos>,
1325 end: Option<UnixNanos>,
1326 open_only: bool,
1327 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1328 let ts_init = self.generate_ts_init();
1329 let mut reports = Vec::new();
1330 let mut seen_order_ids = AHashSet::new();
1331
1332 let mut parse_and_add = |order: &DeribitOrderMsg| {
1333 let symbol = Ustr::from(&order.instrument_name);
1334 if let Some(instrument) = self.get_instrument(&symbol) {
1335 match parse_user_order_msg(order, &instrument, account_id, ts_init) {
1336 Ok(report) => {
1337 let ts_last = report.ts_last;
1339 let in_range = match (start, end) {
1340 (Some(s), Some(e)) => ts_last >= s && ts_last <= e,
1341 (Some(s), None) => ts_last >= s,
1342 (None, Some(e)) => ts_last <= e,
1343 (None, None) => true,
1344 };
1345 if in_range && seen_order_ids.insert(order.order_id.clone()) {
1347 reports.push(report);
1348 }
1349 }
1350 Err(e) => {
1351 log::warn!(
1352 "Failed to parse order {} for {}: {}",
1353 order.order_id,
1354 order.instrument_name,
1355 e
1356 );
1357 }
1358 }
1359 } else {
1360 log::debug!(
1361 "Skipping order {} - instrument {} not in cache",
1362 order.order_id,
1363 order.instrument_name
1364 );
1365 }
1366 };
1367
1368 if let Some(instrument_id) = instrument_id {
1369 let instrument_name = instrument_id.symbol.to_string();
1371
1372 let open_params = GetOpenOrdersByInstrumentParams {
1374 instrument_name: instrument_name.clone(),
1375 r#type: None,
1376 };
1377 if let Some(orders) = self
1378 .inner
1379 .get_open_orders_by_instrument(open_params)
1380 .await?
1381 .result
1382 {
1383 for order in &orders {
1384 parse_and_add(order);
1385 }
1386 }
1387
1388 if !open_only {
1390 let history_params = GetOrderHistoryByInstrumentParams {
1391 instrument_name,
1392 count: Some(100),
1393 offset: None,
1394 include_old: Some(true),
1395 include_unfilled: Some(true),
1396 };
1397 if let Some(orders) = self
1398 .inner
1399 .get_order_history_by_instrument(history_params)
1400 .await?
1401 .result
1402 {
1403 for order in &orders {
1404 parse_and_add(order);
1405 }
1406 }
1407 }
1408 } else {
1409 let open_params = GetOpenOrdersParams::default();
1411 if let Some(orders) = self.inner.get_open_orders(open_params).await?.result {
1412 for order in &orders {
1413 parse_and_add(order);
1414 }
1415 }
1416
1417 if !open_only {
1419 for currency in DeribitCurrency::iter().filter(|c| *c != DeribitCurrency::ANY) {
1420 let history_params = GetOrderHistoryByCurrencyParams {
1421 currency,
1422 kind: None,
1423 count: Some(100),
1424 include_unfilled: Some(true),
1425 };
1426 if let Some(orders) = self
1427 .inner
1428 .get_order_history_by_currency(history_params)
1429 .await?
1430 .result
1431 {
1432 for order in &orders {
1433 parse_and_add(order);
1434 }
1435 }
1436 }
1437 }
1438 }
1439
1440 log::debug!("Generated {} order status reports", reports.len());
1441 Ok(reports)
1442 }
1443
1444 pub async fn request_fill_reports(
1456 &self,
1457 account_id: AccountId,
1458 instrument_id: Option<InstrumentId>,
1459 start: Option<UnixNanos>,
1460 end: Option<UnixNanos>,
1461 ) -> anyhow::Result<Vec<FillReport>> {
1462 let ts_init = self.generate_ts_init();
1463 let now_ms = Utc::now().timestamp_millis();
1464
1465 let start_ms = start.map_or(0, |ns| nanos_to_millis(ns.as_u64()) as i64);
1467 let end_ms = end.map_or(now_ms, |ns| nanos_to_millis(ns.as_u64()) as i64);
1468 let mut reports = Vec::new();
1469
1470 let mut parse_and_add = |trade: &DeribitUserTradeMsg| {
1472 let symbol = Ustr::from(&trade.instrument_name);
1473 if let Some(instrument) = self.get_instrument(&symbol) {
1474 match parse_user_trade_msg(trade, &instrument, account_id, ts_init) {
1475 Ok(report) => reports.push(report),
1476 Err(e) => {
1477 log::warn!(
1478 "Failed to parse trade {} for {}: {}",
1479 trade.trade_id,
1480 trade.instrument_name,
1481 e
1482 );
1483 }
1484 }
1485 } else {
1486 log::debug!(
1487 "Skipping trade {} - instrument {} not in cache",
1488 trade.trade_id,
1489 trade.instrument_name
1490 );
1491 }
1492 };
1493
1494 if let Some(instrument_id) = instrument_id {
1495 let params = GetUserTradesByInstrumentAndTimeParams {
1497 instrument_name: instrument_id.symbol.to_string(),
1498 start_timestamp: start_ms,
1499 end_timestamp: end_ms,
1500 count: Some(1000),
1501 sorting: None,
1502 };
1503 if let Some(response) = self
1504 .inner
1505 .get_user_trades_by_instrument_and_time(params)
1506 .await?
1507 .result
1508 {
1509 for trade in &response.trades {
1510 parse_and_add(trade);
1511 }
1512 }
1513 } else {
1514 for currency in DeribitCurrency::iter().filter(|c| *c != DeribitCurrency::ANY) {
1516 let params = GetUserTradesByCurrencyAndTimeParams {
1517 currency,
1518 start_timestamp: start_ms,
1519 end_timestamp: end_ms,
1520 kind: None,
1521 count: Some(1000),
1522 };
1523 if let Some(response) = self
1524 .inner
1525 .get_user_trades_by_currency_and_time(params)
1526 .await?
1527 .result
1528 {
1529 for trade in &response.trades {
1530 parse_and_add(trade);
1531 }
1532 }
1533 }
1534 }
1535
1536 log::debug!("Generated {} fill reports", reports.len());
1537 Ok(reports)
1538 }
1539
1540 pub async fn request_position_status_reports(
1552 &self,
1553 account_id: AccountId,
1554 instrument_id: Option<InstrumentId>,
1555 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1556 let ts_init = self.generate_ts_init();
1557 let mut reports = Vec::new();
1558
1559 let params = GetPositionsParams {
1561 currency: DeribitCurrency::ANY,
1562 kind: None,
1563 };
1564 if let Some(positions) = self.inner.get_positions(params).await?.result {
1565 for position in &positions {
1566 if position.size.is_zero() {
1568 continue;
1569 }
1570
1571 let symbol = position.instrument_name;
1572 if let Some(instrument) = self.get_instrument(&symbol) {
1573 let report =
1574 parse_position_status_report(position, &instrument, account_id, ts_init);
1575 reports.push(report);
1576 } else {
1577 log::debug!(
1578 "Skipping position - instrument {} not in cache",
1579 position.instrument_name
1580 );
1581 }
1582 }
1583 }
1584
1585 if let Some(instrument_id) = instrument_id {
1587 reports.retain(|r| r.instrument_id == instrument_id);
1588 }
1589
1590 log::debug!("Generated {} position status reports", reports.len());
1591 Ok(reports)
1592 }
1593}
1594
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::consts::{
        DERIBIT_ACCOUNT_RATE_KEY, DERIBIT_GLOBAL_RATE_KEY, DERIBIT_ORDER_RATE_KEY,
    };

    /// Each method is classified as an order method, an account method, or neither.
    #[rstest]
    #[case("private/buy", true, false)]
    #[case("private/cancel", true, false)]
    #[case("private/get_account_summaries", false, true)]
    #[case("private/get_positions", false, true)]
    #[case("public/get_instruments", false, false)]
    fn test_method_classification(
        #[case] method: &str,
        #[case] is_order: bool,
        #[case] is_account: bool,
    ) {
        let classified_as_order = DeribitRawHttpClient::is_order_method(method);
        let classified_as_account = DeribitRawHttpClient::is_account_method(method);

        assert_eq!(classified_as_order, is_order);
        assert_eq!(classified_as_account, is_account);
    }

    /// The key list contains the expected category keys plus the per-method key.
    #[rstest]
    #[case("private/buy", vec![DERIBIT_GLOBAL_RATE_KEY, DERIBIT_ORDER_RATE_KEY])]
    #[case("private/get_account_summaries", vec![DERIBIT_GLOBAL_RATE_KEY, DERIBIT_ACCOUNT_RATE_KEY])]
    #[case("public/get_instruments", vec![DERIBIT_GLOBAL_RATE_KEY])]
    fn test_rate_limit_keys(#[case] method: &str, #[case] expected_keys: Vec<&str>) {
        let keys = DeribitRawHttpClient::rate_limit_keys(method);

        for expected in expected_keys {
            assert!(keys.iter().any(|key| key.as_str() == expected));
        }

        let per_method_key = format!("deribit:{method}");
        assert!(keys.iter().any(|key| key.as_str() == per_method_key.as_str()));
    }
}