1use futures_util::StreamExt;
39use nautilus_common::live::get_runtime;
40use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
41use nautilus_model::{
42 data::{Data, OrderBookDeltas_API},
43 identifiers::InstrumentId,
44 python::{
45 data::data_to_pycapsule,
46 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
47 },
48};
49use pyo3::{exceptions::PyRuntimeError, prelude::*};
50
51use crate::websocket::{
52 client::DeribitWebSocketClient, enums::DeribitUpdateInterval, messages::NautilusWsMessage,
53};
54
55fn call_python_with_data<F>(callback: &Py<PyAny>, f: F)
57where
58 F: for<'py> FnOnce(Python<'py>) -> PyResult<Py<PyAny>>,
59{
60 Python::attach(|py| {
61 let result = f(py);
62 match result {
63 Ok(obj) => {
64 if let Err(e) = callback.call1(py, (obj,)) {
65 tracing::error!("Error calling Python callback: {e}");
66 }
67 }
68 Err(e) => {
69 tracing::error!("Error converting to Python object: {e}");
70 }
71 }
72 });
73}
74
75fn call_python(py: Python<'_>, callback: &Py<PyAny>, obj: Py<PyAny>) {
77 if let Err(e) = callback.call1(py, (obj,)) {
78 tracing::error!("Error calling Python callback: {e}");
79 }
80}
81
82#[pymethods]
83impl DeribitWebSocketClient {
84 #[new]
85 #[pyo3(signature = (
86 url=None,
87 api_key=None,
88 api_secret=None,
89 heartbeat_interval=None,
90 is_testnet=false,
91 ))]
92 fn py_new(
93 url: Option<String>,
94 api_key: Option<String>,
95 api_secret: Option<String>,
96 heartbeat_interval: Option<u64>,
97 is_testnet: bool,
98 ) -> PyResult<Self> {
99 Self::new(url, api_key, api_secret, heartbeat_interval, is_testnet).map_err(to_pyvalue_err)
100 }
101
102 #[staticmethod]
103 #[pyo3(name = "new_public")]
104 fn py_new_public(is_testnet: bool) -> PyResult<Self> {
105 Self::new_public(is_testnet).map_err(to_pyvalue_err)
106 }
107
108 #[staticmethod]
109 #[pyo3(name = "with_credentials")]
110 fn py_with_credentials(is_testnet: bool) -> PyResult<Self> {
111 Self::with_credentials(is_testnet).map_err(to_pyvalue_err)
112 }
113
114 #[getter]
115 #[pyo3(name = "url")]
116 #[must_use]
117 pub fn py_url(&self) -> String {
118 self.url().to_string()
119 }
120
121 #[getter]
122 #[pyo3(name = "is_testnet")]
123 #[must_use]
124 pub fn py_is_testnet(&self) -> bool {
125 self.url().contains("test")
127 }
128
129 #[pyo3(name = "is_active")]
130 #[must_use]
131 fn py_is_active(&self) -> bool {
132 self.is_active()
133 }
134
135 #[pyo3(name = "is_closed")]
136 #[must_use]
137 fn py_is_closed(&self) -> bool {
138 self.is_closed()
139 }
140
141 #[pyo3(name = "has_credentials")]
142 #[must_use]
143 fn py_has_credentials(&self) -> bool {
144 self.has_credentials()
145 }
146
147 #[pyo3(name = "is_authenticated")]
148 #[must_use]
149 fn py_is_authenticated(&self) -> bool {
150 self.is_authenticated()
151 }
152
153 #[pyo3(name = "cancel_all_requests")]
154 pub fn py_cancel_all_requests(&self) {
155 self.cancel_all_requests();
156 }
157
158 #[pyo3(name = "cache_instruments")]
164 pub fn py_cache_instruments(
165 &self,
166 py: Python<'_>,
167 instruments: Vec<Py<PyAny>>,
168 ) -> PyResult<()> {
169 let instruments: Result<Vec<_>, _> = instruments
170 .into_iter()
171 .map(|inst| pyobject_to_instrument_any(py, inst))
172 .collect();
173 self.cache_instruments(instruments?);
174 Ok(())
175 }
176
177 #[pyo3(name = "cache_instrument")]
183 pub fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
184 let inst = pyobject_to_instrument_any(py, instrument)?;
185 self.cache_instrument(inst);
186 Ok(())
187 }
188
189 #[pyo3(name = "connect")]
194 fn py_connect<'py>(
195 &mut self,
196 py: Python<'py>,
197 instruments: Vec<Py<PyAny>>,
198 callback: Py<PyAny>,
199 ) -> PyResult<Bound<'py, PyAny>> {
200 let mut instruments_any = Vec::new();
201 for inst in instruments {
202 let inst_any = pyobject_to_instrument_any(py, inst)?;
203 instruments_any.push(inst_any);
204 }
205
206 self.cache_instruments(instruments_any);
207
208 let mut client = self.clone();
209
210 pyo3_async_runtimes::tokio::future_into_py(py, async move {
211 client.connect().await.map_err(to_pyruntime_err)?;
212
213 let stream = client.stream();
214
215 get_runtime().spawn(async move {
217 let _client = client;
218 tokio::pin!(stream);
219
220 while let Some(msg) = stream.next().await {
221 match msg {
222 NautilusWsMessage::Instrument(msg) => {
223 call_python_with_data(&callback, |py| {
224 instrument_any_to_pyobject(py, *msg)
225 });
226 }
227 NautilusWsMessage::Data(msg) => Python::attach(|py| {
228 for data in msg {
229 let py_obj = data_to_pycapsule(py, data);
230 call_python(py, &callback, py_obj);
231 }
232 }),
233 NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
234 let py_obj =
235 data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
236 call_python(py, &callback, py_obj);
237 }),
238 NautilusWsMessage::Error(err) => {
239 tracing::error!("Deribit WebSocket error: {err}");
240 }
241 NautilusWsMessage::Reconnected => {
242 tracing::info!("Deribit WebSocket reconnected");
243 }
244 NautilusWsMessage::Authenticated(auth_result) => {
245 tracing::info!(
246 "Deribit WebSocket authenticated (scope: {})",
247 auth_result.scope
248 );
249 }
250 NautilusWsMessage::Raw(msg) => {
251 tracing::debug!("Received raw message, skipping: {msg}");
252 }
253 }
254 }
255 });
256
257 Ok(())
258 })
259 }
260
261 #[pyo3(name = "wait_until_active")]
262 fn py_wait_until_active<'py>(
263 &self,
264 py: Python<'py>,
265 timeout_secs: f64,
266 ) -> PyResult<Bound<'py, PyAny>> {
267 let client = self.clone();
268
269 pyo3_async_runtimes::tokio::future_into_py(py, async move {
270 client
271 .wait_until_active(timeout_secs)
272 .await
273 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
274 Ok(())
275 })
276 }
277
278 #[pyo3(name = "close")]
279 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
280 let client = self.clone();
281
282 pyo3_async_runtimes::tokio::future_into_py(py, async move {
283 if let Err(e) = client.close().await {
284 tracing::error!("Error on close: {e}");
285 }
286 Ok(())
287 })
288 }
289
290 #[pyo3(name = "authenticate")]
295 #[pyo3(signature = (session_name=None))]
296 fn py_authenticate<'py>(
297 &self,
298 py: Python<'py>,
299 session_name: Option<String>,
300 ) -> PyResult<Bound<'py, PyAny>> {
301 let client = self.clone();
302
303 pyo3_async_runtimes::tokio::future_into_py(py, async move {
304 client
305 .authenticate(session_name.as_deref())
306 .await
307 .map_err(to_pyruntime_err)?;
308 Ok(())
309 })
310 }
311
312 #[pyo3(name = "authenticate_session")]
316 fn py_authenticate_session<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
317 let client = self.clone();
318
319 pyo3_async_runtimes::tokio::future_into_py(py, async move {
320 client
321 .authenticate_session()
322 .await
323 .map_err(to_pyruntime_err)?;
324 Ok(())
325 })
326 }
327
328 #[pyo3(name = "subscribe_trades")]
339 #[pyo3(signature = (instrument_id, interval=None))]
340 fn py_subscribe_trades<'py>(
341 &self,
342 py: Python<'py>,
343 instrument_id: InstrumentId,
344 interval: Option<DeribitUpdateInterval>,
345 ) -> PyResult<Bound<'py, PyAny>> {
346 let client = self.clone();
347
348 pyo3_async_runtimes::tokio::future_into_py(py, async move {
349 client
350 .subscribe_trades(instrument_id, interval)
351 .await
352 .map_err(to_pyvalue_err)
353 })
354 }
355
356 #[pyo3(name = "subscribe_trades_raw")]
358 fn py_subscribe_trades_raw<'py>(
359 &self,
360 py: Python<'py>,
361 instrument_id: InstrumentId,
362 ) -> PyResult<Bound<'py, PyAny>> {
363 let client = self.clone();
364
365 pyo3_async_runtimes::tokio::future_into_py(py, async move {
366 client
367 .subscribe_trades_raw(instrument_id)
368 .await
369 .map_err(to_pyvalue_err)
370 })
371 }
372
373 #[pyo3(name = "unsubscribe_trades")]
375 #[pyo3(signature = (instrument_id, interval=None))]
376 fn py_unsubscribe_trades<'py>(
377 &self,
378 py: Python<'py>,
379 instrument_id: InstrumentId,
380 interval: Option<DeribitUpdateInterval>,
381 ) -> PyResult<Bound<'py, PyAny>> {
382 let client = self.clone();
383
384 pyo3_async_runtimes::tokio::future_into_py(py, async move {
385 client
386 .unsubscribe_trades(instrument_id, interval)
387 .await
388 .map_err(to_pyvalue_err)
389 })
390 }
391
392 #[pyo3(name = "subscribe_book")]
399 #[pyo3(signature = (instrument_id, interval=None))]
400 fn py_subscribe_book<'py>(
401 &self,
402 py: Python<'py>,
403 instrument_id: InstrumentId,
404 interval: Option<DeribitUpdateInterval>,
405 ) -> PyResult<Bound<'py, PyAny>> {
406 let client = self.clone();
407
408 pyo3_async_runtimes::tokio::future_into_py(py, async move {
409 client
410 .subscribe_book(instrument_id, interval)
411 .await
412 .map_err(to_pyvalue_err)
413 })
414 }
415
416 #[pyo3(name = "subscribe_book_raw")]
418 fn py_subscribe_book_raw<'py>(
419 &self,
420 py: Python<'py>,
421 instrument_id: InstrumentId,
422 ) -> PyResult<Bound<'py, PyAny>> {
423 let client = self.clone();
424
425 pyo3_async_runtimes::tokio::future_into_py(py, async move {
426 client
427 .subscribe_book_raw(instrument_id)
428 .await
429 .map_err(to_pyvalue_err)
430 })
431 }
432
433 #[pyo3(name = "unsubscribe_book")]
435 #[pyo3(signature = (instrument_id, interval=None))]
436 fn py_unsubscribe_book<'py>(
437 &self,
438 py: Python<'py>,
439 instrument_id: InstrumentId,
440 interval: Option<DeribitUpdateInterval>,
441 ) -> PyResult<Bound<'py, PyAny>> {
442 let client = self.clone();
443
444 pyo3_async_runtimes::tokio::future_into_py(py, async move {
445 client
446 .unsubscribe_book(instrument_id, interval)
447 .await
448 .map_err(to_pyvalue_err)
449 })
450 }
451
452 #[pyo3(name = "subscribe_ticker")]
459 #[pyo3(signature = (instrument_id, interval=None))]
460 fn py_subscribe_ticker<'py>(
461 &self,
462 py: Python<'py>,
463 instrument_id: InstrumentId,
464 interval: Option<DeribitUpdateInterval>,
465 ) -> PyResult<Bound<'py, PyAny>> {
466 let client = self.clone();
467
468 pyo3_async_runtimes::tokio::future_into_py(py, async move {
469 client
470 .subscribe_ticker(instrument_id, interval)
471 .await
472 .map_err(to_pyvalue_err)
473 })
474 }
475
476 #[pyo3(name = "subscribe_ticker_raw")]
478 fn py_subscribe_ticker_raw<'py>(
479 &self,
480 py: Python<'py>,
481 instrument_id: InstrumentId,
482 ) -> PyResult<Bound<'py, PyAny>> {
483 let client = self.clone();
484
485 pyo3_async_runtimes::tokio::future_into_py(py, async move {
486 client
487 .subscribe_ticker_raw(instrument_id)
488 .await
489 .map_err(to_pyvalue_err)
490 })
491 }
492
493 #[pyo3(name = "unsubscribe_ticker")]
495 #[pyo3(signature = (instrument_id, interval=None))]
496 fn py_unsubscribe_ticker<'py>(
497 &self,
498 py: Python<'py>,
499 instrument_id: InstrumentId,
500 interval: Option<DeribitUpdateInterval>,
501 ) -> PyResult<Bound<'py, PyAny>> {
502 let client = self.clone();
503
504 pyo3_async_runtimes::tokio::future_into_py(py, async move {
505 client
506 .unsubscribe_ticker(instrument_id, interval)
507 .await
508 .map_err(to_pyvalue_err)
509 })
510 }
511
512 #[pyo3(name = "subscribe_quotes")]
514 fn py_subscribe_quotes<'py>(
515 &self,
516 py: Python<'py>,
517 instrument_id: InstrumentId,
518 ) -> PyResult<Bound<'py, PyAny>> {
519 let client = self.clone();
520
521 pyo3_async_runtimes::tokio::future_into_py(py, async move {
522 client
523 .subscribe_quotes(instrument_id)
524 .await
525 .map_err(to_pyvalue_err)
526 })
527 }
528
529 #[pyo3(name = "unsubscribe_quotes")]
531 fn py_unsubscribe_quotes<'py>(
532 &self,
533 py: Python<'py>,
534 instrument_id: InstrumentId,
535 ) -> PyResult<Bound<'py, PyAny>> {
536 let client = self.clone();
537
538 pyo3_async_runtimes::tokio::future_into_py(py, async move {
539 client
540 .unsubscribe_quotes(instrument_id)
541 .await
542 .map_err(to_pyvalue_err)
543 })
544 }
545
546 #[pyo3(name = "subscribe")]
548 fn py_subscribe<'py>(
549 &self,
550 py: Python<'py>,
551 channels: Vec<String>,
552 ) -> PyResult<Bound<'py, PyAny>> {
553 let client = self.clone();
554
555 pyo3_async_runtimes::tokio::future_into_py(py, async move {
556 client.subscribe(channels).await.map_err(to_pyvalue_err)
557 })
558 }
559
560 #[pyo3(name = "unsubscribe")]
562 fn py_unsubscribe<'py>(
563 &self,
564 py: Python<'py>,
565 channels: Vec<String>,
566 ) -> PyResult<Bound<'py, PyAny>> {
567 let client = self.clone();
568
569 pyo3_async_runtimes::tokio::future_into_py(py, async move {
570 client.unsubscribe(channels).await.map_err(to_pyvalue_err)
571 })
572 }
573}