// nautilus_deribit/execution/mod.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Live execution client implementation for the Deribit adapter.

18use std::{
19    future::Future,
20    sync::{
21        Mutex,
22        atomic::{AtomicBool, Ordering},
23    },
24};
25
26use anyhow::Context;
27use async_trait::async_trait;
28use nautilus_common::{
29    clients::ExecutionClient,
30    live::{runner::get_exec_event_sender, runtime::get_runtime},
31    messages::{
32        ExecutionEvent,
33        execution::{
34            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
35            GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
36            ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
37        },
38    },
39};
40use nautilus_core::{MUTEX_POISONED, UnixNanos};
41use nautilus_live::ExecutionClientCore;
42use nautilus_model::{
43    accounts::AccountAny,
44    enums::OmsType,
45    events::AccountState,
46    identifiers::{AccountId, ClientId, Venue},
47    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48    types::{AccountBalance, MarginBalance},
49};
50use tokio::task::JoinHandle;
51
52use crate::{
53    common::consts::DERIBIT_VENUE,
54    config::DeribitExecClientConfig,
55    http::{client::DeribitHttpClient, models::DeribitCurrency},
56};
57
58/// Deribit live execution client.
59#[derive(Debug)]
60pub struct DeribitExecutionClient {
61    core: ExecutionClientCore,
62    config: DeribitExecClientConfig,
63    http_client: DeribitHttpClient,
64    exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
65    started: bool,
66    connected: AtomicBool,
67    instruments_initialized: AtomicBool,
68    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
69}
70
71impl DeribitExecutionClient {
72    /// Creates a new [`DeribitExecutionClient`].
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if the client fails to initialize.
77    pub fn new(core: ExecutionClientCore, config: DeribitExecClientConfig) -> anyhow::Result<Self> {
78        let http_client = if config.has_api_credentials() {
79            DeribitHttpClient::new_with_env(
80                config.api_key.clone(),
81                config.api_secret.clone(),
82                config.use_testnet,
83                config.http_timeout_secs,
84                config.max_retries,
85                config.retry_delay_initial_ms,
86                config.retry_delay_max_ms,
87                None, // proxy_url
88            )?
89        } else {
90            DeribitHttpClient::new(
91                config.base_url_http.clone(),
92                config.use_testnet,
93                config.http_timeout_secs,
94                config.max_retries,
95                config.retry_delay_initial_ms,
96                config.retry_delay_max_ms,
97                None, // proxy_url
98            )?
99        };
100
101        Ok(Self {
102            core,
103            config,
104            http_client,
105            exec_event_sender: None,
106            started: false,
107            connected: AtomicBool::new(false),
108            instruments_initialized: AtomicBool::new(false),
109            pending_tasks: Mutex::new(Vec::new()),
110        })
111    }
112
113    /// Spawns an async task for execution operations.
114    fn spawn_task<F>(&self, description: &'static str, fut: F)
115    where
116        F: Future<Output = anyhow::Result<()>> + Send + 'static,
117    {
118        let runtime = get_runtime();
119        let handle = runtime.spawn(async move {
120            if let Err(e) = fut.await {
121                log::warn!("{description} failed: {e:?}");
122            }
123        });
124
125        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
126        tasks.retain(|handle| !handle.is_finished());
127        tasks.push(handle);
128    }
129
130    /// Aborts all pending async tasks.
131    fn abort_pending_tasks(&self) {
132        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
133        for handle in tasks.drain(..) {
134            handle.abort();
135        }
136    }
137
138    /// Dispatches an account state event to the execution event sender.
139    fn dispatch_account_state(&self, account_state: AccountState) -> anyhow::Result<()> {
140        if let Some(sender) = &self.exec_event_sender {
141            sender
142                .send(ExecutionEvent::Account(account_state))
143                .map_err(|e| anyhow::anyhow!("Failed to send account state: {e}"))?;
144        }
145        Ok(())
146    }
147}
148
149#[async_trait(?Send)]
150impl ExecutionClient for DeribitExecutionClient {
151    fn is_connected(&self) -> bool {
152        self.connected.load(Ordering::Acquire)
153    }
154
155    fn client_id(&self) -> ClientId {
156        self.core.client_id
157    }
158
159    fn account_id(&self) -> AccountId {
160        self.core.account_id
161    }
162
163    fn venue(&self) -> Venue {
164        *DERIBIT_VENUE
165    }
166
167    fn oms_type(&self) -> OmsType {
168        self.core.oms_type
169    }
170
171    fn get_account(&self) -> Option<AccountAny> {
172        self.core.get_account()
173    }
174
175    fn generate_account_state(
176        &self,
177        balances: Vec<AccountBalance>,
178        margins: Vec<MarginBalance>,
179        reported: bool,
180        ts_event: UnixNanos,
181    ) -> anyhow::Result<()> {
182        self.core
183            .generate_account_state(balances, margins, reported, ts_event)
184    }
185
186    fn start(&mut self) -> anyhow::Result<()> {
187        if self.started {
188            return Ok(());
189        }
190
191        self.started = true;
192
193        log::info!(
194            "Started: client_id={}, account_id={}, account_type={:?}, instrument_kinds={:?}, use_testnet={}",
195            self.core.client_id,
196            self.core.account_id,
197            self.core.account_type,
198            self.config.instrument_kinds,
199            self.config.use_testnet
200        );
201        Ok(())
202    }
203
204    fn stop(&mut self) -> anyhow::Result<()> {
205        if !self.started {
206            return Ok(());
207        }
208
209        self.started = false;
210        self.connected.store(false, Ordering::Release);
211        self.abort_pending_tasks();
212        log::info!("Stopped: client_id={}", self.core.client_id);
213        Ok(())
214    }
215
216    async fn connect(&mut self) -> anyhow::Result<()> {
217        if self.connected.load(Ordering::Acquire) {
218            return Ok(());
219        }
220
221        // Initialize exec event sender (must be done in async context after runner is set up)
222        if self.exec_event_sender.is_none() {
223            self.exec_event_sender = Some(get_exec_event_sender());
224        }
225
226        // Fetch and cache instruments
227        if !self.instruments_initialized.load(Ordering::Acquire) {
228            for kind in &self.config.instrument_kinds {
229                let instruments = self
230                    .http_client
231                    .request_instruments(DeribitCurrency::ANY, Some(*kind))
232                    .await
233                    .with_context(|| {
234                        format!("failed to request Deribit instruments for {kind:?}")
235                    })?;
236
237                if instruments.is_empty() {
238                    log::warn!("No instruments returned for {kind:?}");
239                    continue;
240                }
241
242                self.http_client.cache_instruments(instruments);
243            }
244            self.instruments_initialized.store(true, Ordering::Release);
245        }
246
247        // Check if credentials are available before requesting account state
248        if !self.config.has_api_credentials() {
249            let (key_env, secret_env) = if self.config.use_testnet {
250                ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
251            } else {
252                ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
253            };
254            anyhow::bail!(
255                "Missing Deribit API credentials. Set environment variables: {key_env} and {secret_env}"
256            );
257        }
258
259        // Fetch initial account state
260        let account_state = self
261            .http_client
262            .request_account_state(self.core.account_id)
263            .await
264            .context("failed to request Deribit account state")?;
265
266        self.dispatch_account_state(account_state)?;
267
268        self.connected.store(true, Ordering::Release);
269        log::info!("Connected: client_id={}", self.core.client_id);
270        Ok(())
271    }
272
273    async fn disconnect(&mut self) -> anyhow::Result<()> {
274        if !self.connected.load(Ordering::Acquire) {
275            return Ok(());
276        }
277
278        self.abort_pending_tasks();
279
280        self.connected.store(false, Ordering::Release);
281        log::info!("Disconnected: client_id={}", self.core.client_id);
282        Ok(())
283    }
284
285    async fn generate_order_status_report(
286        &self,
287        _cmd: &GenerateOrderStatusReport,
288    ) -> anyhow::Result<Option<OrderStatusReport>> {
289        todo!("Implement generate_order_status_report for Deribit execution client");
290    }
291
292    async fn generate_order_status_reports(
293        &self,
294        _cmd: &GenerateOrderStatusReports,
295    ) -> anyhow::Result<Vec<OrderStatusReport>> {
296        todo!("Implement generate_order_status_reports for Deribit execution client");
297    }
298
299    async fn generate_fill_reports(
300        &self,
301        _cmd: GenerateFillReports,
302    ) -> anyhow::Result<Vec<FillReport>> {
303        todo!("Implement generate_fill_reports for Deribit execution client");
304    }
305
306    async fn generate_position_status_reports(
307        &self,
308        _cmd: &GeneratePositionStatusReports,
309    ) -> anyhow::Result<Vec<PositionStatusReport>> {
310        todo!("Implement generate_position_status_reports for Deribit execution client");
311    }
312
313    async fn generate_mass_status(
314        &self,
315        lookback_mins: Option<u64>,
316    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
317        log::warn!("generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})");
318        Ok(None)
319    }
320
321    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
322        let http_client = self.http_client.clone();
323        let account_id = self.core.account_id;
324        let exec_sender = self.exec_event_sender.clone();
325
326        self.spawn_task("query_account", async move {
327            let account_state = http_client
328                .request_account_state(account_id)
329                .await
330                .context(
331                    "failed to query Deribit account state (check API credentials are valid)",
332                )?;
333
334            if let Some(sender) = exec_sender {
335                sender
336                    .send(ExecutionEvent::Account(account_state))
337                    .map_err(|e| anyhow::anyhow!("Failed to send account state: {e}"))?;
338            }
339            Ok(())
340        });
341
342        Ok(())
343    }
344
345    fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
346        todo!("Implement query_order for Deribit execution client")
347    }
348
349    fn submit_order(&self, _cmd: &SubmitOrder) -> anyhow::Result<()> {
350        todo!("Implement submit_order for Deribit execution client");
351    }
352
353    fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
354        todo!("Implement submit_order_list for Deribit execution client");
355    }
356
357    fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
358        todo!("Implement modify_order for Deribit execution client");
359    }
360
361    fn cancel_order(&self, _cmd: &CancelOrder) -> anyhow::Result<()> {
362        todo!("Implement cancel_order for Deribit execution client");
363    }
364
365    fn cancel_all_orders(&self, _cmd: &CancelAllOrders) -> anyhow::Result<()> {
366        todo!("Implement cancel_all_orders for Deribit execution client");
367    }
368
369    fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
370        todo!("Implement batch_cancel_orders for Deribit execution client");
371    }
372}