nautilus_deribit/execution/
mod.rs1use std::{
19 future::Future,
20 sync::{
21 Mutex,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use anyhow::Context;
27use async_trait::async_trait;
28use nautilus_common::{
29 clients::ExecutionClient,
30 live::{runner::get_exec_event_sender, runtime::get_runtime},
31 messages::{
32 ExecutionEvent,
33 execution::{
34 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
35 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
36 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
37 },
38 },
39};
40use nautilus_core::{MUTEX_POISONED, UnixNanos};
41use nautilus_live::ExecutionClientCore;
42use nautilus_model::{
43 accounts::AccountAny,
44 enums::OmsType,
45 events::AccountState,
46 identifiers::{AccountId, ClientId, Venue},
47 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
48 types::{AccountBalance, MarginBalance},
49};
50use tokio::task::JoinHandle;
51
52use crate::{
53 common::consts::DERIBIT_VENUE,
54 config::DeribitExecClientConfig,
55 http::{client::DeribitHttpClient, models::DeribitCurrency},
56};
57
58#[derive(Debug)]
60pub struct DeribitExecutionClient {
61 core: ExecutionClientCore,
62 config: DeribitExecClientConfig,
63 http_client: DeribitHttpClient,
64 exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
65 started: bool,
66 connected: AtomicBool,
67 instruments_initialized: AtomicBool,
68 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
69}
70
71impl DeribitExecutionClient {
72 pub fn new(core: ExecutionClientCore, config: DeribitExecClientConfig) -> anyhow::Result<Self> {
78 let http_client = if config.has_api_credentials() {
79 DeribitHttpClient::new_with_env(
80 config.api_key.clone(),
81 config.api_secret.clone(),
82 config.use_testnet,
83 config.http_timeout_secs,
84 config.max_retries,
85 config.retry_delay_initial_ms,
86 config.retry_delay_max_ms,
87 None, )?
89 } else {
90 DeribitHttpClient::new(
91 config.base_url_http.clone(),
92 config.use_testnet,
93 config.http_timeout_secs,
94 config.max_retries,
95 config.retry_delay_initial_ms,
96 config.retry_delay_max_ms,
97 None, )?
99 };
100
101 Ok(Self {
102 core,
103 config,
104 http_client,
105 exec_event_sender: None,
106 started: false,
107 connected: AtomicBool::new(false),
108 instruments_initialized: AtomicBool::new(false),
109 pending_tasks: Mutex::new(Vec::new()),
110 })
111 }
112
113 fn spawn_task<F>(&self, description: &'static str, fut: F)
115 where
116 F: Future<Output = anyhow::Result<()>> + Send + 'static,
117 {
118 let runtime = get_runtime();
119 let handle = runtime.spawn(async move {
120 if let Err(e) = fut.await {
121 log::warn!("{description} failed: {e:?}");
122 }
123 });
124
125 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
126 tasks.retain(|handle| !handle.is_finished());
127 tasks.push(handle);
128 }
129
130 fn abort_pending_tasks(&self) {
132 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
133 for handle in tasks.drain(..) {
134 handle.abort();
135 }
136 }
137
138 fn dispatch_account_state(&self, account_state: AccountState) -> anyhow::Result<()> {
140 if let Some(sender) = &self.exec_event_sender {
141 sender
142 .send(ExecutionEvent::Account(account_state))
143 .map_err(|e| anyhow::anyhow!("Failed to send account state: {e}"))?;
144 }
145 Ok(())
146 }
147}
148
149#[async_trait(?Send)]
150impl ExecutionClient for DeribitExecutionClient {
151 fn is_connected(&self) -> bool {
152 self.connected.load(Ordering::Acquire)
153 }
154
155 fn client_id(&self) -> ClientId {
156 self.core.client_id
157 }
158
159 fn account_id(&self) -> AccountId {
160 self.core.account_id
161 }
162
163 fn venue(&self) -> Venue {
164 *DERIBIT_VENUE
165 }
166
167 fn oms_type(&self) -> OmsType {
168 self.core.oms_type
169 }
170
171 fn get_account(&self) -> Option<AccountAny> {
172 self.core.get_account()
173 }
174
175 fn generate_account_state(
176 &self,
177 balances: Vec<AccountBalance>,
178 margins: Vec<MarginBalance>,
179 reported: bool,
180 ts_event: UnixNanos,
181 ) -> anyhow::Result<()> {
182 self.core
183 .generate_account_state(balances, margins, reported, ts_event)
184 }
185
186 fn start(&mut self) -> anyhow::Result<()> {
187 if self.started {
188 return Ok(());
189 }
190
191 self.started = true;
192
193 log::info!(
194 "Started: client_id={}, account_id={}, account_type={:?}, instrument_kinds={:?}, use_testnet={}",
195 self.core.client_id,
196 self.core.account_id,
197 self.core.account_type,
198 self.config.instrument_kinds,
199 self.config.use_testnet
200 );
201 Ok(())
202 }
203
204 fn stop(&mut self) -> anyhow::Result<()> {
205 if !self.started {
206 return Ok(());
207 }
208
209 self.started = false;
210 self.connected.store(false, Ordering::Release);
211 self.abort_pending_tasks();
212 log::info!("Stopped: client_id={}", self.core.client_id);
213 Ok(())
214 }
215
216 async fn connect(&mut self) -> anyhow::Result<()> {
217 if self.connected.load(Ordering::Acquire) {
218 return Ok(());
219 }
220
221 if self.exec_event_sender.is_none() {
223 self.exec_event_sender = Some(get_exec_event_sender());
224 }
225
226 if !self.instruments_initialized.load(Ordering::Acquire) {
228 for kind in &self.config.instrument_kinds {
229 let instruments = self
230 .http_client
231 .request_instruments(DeribitCurrency::ANY, Some(*kind))
232 .await
233 .with_context(|| {
234 format!("failed to request Deribit instruments for {kind:?}")
235 })?;
236
237 if instruments.is_empty() {
238 log::warn!("No instruments returned for {kind:?}");
239 continue;
240 }
241
242 self.http_client.cache_instruments(instruments);
243 }
244 self.instruments_initialized.store(true, Ordering::Release);
245 }
246
247 if !self.config.has_api_credentials() {
249 let (key_env, secret_env) = if self.config.use_testnet {
250 ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
251 } else {
252 ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
253 };
254 anyhow::bail!(
255 "Missing Deribit API credentials. Set environment variables: {key_env} and {secret_env}"
256 );
257 }
258
259 let account_state = self
261 .http_client
262 .request_account_state(self.core.account_id)
263 .await
264 .context("failed to request Deribit account state")?;
265
266 self.dispatch_account_state(account_state)?;
267
268 self.connected.store(true, Ordering::Release);
269 log::info!("Connected: client_id={}", self.core.client_id);
270 Ok(())
271 }
272
273 async fn disconnect(&mut self) -> anyhow::Result<()> {
274 if !self.connected.load(Ordering::Acquire) {
275 return Ok(());
276 }
277
278 self.abort_pending_tasks();
279
280 self.connected.store(false, Ordering::Release);
281 log::info!("Disconnected: client_id={}", self.core.client_id);
282 Ok(())
283 }
284
285 async fn generate_order_status_report(
286 &self,
287 _cmd: &GenerateOrderStatusReport,
288 ) -> anyhow::Result<Option<OrderStatusReport>> {
289 todo!("Implement generate_order_status_report for Deribit execution client");
290 }
291
292 async fn generate_order_status_reports(
293 &self,
294 _cmd: &GenerateOrderStatusReports,
295 ) -> anyhow::Result<Vec<OrderStatusReport>> {
296 todo!("Implement generate_order_status_reports for Deribit execution client");
297 }
298
299 async fn generate_fill_reports(
300 &self,
301 _cmd: GenerateFillReports,
302 ) -> anyhow::Result<Vec<FillReport>> {
303 todo!("Implement generate_fill_reports for Deribit execution client");
304 }
305
306 async fn generate_position_status_reports(
307 &self,
308 _cmd: &GeneratePositionStatusReports,
309 ) -> anyhow::Result<Vec<PositionStatusReport>> {
310 todo!("Implement generate_position_status_reports for Deribit execution client");
311 }
312
313 async fn generate_mass_status(
314 &self,
315 lookback_mins: Option<u64>,
316 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
317 log::warn!("generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})");
318 Ok(None)
319 }
320
321 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
322 let http_client = self.http_client.clone();
323 let account_id = self.core.account_id;
324 let exec_sender = self.exec_event_sender.clone();
325
326 self.spawn_task("query_account", async move {
327 let account_state = http_client
328 .request_account_state(account_id)
329 .await
330 .context(
331 "failed to query Deribit account state (check API credentials are valid)",
332 )?;
333
334 if let Some(sender) = exec_sender {
335 sender
336 .send(ExecutionEvent::Account(account_state))
337 .map_err(|e| anyhow::anyhow!("Failed to send account state: {e}"))?;
338 }
339 Ok(())
340 });
341
342 Ok(())
343 }
344
345 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
346 todo!("Implement query_order for Deribit execution client")
347 }
348
349 fn submit_order(&self, _cmd: &SubmitOrder) -> anyhow::Result<()> {
350 todo!("Implement submit_order for Deribit execution client");
351 }
352
353 fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
354 todo!("Implement submit_order_list for Deribit execution client");
355 }
356
357 fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
358 todo!("Implement modify_order for Deribit execution client");
359 }
360
361 fn cancel_order(&self, _cmd: &CancelOrder) -> anyhow::Result<()> {
362 todo!("Implement cancel_order for Deribit execution client");
363 }
364
365 fn cancel_all_orders(&self, _cmd: &CancelAllOrders) -> anyhow::Result<()> {
366 todo!("Implement cancel_all_orders for Deribit execution client");
367 }
368
369 fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
370 todo!("Implement batch_cancel_orders for Deribit execution client");
371 }
372}