nautilus_deribit/http/
client.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Deribit HTTP client implementation.
17
18use std::sync::{
19    Arc,
20    atomic::{AtomicBool, AtomicU64, Ordering},
21};
22
23use dashmap::DashMap;
24use nautilus_core::{nanos::UnixNanos, time::get_atomic_clock_realtime};
25use nautilus_model::{
26    events::AccountState,
27    identifiers::{AccountId, InstrumentId},
28    instruments::{Instrument, InstrumentAny},
29};
30use nautilus_network::{
31    http::{HttpClient, Method},
32    retry::{RetryConfig, RetryManager},
33};
34use serde::{Serialize, de::DeserializeOwned};
35use tokio_util::sync::CancellationToken;
36use ustr::Ustr;
37
38use super::{
39    error::DeribitHttpError,
40    models::{
41        DeribitAccountSummariesResponse, DeribitCurrency, DeribitInstrument, DeribitJsonRpcRequest,
42        DeribitJsonRpcResponse,
43    },
44    query::{GetAccountSummariesParams, GetInstrumentParams, GetInstrumentsParams},
45};
46use crate::common::{
47    consts::{DERIBIT_API_PATH, JSONRPC_VERSION, should_retry_error_code},
48    credential::Credential,
49    parse::{extract_server_timestamp, parse_account_state, parse_deribit_instrument_any},
50    urls::get_http_base_url,
51};
52
// Not referenced by any code visible in this file, hence the `dead_code` allowance.
// NOTE(review): presumably the code Deribit uses to signal success — confirm before use.
#[allow(dead_code)]
const DERIBIT_SUCCESS_CODE: i64 = 0;
55
/// Low-level Deribit HTTP client for raw API operations.
///
/// This client handles JSON-RPC 2.0 protocol, request signing, rate limiting,
/// and retry logic. It returns venue-specific response types.
///
/// NOTE(review): the constructors in this file build the underlying [`HttpClient`]
/// without any quotas, so no rate limits appear to be configured here — confirm
/// where rate limiting is expected to come from.
#[derive(Debug)]
pub struct DeribitRawHttpClient {
    /// Full endpoint URL every JSON-RPC request is POSTed to.
    base_url: String,
    /// Underlying HTTP client used to send the requests.
    client: HttpClient,
    /// API credential used to sign authenticated requests; `None` for a public-only client.
    credential: Option<Credential>,
    /// Executes each request with the configured retry policy.
    retry_manager: RetryManager<DeribitHttpError>,
    /// Token handed to the retry manager so in-flight operations can be canceled.
    cancellation_token: CancellationToken,
    /// Counter supplying the JSON-RPC `id`; starts at 1 and is incremented per attempt.
    request_id: AtomicU64,
}
69
70impl DeribitRawHttpClient {
71    /// Creates a new [`DeribitRawHttpClient`].
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if the HTTP client cannot be created.
76    #[allow(clippy::too_many_arguments)]
77    pub fn new(
78        base_url: Option<String>,
79        is_testnet: bool,
80        timeout_secs: Option<u64>,
81        max_retries: Option<u32>,
82        retry_delay_ms: Option<u64>,
83        retry_delay_max_ms: Option<u64>,
84        proxy_url: Option<String>,
85    ) -> Result<Self, DeribitHttpError> {
86        let base_url = base_url
87            .unwrap_or_else(|| format!("{}{}", get_http_base_url(is_testnet), DERIBIT_API_PATH));
88        let retry_config = RetryConfig {
89            max_retries: max_retries.unwrap_or(3),
90            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
91            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
92            backoff_factor: 2.0,
93            jitter_ms: 1000,
94            operation_timeout_ms: Some(60_000),
95            immediate_first: false,
96            max_elapsed_ms: Some(180_000),
97        };
98
99        let retry_manager = RetryManager::new(retry_config);
100
101        Ok(Self {
102            base_url,
103            client: HttpClient::new(
104                std::collections::HashMap::new(), // headers
105                Vec::new(),                       // header_keys
106                Vec::new(),                       // keyed_quotas
107                None,                             // default_quota
108                timeout_secs,
109                proxy_url,
110            )
111            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
112            credential: None,
113            retry_manager,
114            cancellation_token: CancellationToken::new(),
115            request_id: AtomicU64::new(1),
116        })
117    }
118
    /// Returns a reference to the cancellation token of this client.
    ///
    /// The same token is passed to the retry manager for every request sent by this client.
    pub fn cancellation_token(&self) -> &CancellationToken {
        &self.cancellation_token
    }
123
124    /// Creates a new [`DeribitRawHttpClient`] with explicit credentials.
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if the HTTP client cannot be created.
129    #[allow(clippy::too_many_arguments)]
130    pub fn with_credentials(
131        api_key: String,
132        api_secret: String,
133        base_url: Option<String>,
134        is_testnet: bool,
135        timeout_secs: Option<u64>,
136        max_retries: Option<u32>,
137        retry_delay_ms: Option<u64>,
138        retry_delay_max_ms: Option<u64>,
139        proxy_url: Option<String>,
140    ) -> Result<Self, DeribitHttpError> {
141        let base_url = base_url
142            .unwrap_or_else(|| format!("{}{}", get_http_base_url(is_testnet), DERIBIT_API_PATH));
143        let retry_config = RetryConfig {
144            max_retries: max_retries.unwrap_or(3),
145            initial_delay_ms: retry_delay_ms.unwrap_or(1000),
146            max_delay_ms: retry_delay_max_ms.unwrap_or(10_000),
147            backoff_factor: 2.0,
148            jitter_ms: 1000,
149            operation_timeout_ms: Some(60_000),
150            immediate_first: false,
151            max_elapsed_ms: Some(180_000),
152        };
153
154        let retry_manager = RetryManager::new(retry_config);
155        let credential = Credential::new(api_key, api_secret);
156
157        Ok(Self {
158            base_url,
159            client: HttpClient::new(
160                std::collections::HashMap::new(),
161                Vec::new(),
162                Vec::new(),
163                None,
164                timeout_secs,
165                proxy_url,
166            )
167            .map_err(|e| anyhow::anyhow!("Failed to create HTTP client: {e}"))?,
168            credential: Some(credential),
169            retry_manager,
170            cancellation_token: CancellationToken::new(),
171            request_id: AtomicU64::new(1),
172        })
173    }
174
175    /// Creates a new [`DeribitRawHttpClient`] with credentials from environment variables.
176    ///
177    /// If `api_key` or `api_secret` are not provided, they will be loaded from environment:
178    /// - Mainnet: `DERIBIT_API_KEY`, `DERIBIT_API_SECRET`
179    /// - Testnet: `DERIBIT_TESTNET_API_KEY`, `DERIBIT_TESTNET_API_SECRET`
180    ///
181    /// # Errors
182    ///
183    /// Returns an error if:
184    /// - The HTTP client cannot be created
185    /// - Credentials are not provided and environment variables are not set
186    #[allow(clippy::too_many_arguments)]
187    pub fn new_with_env(
188        api_key: Option<String>,
189        api_secret: Option<String>,
190        is_testnet: bool,
191        timeout_secs: Option<u64>,
192        max_retries: Option<u32>,
193        retry_delay_ms: Option<u64>,
194        retry_delay_max_ms: Option<u64>,
195        proxy_url: Option<String>,
196    ) -> Result<Self, DeribitHttpError> {
197        // Determine environment variable names based on environment
198        let (key_env, secret_env) = if is_testnet {
199            ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
200        } else {
201            ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
202        };
203
204        // Resolve credentials from explicit params or environment
205        let api_key = nautilus_core::env::get_or_env_var_opt(api_key, key_env);
206        let api_secret = nautilus_core::env::get_or_env_var_opt(api_secret, secret_env);
207
208        // If credentials were resolved, create authenticated client
209        if let (Some(key), Some(secret)) = (api_key, api_secret) {
210            Self::with_credentials(
211                key,
212                secret,
213                None,
214                is_testnet,
215                timeout_secs,
216                max_retries,
217                retry_delay_ms,
218                retry_delay_max_ms,
219                proxy_url,
220            )
221        } else {
222            // No credentials - create unauthenticated client
223            Self::new(
224                None,
225                is_testnet,
226                timeout_secs,
227                max_retries,
228                retry_delay_ms,
229                retry_delay_max_ms,
230                proxy_url,
231            )
232        }
233    }
234
235    /// Sends a JSON-RPC 2.0 request to the Deribit API.
236    async fn send_request<T, P>(
237        &self,
238        method: &str,
239        params: P,
240        authenticate: bool,
241    ) -> Result<DeribitJsonRpcResponse<T>, DeribitHttpError>
242    where
243        T: DeserializeOwned,
244        P: Serialize,
245    {
246        // Create operation identifier combining URL and RPC method
247        let operation_id = format!("{}#{}", self.base_url, method);
248        let operation = || {
249            let method = method.to_string();
250            let params_clone = serde_json::to_value(&params).unwrap();
251
252            async move {
253                // Build JSON-RPC request
254                let id = self.request_id.fetch_add(1, Ordering::SeqCst);
255                let request = DeribitJsonRpcRequest {
256                    jsonrpc: JSONRPC_VERSION,
257                    id,
258                    method: method.clone(),
259                    params: params_clone.clone(),
260                };
261
262                let body = serde_json::to_vec(&request)?;
263
264                // Build headers
265                let mut headers = std::collections::HashMap::new();
266                headers.insert("Content-Type".to_string(), "application/json".to_string());
267
268                // Add authentication headers if required
269                if authenticate {
270                    let credentials = self
271                        .credential
272                        .as_ref()
273                        .ok_or(DeribitHttpError::MissingCredentials)?;
274                    let auth_headers = credentials.sign_auth_headers("POST", "/api/v2", &body)?;
275                    headers.extend(auth_headers);
276                }
277
278                let resp = self
279                    .client
280                    .request(
281                        Method::POST,
282                        self.base_url.clone(),
283                        None,
284                        Some(headers),
285                        Some(body),
286                        None,
287                        None,
288                    )
289                    .await
290                    .map_err(|e| DeribitHttpError::NetworkError(e.to_string()))?;
291
292                // Parse JSON-RPC response
293                // Note: Deribit may return JSON-RPC errors with non-2xx HTTP status (e.g., 400)
294                // Always try to parse as JSON-RPC first, then fall back to HTTP error handling
295
296                // Try to parse as JSON first
297                let json_value: serde_json::Value = match serde_json::from_slice(&resp.body) {
298                    Ok(json) => json,
299                    Err(_) => {
300                        // Not valid JSON - treat as HTTP error
301                        let error_body = String::from_utf8_lossy(&resp.body);
302                        tracing::error!(
303                            method = %method,
304                            status = resp.status.as_u16(),
305                            "Non-JSON response: {error_body}"
306                        );
307                        return Err(DeribitHttpError::UnexpectedStatus {
308                            status: resp.status.as_u16(),
309                            body: error_body.to_string(),
310                        });
311                    }
312                };
313
314                // Try to parse as JSON-RPC response
315                let json_rpc_response: DeribitJsonRpcResponse<T> =
316                    serde_json::from_value(json_value.clone()).map_err(|e| {
317                        tracing::error!(
318                            method = %method,
319                            status = resp.status.as_u16(),
320                            error = %e,
321                            "Failed to deserialize Deribit JSON-RPC response"
322                        );
323                        tracing::debug!(
324                            "Response JSON (first 2000 chars): {}",
325                            &json_value
326                                .to_string()
327                                .chars()
328                                .take(2000)
329                                .collect::<String>()
330                        );
331                        DeribitHttpError::JsonError(e.to_string())
332                    })?;
333
334                // Check if it's a success or error result
335                if json_rpc_response.result.is_some() {
336                    Ok(json_rpc_response)
337                } else if let Some(error) = &json_rpc_response.error {
338                    // JSON-RPC error (may come with any HTTP status)
339                    tracing::warn!(
340                        method = %method,
341                        http_status = resp.status.as_u16(),
342                        error_code = error.code,
343                        error_message = %error.message,
344                        error_data = ?error.data,
345                        "Deribit RPC error response"
346                    );
347
348                    // Map JSON-RPC error to appropriate error variant
349                    Err(DeribitHttpError::from_jsonrpc_error(
350                        error.code,
351                        error.message.clone(),
352                        error.data.clone(),
353                    ))
354                } else {
355                    tracing::error!(
356                        method = %method,
357                        status = resp.status.as_u16(),
358                        request_id = ?json_rpc_response.id,
359                        "Response contains neither result nor error field"
360                    );
361                    Err(DeribitHttpError::JsonError(
362                        "Response contains neither result nor error".to_string(),
363                    ))
364                }
365            }
366        };
367
368        // Retry strategy based on Deribit error responses and HTTP status codes:
369        //
370        // 1. Network errors: always retry (transient connection issues)
371        // 2. HTTP 5xx/429: server errors and rate limiting should be retried
372        // 3. Deribit-specific retryable error codes (defined in common::consts)
373        //
374        // Note: Deribit returns many permanent errors which should NOT be retried
375        // (e.g., "invalid_credentials", "not_enough_funds", "order_not_found")
376        let should_retry = |error: &DeribitHttpError| -> bool {
377            match error {
378                DeribitHttpError::NetworkError(_) => true,
379                DeribitHttpError::UnexpectedStatus { status, .. } => {
380                    *status >= 500 || *status == 429
381                }
382                DeribitHttpError::DeribitError { error_code, .. } => {
383                    should_retry_error_code(*error_code)
384                }
385                _ => false,
386            }
387        };
388
389        let create_error = |msg: String| -> DeribitHttpError {
390            if msg == "canceled" {
391                DeribitHttpError::Canceled("Adapter disconnecting or shutting down".to_string())
392            } else {
393                DeribitHttpError::NetworkError(msg)
394            }
395        };
396
397        self.retry_manager
398            .execute_with_retry_with_cancel(
399                &operation_id,
400                operation,
401                should_retry,
402                create_error,
403                &self.cancellation_token,
404            )
405            .await
406    }
407
408    /// Gets available trading instruments.
409    ///
410    /// # Errors
411    ///
412    /// Returns an error if the request fails or the response cannot be parsed.
413    pub async fn get_instruments(
414        &self,
415        params: GetInstrumentsParams,
416    ) -> Result<DeribitJsonRpcResponse<Vec<DeribitInstrument>>, DeribitHttpError> {
417        self.send_request("public/get_instruments", params, false)
418            .await
419    }
420
421    /// Gets details for a specific trading instrument.
422    ///
423    /// # Errors
424    ///
425    /// Returns an error if the request fails or the response cannot be parsed.
426    pub async fn get_instrument(
427        &self,
428        params: GetInstrumentParams,
429    ) -> Result<DeribitJsonRpcResponse<DeribitInstrument>, DeribitHttpError> {
430        self.send_request("public/get_instrument", params, false)
431            .await
432    }
433
434    /// Gets account summaries for all currencies.
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if:
439    /// - Credentials are missing ([`DeribitHttpError::MissingCredentials`])
440    /// - Authentication fails (invalid signature, expired timestamp)
441    /// - The request fails or the response cannot be parsed
442    pub async fn get_account_summaries(
443        &self,
444        params: GetAccountSummariesParams,
445    ) -> Result<DeribitJsonRpcResponse<DeribitAccountSummariesResponse>, DeribitHttpError> {
446        self.send_request("private/get_account_summaries", params, true)
447            .await
448    }
449}
450
/// High-level Deribit HTTP client with domain-level abstractions.
///
/// This client wraps the raw HTTP client and provides methods that use Nautilus
/// domain types. It maintains an instrument cache for efficient lookups.
#[derive(Debug)]
pub struct DeribitHttpClient {
    /// Shared raw client that performs the actual JSON-RPC calls.
    pub(crate) inner: Arc<DeribitRawHttpClient>,
    /// Instruments stored by `cache_instruments`, keyed by their raw symbol.
    pub(crate) instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
    /// Set to `true` once `cache_instruments` has been called.
    cache_initialized: AtomicBool,
}
461
462impl DeribitHttpClient {
463    /// Creates a new [`DeribitHttpClient`] with default configuration.
464    ///
465    /// # Parameters
466    /// - `base_url`: Optional custom base URL (for testing)
467    /// - `is_testnet`: Whether to use the testnet environment
468    ///
469    /// # Errors
470    ///
471    /// Returns an error if the HTTP client cannot be created.
472    #[allow(clippy::too_many_arguments)]
473    pub fn new(
474        base_url: Option<String>,
475        is_testnet: bool,
476        timeout_secs: Option<u64>,
477        max_retries: Option<u32>,
478        retry_delay_ms: Option<u64>,
479        retry_delay_max_ms: Option<u64>,
480        proxy_url: Option<String>,
481    ) -> anyhow::Result<Self> {
482        let raw_client = Arc::new(DeribitRawHttpClient::new(
483            base_url,
484            is_testnet,
485            timeout_secs,
486            max_retries,
487            retry_delay_ms,
488            retry_delay_max_ms,
489            proxy_url,
490        )?);
491
492        Ok(Self {
493            inner: raw_client,
494            instruments_cache: Arc::new(DashMap::new()),
495            cache_initialized: AtomicBool::new(false),
496        })
497    }
498
499    /// Creates a new [`DeribitHttpClient`] with credentials from environment variables.
500    ///
501    /// If `api_key` or `api_secret` are not provided, they will be loaded from environment:
502    /// - Mainnet: `DERIBIT_API_KEY`, `DERIBIT_API_SECRET`
503    /// - Testnet: `DERIBIT_TESTNET_API_KEY`, `DERIBIT_TESTNET_API_SECRET`
504    ///
505    /// # Errors
506    ///
507    /// Returns an error if:
508    /// - The HTTP client cannot be created
509    /// - Credentials are not provided and environment variables are not set
510    #[allow(clippy::too_many_arguments)]
511    pub fn new_with_env(
512        api_key: Option<String>,
513        api_secret: Option<String>,
514        is_testnet: bool,
515        timeout_secs: Option<u64>,
516        max_retries: Option<u32>,
517        retry_delay_ms: Option<u64>,
518        retry_delay_max_ms: Option<u64>,
519        proxy_url: Option<String>,
520    ) -> anyhow::Result<Self> {
521        let raw_client = Arc::new(DeribitRawHttpClient::new_with_env(
522            api_key,
523            api_secret,
524            is_testnet,
525            timeout_secs,
526            max_retries,
527            retry_delay_ms,
528            retry_delay_max_ms,
529            proxy_url,
530        )?);
531
532        Ok(Self {
533            inner: raw_client,
534            instruments_cache: Arc::new(DashMap::new()),
535            cache_initialized: AtomicBool::new(false),
536        })
537    }
538
539    /// Requests instruments for a specific currency.
540    ///
541    /// # Errors
542    ///
543    /// Returns an error if the request fails or instruments cannot be parsed.
544    pub async fn request_instruments(
545        &self,
546        currency: DeribitCurrency,
547        kind: Option<super::models::DeribitInstrumentKind>,
548    ) -> anyhow::Result<Vec<InstrumentAny>> {
549        // Build parameters
550        let params = if let Some(k) = kind {
551            GetInstrumentsParams::with_kind(currency, k)
552        } else {
553            GetInstrumentsParams::new(currency)
554        };
555
556        // Call raw client
557        let full_response = self.inner.get_instruments(params).await?;
558        let result = full_response
559            .result
560            .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
561        let ts_event = extract_server_timestamp(full_response.us_out)?;
562        let ts_init = self.generate_ts_init();
563
564        // Parse each instrument
565        let mut instruments = Vec::new();
566        let mut skipped_count = 0;
567        let mut error_count = 0;
568
569        for raw_instrument in result {
570            match parse_deribit_instrument_any(&raw_instrument, ts_init, ts_event) {
571                Ok(Some(instrument)) => {
572                    instruments.push(instrument);
573                }
574                Ok(None) => {
575                    // Unsupported instrument type (e.g., combos)
576                    skipped_count += 1;
577                    tracing::debug!(
578                        "Skipped unsupported instrument type: {} (kind: {:?})",
579                        raw_instrument.instrument_name,
580                        raw_instrument.kind
581                    );
582                }
583                Err(e) => {
584                    error_count += 1;
585                    tracing::warn!(
586                        "Failed to parse instrument {}: {}",
587                        raw_instrument.instrument_name,
588                        e
589                    );
590                }
591            }
592        }
593
594        tracing::info!(
595            "Parsed {} instruments ({} skipped, {} errors)",
596            instruments.len(),
597            skipped_count,
598            error_count
599        );
600
601        Ok(instruments)
602    }
603
604    /// Requests a specific instrument by its Nautilus instrument ID.
605    ///
606    /// This is a high-level method that fetches the raw instrument data from Deribit
607    /// and converts it to a Nautilus `InstrumentAny` type.
608    ///
609    /// # Errors
610    ///
611    /// Returns an error if:
612    /// - The instrument name format is invalid (error code `-32602`)
613    /// - The instrument doesn't exist (error code `13020`)
614    /// - Network or API errors occur
615    pub async fn request_instrument(
616        &self,
617        instrument_id: InstrumentId,
618    ) -> anyhow::Result<InstrumentAny> {
619        let params = GetInstrumentParams {
620            instrument_name: instrument_id.symbol.to_string(),
621        };
622
623        let full_response = self.inner.get_instrument(params).await?;
624        let response = full_response
625            .result
626            .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
627        let ts_event = extract_server_timestamp(full_response.us_out)?;
628        let ts_init = self.generate_ts_init();
629
630        match parse_deribit_instrument_any(&response, ts_init, ts_event)? {
631            Some(instrument) => Ok(instrument),
632            None => anyhow::bail!(
633                "Unsupported instrument type: {} (kind: {:?})",
634                response.instrument_name,
635                response.kind
636            ),
637        }
638    }
639
640    /// Requests account state for all currencies.
641    ///
642    /// Fetches account balance and margin information for all currencies from Deribit
643    /// and converts it to Nautilus [`AccountState`] event.
644    ///
645    /// # Errors
646    ///
647    /// Returns an error if:
648    /// - The request fails
649    /// - Currency conversion fails
650    pub async fn request_account_state(
651        &self,
652        account_id: AccountId,
653    ) -> anyhow::Result<AccountState> {
654        let params = GetAccountSummariesParams::default();
655        let full_response = self
656            .inner
657            .get_account_summaries(params)
658            .await
659            .map_err(|e| anyhow::anyhow!(e))?;
660        let response_data = full_response
661            .result
662            .ok_or_else(|| anyhow::anyhow!("No result in response"))?;
663        let ts_init = self.generate_ts_init();
664        let ts_event = extract_server_timestamp(full_response.us_out)?;
665
666        parse_account_state(&response_data.summaries, account_id, ts_init, ts_event)
667    }
668
669    /// Generates a timestamp for initialization.
670    fn generate_ts_init(&self) -> UnixNanos {
671        get_atomic_clock_realtime().get_time_ns()
672    }
673
674    /// Caches instruments for later retrieval.
675    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
676        for inst in instruments {
677            self.instruments_cache
678                .insert(inst.raw_symbol().inner(), inst);
679        }
680        self.cache_initialized.store(true, Ordering::Release);
681    }
682
683    /// Retrieves a cached instrument by symbol.
684    #[must_use]
685    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
686        self.instruments_cache
687            .get(symbol)
688            .map(|entry| entry.value().clone())
689    }
690
691    /// Checks if the instrument cache has been initialized.
692    #[must_use]
693    pub fn is_cache_initialized(&self) -> bool {
694        self.cache_initialized.load(Ordering::Acquire)
695    }
696}