1use std::sync::{
19 Arc,
20 atomic::{AtomicBool, AtomicU64, Ordering},
21};
22
23use dashmap::DashMap;
24use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
25use nautilus_model::{
26 events::AccountState,
27 identifiers::{AccountId, InstrumentId},
28 instruments::{Instrument, InstrumentAny},
29};
30use nautilus_network::{
31 http::{HttpClient, Method},
32 retry::{RetryConfig, RetryManager},
33};
34use serde::{Serialize, de::DeserializeOwned};
35use tokio_util::sync::CancellationToken;
36use ustr::Ustr;
37
38use super::{
39 error::DeribitHttpError,
40 models::{
41 DeribitAccountSummariesResponse, DeribitCurrency, DeribitInstrument, DeribitJsonRpcRequest,
42 DeribitJsonRpcResponse,
43 },
44 query::{GetAccountSummariesParams, GetInstrumentParams, GetInstrumentsParams},
45};
46use crate::common::{
47 consts::{DERIBIT_API_PATH, JSONRPC_VERSION, should_retry_error_code},
48 credential::Credential,
49 parse::{extract_server_timestamp, parse_account_state, parse_deribit_instrument_any},
50 urls::get_http_base_url,
51};
52
// NOTE(review): presumably the code Deribit reports for a successful call — confirm against
// the API documentation. Nothing in this part of the file reads the constant, which is why
// the `dead_code` lint is silenced.
#[allow(dead_code)]
const DERIBIT_SUCCESS_CODE: i64 = 0;
55
/// Low-level HTTP client that sends JSON-RPC requests to the Deribit API.
///
/// Every request is POSTed to `base_url`, optionally signed with `credential`, and executed
/// through `retry_manager` so that it can be retried or canceled.
#[derive(Debug)]
pub struct DeribitRawHttpClient {
    /// Endpoint URL every request is POSTed to.
    base_url: String,
    /// Underlying HTTP transport.
    client: HttpClient,
    /// API credentials used to sign authenticated requests; `None` for a public-only client.
    credential: Option<Credential>,
    /// Retry policy applied to every request.
    retry_manager: RetryManager<DeribitHttpError>,
    /// Token handed to the retry manager so in-flight requests can be canceled.
    cancellation_token: CancellationToken,
    /// Counter supplying the JSON-RPC `id` of each request attempt; starts at 1.
    request_id: AtomicU64,
}
69
70impl DeribitRawHttpClient {
71 #[allow(clippy::too_many_arguments)]
77 pub fn new(
78 base_url: Option<String>,
79 is_testnet: bool,
80 timeout_secs: Option<u64>,
81 max_retries: Option<u32>,
82 retry_delay_ms: Option<u64>,
83 retry_delay_max_ms: Option<u64>,
84 proxy_url: Option<String>,
85 ) -> Result<Self, DeribitHttpError> {
86 let base_url = base_url
87 .unwrap_or_else(|| format!("{}{}", get_http_base_url(is_testnet), DERIBIT_API_PATH));
88 let retry_config = RetryConfig {
89 max_retries: max_retries.unwrap_or(3),
90 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
91 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
92 backoff_factor: 2.0,
93 jitter_ms: 1000,
94 operation_timeout_ms: Some(60_000),
95 immediate_first: false,
96 max_elapsed_ms: Some(180_000),
97 };
98
99 let retry_manager = RetryManager::new(retry_config);
100
101 Ok(Self {
102 base_url,
103 client: HttpClient::new(
104 std::collections::HashMap::new(), Vec::new(), Vec::new(), None, timeout_secs,
109 proxy_url,
110 )
111 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
112 credential: None,
113 retry_manager,
114 cancellation_token: CancellationToken::new(),
115 request_id: AtomicU64::new(1),
116 })
117 }
118
119 pub fn cancellation_token(&self) -> &CancellationToken {
121 &self.cancellation_token
122 }
123
124 #[allow(clippy::too_many_arguments)]
130 pub fn with_credentials(
131 api_key: String,
132 api_secret: String,
133 base_url: Option<String>,
134 is_testnet: bool,
135 timeout_secs: Option<u64>,
136 max_retries: Option<u32>,
137 retry_delay_ms: Option<u64>,
138 retry_delay_max_ms: Option<u64>,
139 proxy_url: Option<String>,
140 ) -> Result<Self, DeribitHttpError> {
141 let base_url = base_url
142 .unwrap_or_else(|| format!("{}{}", get_http_base_url(is_testnet), DERIBIT_API_PATH));
143 let retry_config = RetryConfig {
144 max_retries: max_retries.unwrap_or(3),
145 initial_delay_ms: retry_delay_ms.unwrap_or(1000),
146 max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
147 backoff_factor: 2.0,
148 jitter_ms: 1000,
149 operation_timeout_ms: Some(60_000),
150 immediate_first: false,
151 max_elapsed_ms: Some(180_000),
152 };
153
154 let retry_manager = RetryManager::new(retry_config);
155 let credential = Credential::new(api_key, api_secret);
156
157 Ok(Self {
158 base_url,
159 client: HttpClient::new(
160 std::collections::HashMap::new(),
161 Vec::new(),
162 Vec::new(),
163 None,
164 timeout_secs,
165 proxy_url,
166 )
167 .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
168 credential: Some(credential),
169 retry_manager,
170 cancellation_token: CancellationToken::new(),
171 request_id: AtomicU64::new(1),
172 })
173 }
174
175 #[allow(clippy::too_many_arguments)]
187 pub fn new_with_env(
188 api_key: Option<String>,
189 api_secret: Option<String>,
190 is_testnet: bool,
191 timeout_secs: Option<u64>,
192 max_retries: Option<u32>,
193 retry_delay_ms: Option<u64>,
194 retry_delay_max_ms: Option<u64>,
195 proxy_url: Option<String>,
196 ) -> Result<Self, DeribitHttpError> {
197 let (key_env, secret_env) = if is_testnet {
199 ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
200 } else {
201 ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
202 };
203
204 let api_key = nautilus_core::env::get_or_env_var_opt(api_key, key_env);
206 let api_secret = nautilus_core::env::get_or_env_var_opt(api_secret, secret_env);
207
208 if let (Some(key), Some(secret)) = (api_key, api_secret) {
210 Self::with_credentials(
211 key,
212 secret,
213 None,
214 is_testnet,
215 timeout_secs,
216 max_retries,
217 retry_delay_ms,
218 retry_delay_max_ms,
219 proxy_url,
220 )
221 } else {
222 Self::new(
224 None,
225 is_testnet,
226 timeout_secs,
227 max_retries,
228 retry_delay_ms,
229 retry_delay_max_ms,
230 proxy_url,
231 )
232 }
233 }
234
235 async fn send_request<T, P>(
237 &self,
238 method: &str,
239 params: P,
240 authenticate: bool,
241 ) -> Result<DeribitJsonRpcResponse<T>, DeribitHttpError>
242 where
243 T: DeserializeOwned,
244 P: Serialize,
245 {
246 let operation_id = format!("{}#{}", self.base_url, method);
248 let operation = || {
249 let method = method.to_string();
250 let params_clone = serde_json::to_value(¶ms).unwrap();
251
252 async move {
253 let id = self.request_id.fetch_add(1, Ordering::SeqCst);
255 let request = DeribitJsonRpcRequest {
256 jsonrpc: JSONRPC_VERSION,
257 id,
258 method: method.clone(),
259 params: params_clone.clone(),
260 };
261
262 let body = serde_json::to_vec(&request)?;
263
264 let mut headers = std::collections::HashMap::new();
266 headers.insert("Content-Type".to_string(), "application/json".to_string());
267
268 if authenticate {
270 let credentials = self
271 .credential
272 .as_ref()
273 .ok_or(DeribitHttpError::MissingCredentials)?;
274 let auth_headers = credentials.sign_auth_headers("POST", "/api/v2", &body)?;
275 headers.extend(auth_headers);
276 }
277
278 let resp = self
279 .client
280 .request(
281 Method::POST,
282 self.base_url.clone(),
283 None,
284 Some(headers),
285 Some(body),
286 None,
287 None,
288 )
289 .await
290 .map_err(|e| DeribitHttpError::NetworkError(e.to_string()))?;
291
292 let json_value: serde_json::Value = match serde_json::from_slice(&resp.body) {
298 Ok(json) => json,
299 Err(_) => {
300 let error_body = String::from_utf8_lossy(&resp.body);
302 tracing::error!(
303 method = %method,
304 status = resp.status.as_u16(),
305 "Non-JSON response: {error_body}"
306 );
307 return Err(DeribitHttpError::UnexpectedStatus {
308 status: resp.status.as_u16(),
309 body: error_body.to_string(),
310 });
311 }
312 };
313
314 let json_rpc_response: DeribitJsonRpcResponse<T> =
316 serde_json::from_value(json_value.clone()).map_err(|e| {
317 tracing::error!(
318 method = %method,
319 status = resp.status.as_u16(),
320 error = %e,
321 "Failed to deserialize Deribit JSON-RPC response"
322 );
323 tracing::debug!(
324 "Response JSON (first 2000 chars): {}",
325 &json_value
326 .to_string()
327 .chars()
328 .take(2000)
329 .collect::<String>()
330 );
331 DeribitHttpError::JsonError(e.to_string())
332 })?;
333
334 if json_rpc_response.result.is_some() {
336 Ok(json_rpc_response)
337 } else if let Some(error) = &json_rpc_response.error {
338 tracing::warn!(
340 method = %method,
341 http_status = resp.status.as_u16(),
342 error_code = error.code,
343 error_message = %error.message,
344 error_data = ?error.data,
345 "Deribit RPC error response"
346 );
347
348 Err(DeribitHttpError::from_jsonrpc_error(
350 error.code,
351 error.message.clone(),
352 error.data.clone(),
353 ))
354 } else {
355 tracing::error!(
356 method = %method,
357 status = resp.status.as_u16(),
358 request_id = ?json_rpc_response.id,
359 "Response contains neither result nor error field"
360 );
361 Err(DeribitHttpError::JsonError(
362 "Response contains neither result nor error".to_string(),
363 ))
364 }
365 }
366 };
367
368 let should_retry = |error: &DeribitHttpError| -> bool {
377 match error {
378 DeribitHttpError::NetworkError(_) => true,
379 DeribitHttpError::UnexpectedStatus { status, .. } => {
380 *status >= 500 || *status == 429
381 }
382 DeribitHttpError::DeribitError { error_code, .. } => {
383 should_retry_error_code(*error_code)
384 }
385 _ => false,
386 }
387 };
388
389 let create_error = |msg: String| -> DeribitHttpError {
390 if msg == "canceled" {
391 DeribitHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
392 } else {
393 DeribitHttpError::NetworkError(msg)
394 }
395 };
396
397 self.retry_manager
398 .execute_with_retry_with_cancel(
399 &operation_id,
400 operation,
401 should_retry,
402 create_error,
403 &self.cancellation_token,
404 )
405 .await
406 }
407
408 pub async fn get_instruments(
414 &self,
415 params: GetInstrumentsParams,
416 ) -> Result<DeribitJsonRpcResponse<Vec<DeribitInstrument>>, DeribitHttpError> {
417 self.send_request("public/get_instruments", params, false)
418 .await
419 }
420
421 pub async fn get_instrument(
427 &self,
428 params: GetInstrumentParams,
429 ) -> Result<DeribitJsonRpcResponse<DeribitInstrument>, DeribitHttpError> {
430 self.send_request("public/get_instrument", params, false)
431 .await
432 }
433
434 pub async fn get_account_summaries(
443 &self,
444 params: GetAccountSummariesParams,
445 ) -> Result<DeribitJsonRpcResponse<DeribitAccountSummariesResponse>, DeribitHttpError> {
446 self.send_request("private/get_account_summaries", params, true)
447 .await
448 }
449}
450
/// Higher-level Deribit HTTP client that wraps [`DeribitRawHttpClient`], parses responses into
/// domain types and keeps a cache of instruments.
#[derive(Debug)]
pub struct DeribitHttpClient {
    /// Shared raw client that performs the JSON-RPC requests.
    pub(crate) inner: Arc<DeribitRawHttpClient>,
    /// Instruments stored by `cache_instruments`, keyed by their raw symbol.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set to `true` once `cache_instruments` has been called; starts as `false`.
    cache_initialized: AtomicBool,
}
461
462impl DeribitHttpClient {
463 #[allow(clippy::too_many_arguments)]
473 pub fn new(
474 base_url: Option<String>,
475 is_testnet: bool,
476 timeout_secs: Option<u64>,
477 max_retries: Option<u32>,
478 retry_delay_ms: Option<u64>,
479 retry_delay_max_ms: Option<u64>,
480 proxy_url: Option<String>,
481 ) -> anyhow::Result<Self> {
482 let raw_client = Arc::new(DeribitRawHttpClient::new(
483 base_url,
484 is_testnet,
485 timeout_secs,
486 max_retries,
487 retry_delay_ms,
488 retry_delay_max_ms,
489 proxy_url,
490 )?);
491
492 Ok(Self {
493 inner: raw_client,
494 instruments_cache: Arc::new(DashMap::new()),
495 cache_initialized: AtomicBool::new(false),
496 })
497 }
498
499 #[allow(clippy::too_many_arguments)]
511 pub fn new_with_env(
512 api_key: Option<String>,
513 api_secret: Option<String>,
514 is_testnet: bool,
515 timeout_secs: Option<u64>,
516 max_retries: Option<u32>,
517 retry_delay_ms: Option<u64>,
518 retry_delay_max_ms: Option<u64>,
519 proxy_url: Option<String>,
520 ) -> anyhow::Result<Self> {
521 let raw_client = Arc::new(DeribitRawHttpClient::new_with_env(
522 api_key,
523 api_secret,
524 is_testnet,
525 timeout_secs,
526 max_retries,
527 retry_delay_ms,
528 retry_delay_max_ms,
529 proxy_url,
530 )?);
531
532 Ok(Self {
533 inner: raw_client,
534 instruments_cache: Arc::new(DashMap::new()),
535 cache_initialized: AtomicBool::new(false),
536 })
537 }
538
539 pub async fn request_instruments(
545 &self,
546 currency: DeribitCurrency,
547 kind: Option<super::models::DeribitInstrumentKind>,
548 ) -> anyhow::Result<Vec<InstrumentAny>> {
549 let params = if let Some(k) = kind {
551 GetInstrumentsParams::with_kind(currency, k)
552 } else {
553 GetInstrumentsParams::new(currency)
554 };
555
556 let full_response = self.inner.get_instruments(params).await?;
558 let result = full_response
559 .result
560 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
561 let ts_event = extract_server_timestamp(full_response.us_out)?;
562 let ts_init = self.generate_ts_init();
563
564 let mut instruments = Vec::new();
566 let mut skipped_count = 0;
567 let mut error_count = 0;
568
569 for raw_instrument in result {
570 match parse_deribit_instrument_any(&raw_instrument, ts_init, ts_event) {
571 Ok(Some(instrument)) => {
572 instruments.push(instrument);
573 }
574 Ok(None) => {
575 skipped_count += 1;
577 tracing::debug!(
578 "Skipped unsupported instrument type: {} (kind: {:?})",
579 raw_instrument.instrument_name,
580 raw_instrument.kind
581 );
582 }
583 Err(e) => {
584 error_count += 1;
585 tracing::warn!(
586 "Failed to parse instrument {}: {}",
587 raw_instrument.instrument_name,
588 e
589 );
590 }
591 }
592 }
593
594 tracing::info!(
595 "Parsed {} instruments ({} skipped, {} errors)",
596 instruments.len(),
597 skipped_count,
598 error_count
599 );
600
601 Ok(instruments)
602 }
603
604 pub async fn request_instrument(
616 &self,
617 instrument_id: InstrumentId,
618 ) -> anyhow::Result<InstrumentAny> {
619 let params = GetInstrumentParams {
620 instrument_name: instrument_id.symbol.to_string(),
621 };
622
623 let full_response = self.inner.get_instrument(params).await?;
624 let response = full_response
625 .result
626 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
627 let ts_event = extract_server_timestamp(full_response.us_out)?;
628 let ts_init = self.generate_ts_init();
629
630 match parse_deribit_instrument_any(&response, ts_init, ts_event)? {
631 Some(instrument) => Ok(instrument),
632 None => anyhow::bail!(
633 "Unsupported instrument type: {} (kind: {:?})",
634 response.instrument_name,
635 response.kind
636 ),
637 }
638 }
639
640 pub async fn request_account_state(
651 &self,
652 account_id: AccountId,
653 ) -> anyhow::Result<AccountState> {
654 let params = GetAccountSummariesParams::default();
655 let full_response = self
656 .inner
657 .get_account_summaries(params)
658 .await
659 .map_err(|e| anyhow::anyhow!(e))?;
660 let response_data = full_response
661 .result
662 .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
663 let ts_init = self.generate_ts_init();
664 let ts_event = extract_server_timestamp(full_response.us_out)?;
665
666 parse_account_state(&response_data.summaries, account_id, ts_init, ts_event)
667 }
668
669 fn generate_ts_init(&self) -> UnixNanos {
671 get_atomic_clock_realtime().get_time_ns()
672 }
673
674 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
676 for inst in instruments {
677 self.instruments_cache
678 .insert(inst.raw_symbol().inner(), inst);
679 }
680 self.cache_initialized.store(true, Ordering::Release);
681 }
682
683 #[must_use]
685 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
686 self.instruments_cache
687 .get(symbol)
688 .map(|entry| entry.value().clone())
689 }
690
691 #[must_use]
693 pub fn is_cache_initialized(&self) -> bool {
694 self.cache_initialized.load(Ordering::Acquire)
695 }
696}