nautilus_deribit/execution/
mod.rs1use std::{
19 future::Future,
20 sync::{
21 Mutex,
22 atomic::{AtomicBool, Ordering},
23 },
24};
25
26use anyhow::Context;
27use async_trait::async_trait;
28use nautilus_common::{
29 live::{runner::get_exec_event_sender, runtime::get_runtime},
30 messages::{
31 ExecutionEvent,
32 execution::{
33 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
34 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
35 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
36 },
37 },
38};
39use nautilus_core::{MUTEX_POISONED, UnixNanos};
40use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
41use nautilus_model::{
42 accounts::AccountAny,
43 enums::OmsType,
44 events::AccountState,
45 identifiers::{AccountId, ClientId, Venue},
46 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
47 types::{AccountBalance, MarginBalance},
48};
49use tokio::task::JoinHandle;
50
51use crate::{
52 common::consts::DERIBIT_VENUE,
53 config::DeribitExecClientConfig,
54 http::{client::DeribitHttpClient, models::DeribitCurrency},
55};
56
57#[derive(Debug)]
59pub struct DeribitExecutionClient {
60 core: ExecutionClientCore,
61 config: DeribitExecClientConfig,
62 http_client: DeribitHttpClient,
63 exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
64 started: bool,
65 connected: AtomicBool,
66 instruments_initialized: AtomicBool,
67 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
68}
69
70impl DeribitExecutionClient {
71 pub fn new(core: ExecutionClientCore, config: DeribitExecClientConfig) -> anyhow::Result<Self> {
77 let http_client = if config.has_api_credentials() {
78 DeribitHttpClient::new_with_env(
79 config.api_key.clone(),
80 config.api_secret.clone(),
81 config.use_testnet,
82 config.http_timeout_secs,
83 config.max_retries,
84 config.retry_delay_initial_ms,
85 config.retry_delay_max_ms,
86 None, )?
88 } else {
89 DeribitHttpClient::new(
90 config.base_url_http.clone(),
91 config.use_testnet,
92 config.http_timeout_secs,
93 config.max_retries,
94 config.retry_delay_initial_ms,
95 config.retry_delay_max_ms,
96 None, )?
98 };
99
100 Ok(Self {
101 core,
102 config,
103 http_client,
104 exec_event_sender: None,
105 started: false,
106 connected: AtomicBool::new(false),
107 instruments_initialized: AtomicBool::new(false),
108 pending_tasks: Mutex::new(Vec::new()),
109 })
110 }
111
112 fn spawn_task<F>(&self, description: &'static str, fut: F)
114 where
115 F: Future<Output = anyhow::Result<()>> + Send + 'static,
116 {
117 let runtime = get_runtime();
118 let handle = runtime.spawn(async move {
119 if let Err(e) = fut.await {
120 tracing::warn!("{description} failed: {e:?}");
121 }
122 });
123
124 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
125 tasks.retain(|handle| !handle.is_finished());
126 tasks.push(handle);
127 }
128
129 fn abort_pending_tasks(&self) {
131 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
132 for handle in tasks.drain(..) {
133 handle.abort();
134 }
135 }
136
137 fn dispatch_account_state(&self, account_state: AccountState) -> anyhow::Result<()> {
139 if let Some(sender) = &self.exec_event_sender {
140 sender
141 .send(ExecutionEvent::Account(account_state))
142 .map_err(|e| anyhow::anyhow!("Failed to send account state: {e}"))?;
143 }
144 Ok(())
145 }
146}
147
148#[async_trait(?Send)]
149impl ExecutionClient for DeribitExecutionClient {
150 fn is_connected(&self) -> bool {
151 self.connected.load(Ordering::Acquire)
152 }
153
154 fn client_id(&self) -> ClientId {
155 self.core.client_id
156 }
157
158 fn account_id(&self) -> AccountId {
159 self.core.account_id
160 }
161
162 fn venue(&self) -> Venue {
163 *DERIBIT_VENUE
164 }
165
166 fn oms_type(&self) -> OmsType {
167 self.core.oms_type
168 }
169
170 fn get_account(&self) -> Option<AccountAny> {
171 self.core.get_account()
172 }
173
174 fn generate_account_state(
175 &self,
176 balances: Vec<AccountBalance>,
177 margins: Vec<MarginBalance>,
178 reported: bool,
179 ts_event: UnixNanos,
180 ) -> anyhow::Result<()> {
181 self.core
182 .generate_account_state(balances, margins, reported, ts_event)
183 }
184
185 fn start(&mut self) -> anyhow::Result<()> {
186 if self.started {
187 return Ok(());
188 }
189
190 self.started = true;
191
192 tracing::info!(
193 client_id = %self.core.client_id,
194 account_id = %self.core.account_id,
195 account_type = ?self.core.account_type,
196 instrument_kinds = ?self.config.instrument_kinds,
197 use_testnet = self.config.use_testnet,
198 "Started"
199 );
200 Ok(())
201 }
202
203 fn stop(&mut self) -> anyhow::Result<()> {
204 if !self.started {
205 return Ok(());
206 }
207
208 self.started = false;
209 self.connected.store(false, Ordering::Release);
210 self.abort_pending_tasks();
211 tracing::info!(client_id = %self.core.client_id, "Stopped");
212 Ok(())
213 }
214
215 async fn connect(&mut self) -> anyhow::Result<()> {
216 if self.connected.load(Ordering::Acquire) {
217 return Ok(());
218 }
219
220 if self.exec_event_sender.is_none() {
222 self.exec_event_sender = Some(get_exec_event_sender());
223 }
224
225 if !self.instruments_initialized.load(Ordering::Acquire) {
227 for kind in &self.config.instrument_kinds {
228 let instruments = self
229 .http_client
230 .request_instruments(DeribitCurrency::ANY, Some(*kind))
231 .await
232 .with_context(|| {
233 format!("failed to request Deribit instruments for {kind:?}")
234 })?;
235
236 if instruments.is_empty() {
237 tracing::warn!("No instruments returned for {kind:?}");
238 continue;
239 }
240
241 self.http_client.cache_instruments(instruments);
242 }
243 self.instruments_initialized.store(true, Ordering::Release);
244 }
245
246 if !self.config.has_api_credentials() {
248 let (key_env, secret_env) = if self.config.use_testnet {
249 ("DERIBIT_TESTNET_API_KEY", "DERIBIT_TESTNET_API_SECRET")
250 } else {
251 ("DERIBIT_API_KEY", "DERIBIT_API_SECRET")
252 };
253 anyhow::bail!(
254 "Missing Deribit API credentials. Set environment variables: {key_env} and {secret_env}"
255 );
256 }
257
258 let account_state = self
260 .http_client
261 .request_account_state(self.core.account_id)
262 .await
263 .context("failed to request Deribit account state")?;
264
265 self.dispatch_account_state(account_state)?;
266
267 self.connected.store(true, Ordering::Release);
268 tracing::info!(client_id = %self.core.client_id, "Connected");
269 Ok(())
270 }
271
272 async fn disconnect(&mut self) -> anyhow::Result<()> {
273 if !self.connected.load(Ordering::Acquire) {
274 return Ok(());
275 }
276
277 self.abort_pending_tasks();
278
279 self.connected.store(false, Ordering::Release);
280 tracing::info!(client_id = %self.core.client_id, "Disconnected");
281 Ok(())
282 }
283
284 async fn generate_order_status_report(
285 &self,
286 _cmd: &GenerateOrderStatusReport,
287 ) -> anyhow::Result<Option<OrderStatusReport>> {
288 todo!("Implement generate_order_status_report for Deribit execution client");
289 }
290
291 async fn generate_order_status_reports(
292 &self,
293 _cmd: &GenerateOrderStatusReports,
294 ) -> anyhow::Result<Vec<OrderStatusReport>> {
295 todo!("Implement generate_order_status_reports for Deribit execution client");
296 }
297
298 async fn generate_fill_reports(
299 &self,
300 _cmd: GenerateFillReports,
301 ) -> anyhow::Result<Vec<FillReport>> {
302 todo!("Implement generate_fill_reports for Deribit execution client");
303 }
304
305 async fn generate_position_status_reports(
306 &self,
307 _cmd: &GeneratePositionStatusReports,
308 ) -> anyhow::Result<Vec<PositionStatusReport>> {
309 todo!("Implement generate_position_status_reports for Deribit execution client");
310 }
311
312 async fn generate_mass_status(
313 &self,
314 lookback_mins: Option<u64>,
315 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
316 tracing::warn!(
317 "generate_mass_status not yet implemented (lookback_mins={lookback_mins:?})"
318 );
319 Ok(None)
320 }
321
322 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
323 let http_client = self.http_client.clone();
324 let account_id = self.core.account_id;
325 let exec_sender = self.exec_event_sender.clone();
326
327 self.spawn_task("query_account", async move {
328 let account_state = http_client
329 .request_account_state(account_id)
330 .await
331 .context(
332 "failed to query Deribit account state (check API credentials are valid)",
333 )?;
334
335 if let Some(sender) = exec_sender {
336 sender
337 .send(ExecutionEvent::Account(account_state))
338 .map_err(|e| anyhow::anyhow!("Failed to send account state: {e}"))?;
339 }
340 Ok(())
341 });
342
343 Ok(())
344 }
345
346 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
347 todo!("Implement query_order for Deribit execution client")
348 }
349
350 fn submit_order(&self, _cmd: &SubmitOrder) -> anyhow::Result<()> {
351 todo!("Implement submit_order for Deribit execution client");
352 }
353
354 fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
355 todo!("Implement submit_order_list for Deribit execution client");
356 }
357
358 fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
359 todo!("Implement modify_order for Deribit execution client");
360 }
361
362 fn cancel_order(&self, _cmd: &CancelOrder) -> anyhow::Result<()> {
363 todo!("Implement cancel_order for Deribit execution client");
364 }
365
366 fn cancel_all_orders(&self, _cmd: &CancelAllOrders) -> anyhow::Result<()> {
367 todo!("Implement cancel_all_orders for Deribit execution client");
368 }
369
370 fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
371 todo!("Implement batch_cancel_orders for Deribit execution client");
372 }
373}