nautilus_deribit/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Deribit adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Mutex,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use anyhow::Context;
27use async_trait::async_trait;
28use nautilus_common::{
29    live::{runner::get_exec_event_sender, runtime::get_runtime},
30    messages::{
31        ExecutionEvent,
32        execution::{
33            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
34            GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
35            ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
36        },
37    },
38};
39use nautilus_core::{MUTEX_POISONED, UnixNanos};
40use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
41use nautilus_model::{
42    accounts::AccountAny,
43    enums::OmsType,
44    events::AccountState,
45    identifiers::{AccountId, ClientId, Venue},
46    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47    types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52    common::consts::DERIBIT_VENUE,
53    config::DeribitExecClientConfig,
54    http::{client::DeribitHttpClient, models::DeribitCurrency},
55};
56
57/// Deribit live execution client.
58#[derive(Debug)]
59pub struct DeribitExecutionClient {
60    core: ExecutionClientCore,
61    config: DeribitExecClientConfig,
62    http_client: DeribitHttpClient,
63    exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
64    started: bool,
65    connected: AtomicBool,
66    instruments_initialized: AtomicBool,
67    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
68}
69
70impl DeribitExecutionClient {
71    /// Creates a new [`DeribitExecutionClient`].
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if the client fails to initialize.
76    pub fn new(core: ExecutionClientCore, config: DeribitExecClientConfig) -> anyhow::Result<Self> {
77        let http_client = if config.has_api_credentials() {
78            DeribitHttpClient::new_with_env(
79                config.api_key.clone(),
80                config.api_secret.clone(),
81                config.use_testnet,
82                config.http_timeout_secs,
83                config.max_retries,
84                config.retry_delay_initial_ms,
85                config.retry_delay_max_ms,
86                None, // proxy_url
87            )?
88        } else {
89            DeribitHttpClient::new(
90                config.base_url_http.clone(),
91                config.use_testnet,
92                config.http_timeout_secs,
93                config.max_retries,
94                config.retry_delay_initial_ms,
95                config.retry_delay_max_ms,
96                None, // proxy_url
97            )?
98        };
99
100        Ok(Self {
101            core,
102            config,
103            http_client,
104            exec_event_sender: None,
105            started: false,
106            connected: AtomicBool::new(false),
107            instruments_initialized: AtomicBool::new(false),
108            pending_tasks: Mutex::new(Vec::new()),
109        })
110    }
111
112    /// Spawns an async task for execution operations.
113    fn spawn_task<F>(&self, description: &'static str, fut: F)
114    where
115        F: Future<Output = anyhow::Result<()>> + Send + 'static,
116    {
117        let runtime = get_runtime();
118        let handle = runtime.spawn(async move {
119            if let Err(e) = fut.await {
120                tracing::warn!("{description} failed: {e:?}");
121            }
122        });
123
124        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
125        tasks.retain(|handle| !handle.is_finished());
126        tasks.push(handle);
127    }
128
129    /// Aborts all pending async tasks.
130    fn abort_pending_tasks(&self) {
131        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
132        for handle in tasks.drain(..) {
133            handle.abort();
134        }
135    }
136
137    /// Dispatches an account state event to the execution event sender.
138    fn dispatch_account_state(&self, account_state: AccountState) -> anyhow::Result<()> {
139        if let Some(sender) = &self.exec_event_sender {
140            sender
141                .send(ExecutionEvent::Account(account_state))
142                .map_err(|e| anyhow::anyhow!("Failed to send account state: {e}"))?;
143        }
144        Ok(())
145    }
146}
147
148#[async_trait(?Send)]
149impl ExecutionClient for DeribitExecutionClient {
150    fn is_connected(&self) -> bool {
151        self.connected.load(Ordering::Acquire)
152    }
153
154    fn client_id(&self) -> ClientId {
155        self.core.client_id
156    }
157
158    fn account_id(&self) -> AccountId {
159        self.core.account_id
160    }
161
162    fn venue(&self) -> Venue {
163        *DERIBIT_VENUE
164    }
165
166    fn oms_type(&self) -> OmsType {
167        self.core.oms_type
168    }
169
170    fn get_account(&self) -> Option<AccountAny> {
171        self.core.get_account()
172    }
173
174    fn generate_account_state(
175        &self,
176        balances: Vec<AccountBalance>,
177        margins: Vec<MarginBalance>,
178        reported: bool,
179        ts_event: UnixNanos,
180    ) -> anyhow::Result<()> {
181        self.core
182            .generate_account_state(balances, margins, reported, ts_event)
183    }
184
185    fn start(&mut self) -> anyhow::Result<()> {
186        if self.started {
187            return Ok(());
188        }
189
190        self.started = true;
191
192        tracing::info!(
193            client_id = %self.core.client_id,
194            account_id = %self.core.account_id,
195            account_type = ?self.core.account_type,
196            instrument_kinds = ?self.config.instrument_kinds,
197            use_testnet = self.config.use_testnet,
198            "Started"
199        );
200        Ok(())
201    }
202
203    fn stop(&mut self) -> anyhow::Result<()> {
204        if !self.started {
205            return Ok(());
206        }
207
208        self.started = false;
209        self.connected.store(false, Ordering::Release);
210        self.abort_pending_tasks();
211        tracing::info!(client_id = %self.core.client_id, "Stopped");
212        Ok(())
213    }
214
215    async fn connect(&mut self) -> anyhow::Result<()> {
216        if self.connected.load(Ordering::Acquire) {
217            return Ok(());
218        }
219
220        // Initialize exec event sender (must be done in async context after runner is set up)
221        if self.exec_event_sender.is_none() {
222            self.exec_event_sender = Some(get_exec_event_sender());
223        }
224
225        // Fetch and cache instruments
226        if !self.instruments_initialized.load(Ordering::Acquire) {
227            for kind in &self.config.instrument_kinds {
228                let instruments = self
229                    .http_client
230                    .request_instruments(DeribitCurrency::ANY, Some(*kind))
231                    .await
232                    .with_context(|| {
233                        format!("failed to request Deribit instruments for {kind:?}")
234                    })?;
235
236                if instruments.is_empty() {
237                    tracing::warn!("No instruments returned for {kind:?}");
238                    continue;
239                }
240
241                self.http_client.cache_instruments(instruments);
242            }
243            self.instruments_initialized.store(true, Ordering::Release);
244        }
245
246        // Check if credentials are available before requesting account state
247        if !self.config.has_api_credentials() {
248            let (key_env, secret_env) = if self.config.use_testnet {
249                ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
250            } else {
251                ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
252            };
253            anyhow::bail!(
254                "Missing Deribit API credentials. Set environment variables: {key_env} and {secret_env}"
255            );
256        }
257
258        // Fetch initial account state
259        let account_state = self
260            .http_client
261            .request_account_state(self.core.account_id)
262            .await
263            .context("failed to request Deribit account state")?;
264
265        self.dispatch_account_state(account_state)?;
266
267        self.connected.store(true, Ordering::Release);
268        tracing::info!(client_id = %self.core.client_id, "Connected");
269        Ok(())
270    }
271
272    async fn disconnect(&mut self) -> anyhow::Result<()> {
273        if !self.connected.load(Ordering::Acquire) {
274            return Ok(());
275        }
276
277        self.abort_pending_tasks();
278
279        self.connected.store(false, Ordering::Release);
280        tracing::info!(client_id = %self.core.client_id, "Disconnected");
281        Ok(())
282    }
283
284    async fn generate_order_status_report(
285        &self,
286        _cmd: &GenerateOrderStatusReport,
287    ) -> anyhow::Result<Option<OrderStatusReport>> {
288        todo!("Implement generate_order_status_report for Deribit execution client");
289    }
290
291    async fn generate_order_status_reports(
292        &self,
293        _cmd: &GenerateOrderStatusReports,
294    ) -> anyhow::Result<Vec<OrderStatusReport>> {
295        todo!("Implement generate_order_status_reports for Deribit execution client");
296    }
297
298    async fn generate_fill_reports(
299        &self,
300        _cmd: GenerateFillReports,
301    ) -> anyhow::Result<Vec<FillReport>> {
302        todo!("Implement generate_fill_reports for Deribit execution client");
303    }
304
305    async fn generate_position_status_reports(
306        &self,
307        _cmd: &GeneratePositionStatusReports,
308    ) -> anyhow::Result<Vec<PositionStatusReport>> {
309        todo!("Implement generate_position_status_reports for Deribit execution client");
310    }
311
312    async fn generate_mass_status(
313        &self,
314        lookback_mins: Option<u64>,
315    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
316        tracing::warn!(
317            "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
318        );
319        Ok(None)
320    }
321
322    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
323        let http_client = self.http_client.clone();
324        let account_id = self.core.account_id;
325        let exec_sender = self.exec_event_sender.clone();
326
327        self.spawn_task("query_account", async move {
328            let account_state = http_client
329                .request_account_state(account_id)
330                .await
331                .context(
332                    "failed to query Deribit account state (check API credentials are valid)",
333                )?;
334
335            if let Some(sender) = exec_sender {
336                sender
337                    .send(ExecutionEvent::Account(account_state))
338                    .map_err(|e| anyhow::anyhow!("Failed to send account state: {e}"))?;
339            }
340            Ok(())
341        });
342
343        Ok(())
344    }
345
346    fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
347        todo!("Implement query_order for Deribit execution client")
348    }
349
350    fn submit_order(&self, _cmd: &SubmitOrder) -> anyhow::Result<()> {
351        todo!("Implement submit_order for Deribit execution client");
352    }
353
354    fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
355        todo!("Implement submit_order_list for Deribit execution client");
356    }
357
358    fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
359        todo!("Implement modify_order for Deribit execution client");
360    }
361
362    fn cancel_order(&self, _cmd: &CancelOrder) -> anyhow::Result<()> {
363        todo!("Implement cancel_order for Deribit execution client");
364    }
365
366    fn cancel_all_orders(&self, _cmd: &CancelAllOrders) -> anyhow::Result<()> {
367        todo!("Implement cancel_all_orders for Deribit execution client");
368    }
369
370    fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
371        todo!("Implement batch_cancel_orders for Deribit execution client");
372    }
373}