1use futures_util::StreamExt;
39use nautilus_common::live::get_runtime;
40use nautilus_core::python::{call_python, to_pyruntime_err, to_pyvalue_err};
41use nautilus_model::{
42 data::{Data, OrderBookDeltas_API},
43 identifiers::InstrumentId,
44 python::{
45 data::data_to_pycapsule,
46 instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
47 },
48};
49use pyo3::{exceptions::PyRuntimeError, prelude::*};
50
51use crate::websocket::{
52 client::DeribitWebSocketClient, enums::DeribitUpdateInterval, messages::NautilusWsMessage,
53};
54
55fn call_python_with_data<F>(callback: &Py<PyAny>, f: F)
57where
58 F: for<'py> FnOnce(Python<'py>) -> PyResult<Py<PyAny>>,
59{
60 Python::attach(|py| {
61 let result = f(py);
62 match result {
63 Ok(obj) => {
64 if let Err(e) = callback.call1(py, (obj,)) {
65 log::error!("Error calling Python callback: {e}");
66 }
67 }
68 Err(e) => {
69 log::error!("Error converting to Python object: {e}");
70 }
71 }
72 });
73}
74
75#[pymethods]
76impl DeribitWebSocketClient {
77 #[new]
78 #[pyo3(signature = (
79 url=None,
80 api_key=None,
81 api_secret=None,
82 heartbeat_interval=None,
83 is_testnet=false,
84 ))]
85 fn py_new(
86 url: Option<String>,
87 api_key: Option<String>,
88 api_secret: Option<String>,
89 heartbeat_interval: Option<u64>,
90 is_testnet: bool,
91 ) -> PyResult<Self> {
92 Self::new(url, api_key, api_secret, heartbeat_interval, is_testnet).map_err(to_pyvalue_err)
93 }
94
95 #[staticmethod]
96 #[pyo3(name = "new_public")]
97 fn py_new_public(is_testnet: bool) -> PyResult<Self> {
98 Self::new_public(is_testnet).map_err(to_pyvalue_err)
99 }
100
101 #[staticmethod]
102 #[pyo3(name = "with_credentials")]
103 fn py_with_credentials(is_testnet: bool) -> PyResult<Self> {
104 Self::with_credentials(is_testnet).map_err(to_pyvalue_err)
105 }
106
107 #[getter]
108 #[pyo3(name = "url")]
109 #[must_use]
110 pub fn py_url(&self) -> String {
111 self.url().to_string()
112 }
113
114 #[getter]
115 #[pyo3(name = "is_testnet")]
116 #[must_use]
117 pub fn py_is_testnet(&self) -> bool {
118 self.url().contains("test")
120 }
121
122 #[pyo3(name = "is_active")]
123 #[must_use]
124 fn py_is_active(&self) -> bool {
125 self.is_active()
126 }
127
128 #[pyo3(name = "is_closed")]
129 #[must_use]
130 fn py_is_closed(&self) -> bool {
131 self.is_closed()
132 }
133
134 #[pyo3(name = "has_credentials")]
135 #[must_use]
136 fn py_has_credentials(&self) -> bool {
137 self.has_credentials()
138 }
139
140 #[pyo3(name = "is_authenticated")]
141 #[must_use]
142 fn py_is_authenticated(&self) -> bool {
143 self.is_authenticated()
144 }
145
146 #[pyo3(name = "cancel_all_requests")]
147 pub fn py_cancel_all_requests(&self) {
148 self.cancel_all_requests();
149 }
150
151 #[pyo3(name = "cache_instruments")]
157 pub fn py_cache_instruments(
158 &self,
159 py: Python<'_>,
160 instruments: Vec<Py<PyAny>>,
161 ) -> PyResult<()> {
162 let instruments: Result<Vec<_>, _> = instruments
163 .into_iter()
164 .map(|inst| pyobject_to_instrument_any(py, inst))
165 .collect();
166 self.cache_instruments(instruments?);
167 Ok(())
168 }
169
170 #[pyo3(name = "cache_instrument")]
176 pub fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
177 let inst = pyobject_to_instrument_any(py, instrument)?;
178 self.cache_instrument(inst);
179 Ok(())
180 }
181
182 #[pyo3(name = "connect")]
187 fn py_connect<'py>(
188 &mut self,
189 py: Python<'py>,
190 instruments: Vec<Py<PyAny>>,
191 callback: Py<PyAny>,
192 ) -> PyResult<Bound<'py, PyAny>> {
193 let mut instruments_any = Vec::new();
194 for inst in instruments {
195 let inst_any = pyobject_to_instrument_any(py, inst)?;
196 instruments_any.push(inst_any);
197 }
198
199 self.cache_instruments(instruments_any);
200
201 let mut client = self.clone();
202
203 pyo3_async_runtimes::tokio::future_into_py(py, async move {
204 client.connect().await.map_err(to_pyruntime_err)?;
205
206 let stream = client.stream();
207
208 get_runtime().spawn(async move {
210 let _client = client;
211 tokio::pin!(stream);
212
213 while let Some(msg) = stream.next().await {
214 match msg {
215 NautilusWsMessage::Instrument(msg) => {
216 call_python_with_data(&callback, |py| {
217 instrument_any_to_pyobject(py, *msg)
218 });
219 }
220 NautilusWsMessage::Data(msg) => Python::attach(|py| {
221 for data in msg {
222 let py_obj = data_to_pycapsule(py, data);
223 call_python(py, &callback, py_obj);
224 }
225 }),
226 NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
227 let py_obj =
228 data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
229 call_python(py, &callback, py_obj);
230 }),
231 NautilusWsMessage::Error(err) => {
232 log::error!("Deribit WebSocket error: {err}");
233 }
234 NautilusWsMessage::Reconnected => {
235 log::info!("Deribit WebSocket reconnected");
236 }
237 NautilusWsMessage::Authenticated(auth_result) => {
238 log::info!(
239 "Deribit WebSocket authenticated (scope: {})",
240 auth_result.scope
241 );
242 }
243 NautilusWsMessage::Raw(msg) => {
244 log::debug!("Received raw message, skipping: {msg}");
245 }
246 NautilusWsMessage::FundingRates(funding_rates) => Python::attach(|py| {
247 for funding_rate in funding_rates {
248 let py_obj = Py::new(py, funding_rate)
249 .expect("Failed to create FundingRateUpdate PyObject")
250 .into_any();
251 call_python(py, &callback, py_obj);
252 }
253 }),
254 }
255 }
256 });
257
258 Ok(())
259 })
260 }
261
262 #[pyo3(name = "wait_until_active")]
263 fn py_wait_until_active<'py>(
264 &self,
265 py: Python<'py>,
266 timeout_secs: f64,
267 ) -> PyResult<Bound<'py, PyAny>> {
268 let client = self.clone();
269
270 pyo3_async_runtimes::tokio::future_into_py(py, async move {
271 client
272 .wait_until_active(timeout_secs)
273 .await
274 .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
275 Ok(())
276 })
277 }
278
279 #[pyo3(name = "close")]
280 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
281 let client = self.clone();
282
283 pyo3_async_runtimes::tokio::future_into_py(py, async move {
284 if let Err(e) = client.close().await {
285 log::error!("Error on close: {e}");
286 }
287 Ok(())
288 })
289 }
290
291 #[pyo3(name = "authenticate")]
296 #[pyo3(signature = (session_name=None))]
297 fn py_authenticate<'py>(
298 &self,
299 py: Python<'py>,
300 session_name: Option<String>,
301 ) -> PyResult<Bound<'py, PyAny>> {
302 let client = self.clone();
303
304 pyo3_async_runtimes::tokio::future_into_py(py, async move {
305 client
306 .authenticate(session_name.as_deref())
307 .await
308 .map_err(to_pyruntime_err)?;
309 Ok(())
310 })
311 }
312
313 #[pyo3(name = "authenticate_session")]
318 fn py_authenticate_session<'py>(
319 &self,
320 py: Python<'py>,
321 session_name: String,
322 ) -> PyResult<Bound<'py, PyAny>> {
323 let client = self.clone();
324
325 pyo3_async_runtimes::tokio::future_into_py(py, async move {
326 client
327 .authenticate_session(&session_name)
328 .await
329 .map_err(|e| {
330 to_pyruntime_err(format!(
331 "Failed to authenticate Deribit websocket session '{session_name}': {e}"
332 ))
333 })?;
334 Ok(())
335 })
336 }
337
338 #[pyo3(name = "subscribe_trades")]
349 #[pyo3(signature = (instrument_id, interval=None))]
350 fn py_subscribe_trades<'py>(
351 &self,
352 py: Python<'py>,
353 instrument_id: InstrumentId,
354 interval: Option<DeribitUpdateInterval>,
355 ) -> PyResult<Bound<'py, PyAny>> {
356 let client = self.clone();
357
358 pyo3_async_runtimes::tokio::future_into_py(py, async move {
359 client
360 .subscribe_trades(instrument_id, interval)
361 .await
362 .map_err(to_pyvalue_err)
363 })
364 }
365
366 #[pyo3(name = "subscribe_trades_raw")]
368 fn py_subscribe_trades_raw<'py>(
369 &self,
370 py: Python<'py>,
371 instrument_id: InstrumentId,
372 ) -> PyResult<Bound<'py, PyAny>> {
373 let client = self.clone();
374
375 pyo3_async_runtimes::tokio::future_into_py(py, async move {
376 client
377 .subscribe_trades_raw(instrument_id)
378 .await
379 .map_err(to_pyvalue_err)
380 })
381 }
382
383 #[pyo3(name = "unsubscribe_trades")]
385 #[pyo3(signature = (instrument_id, interval=None))]
386 fn py_unsubscribe_trades<'py>(
387 &self,
388 py: Python<'py>,
389 instrument_id: InstrumentId,
390 interval: Option<DeribitUpdateInterval>,
391 ) -> PyResult<Bound<'py, PyAny>> {
392 let client = self.clone();
393
394 pyo3_async_runtimes::tokio::future_into_py(py, async move {
395 client
396 .unsubscribe_trades(instrument_id, interval)
397 .await
398 .map_err(to_pyvalue_err)
399 })
400 }
401
402 #[pyo3(name = "subscribe_book")]
409 #[pyo3(signature = (instrument_id, interval=None))]
410 fn py_subscribe_book<'py>(
411 &self,
412 py: Python<'py>,
413 instrument_id: InstrumentId,
414 interval: Option<DeribitUpdateInterval>,
415 ) -> PyResult<Bound<'py, PyAny>> {
416 let client = self.clone();
417
418 pyo3_async_runtimes::tokio::future_into_py(py, async move {
419 client
420 .subscribe_book(instrument_id, interval)
421 .await
422 .map_err(to_pyvalue_err)
423 })
424 }
425
426 #[pyo3(name = "subscribe_book_raw")]
428 fn py_subscribe_book_raw<'py>(
429 &self,
430 py: Python<'py>,
431 instrument_id: InstrumentId,
432 ) -> PyResult<Bound<'py, PyAny>> {
433 let client = self.clone();
434
435 pyo3_async_runtimes::tokio::future_into_py(py, async move {
436 client
437 .subscribe_book_raw(instrument_id)
438 .await
439 .map_err(to_pyvalue_err)
440 })
441 }
442
443 #[pyo3(name = "unsubscribe_book")]
445 #[pyo3(signature = (instrument_id, interval=None))]
446 fn py_unsubscribe_book<'py>(
447 &self,
448 py: Python<'py>,
449 instrument_id: InstrumentId,
450 interval: Option<DeribitUpdateInterval>,
451 ) -> PyResult<Bound<'py, PyAny>> {
452 let client = self.clone();
453
454 pyo3_async_runtimes::tokio::future_into_py(py, async move {
455 client
456 .unsubscribe_book(instrument_id, interval)
457 .await
458 .map_err(to_pyvalue_err)
459 })
460 }
461
462 #[pyo3(name = "subscribe_book_grouped")]
474 #[pyo3(signature = (instrument_id, group, depth, interval=None))]
475 fn py_subscribe_book_grouped<'py>(
476 &self,
477 py: Python<'py>,
478 instrument_id: InstrumentId,
479 group: String,
480 depth: u32,
481 interval: Option<DeribitUpdateInterval>,
482 ) -> PyResult<Bound<'py, PyAny>> {
483 let client = self.clone();
484
485 pyo3_async_runtimes::tokio::future_into_py(py, async move {
486 client
487 .subscribe_book_grouped(instrument_id, &group, depth, interval)
488 .await
489 .map_err(to_pyvalue_err)
490 })
491 }
492
493 #[pyo3(name = "unsubscribe_book_grouped")]
502 #[pyo3(signature = (instrument_id, group, depth, interval=None))]
503 fn py_unsubscribe_book_grouped<'py>(
504 &self,
505 py: Python<'py>,
506 instrument_id: InstrumentId,
507 group: String,
508 depth: u32,
509 interval: Option<DeribitUpdateInterval>,
510 ) -> PyResult<Bound<'py, PyAny>> {
511 let client = self.clone();
512
513 pyo3_async_runtimes::tokio::future_into_py(py, async move {
514 client
515 .unsubscribe_book_grouped(instrument_id, &group, depth, interval)
516 .await
517 .map_err(to_pyvalue_err)
518 })
519 }
520
521 #[pyo3(name = "subscribe_ticker")]
528 #[pyo3(signature = (instrument_id, interval=None))]
529 fn py_subscribe_ticker<'py>(
530 &self,
531 py: Python<'py>,
532 instrument_id: InstrumentId,
533 interval: Option<DeribitUpdateInterval>,
534 ) -> PyResult<Bound<'py, PyAny>> {
535 let client = self.clone();
536
537 pyo3_async_runtimes::tokio::future_into_py(py, async move {
538 client
539 .subscribe_ticker(instrument_id, interval)
540 .await
541 .map_err(to_pyvalue_err)
542 })
543 }
544
545 #[pyo3(name = "subscribe_ticker_raw")]
547 fn py_subscribe_ticker_raw<'py>(
548 &self,
549 py: Python<'py>,
550 instrument_id: InstrumentId,
551 ) -> PyResult<Bound<'py, PyAny>> {
552 let client = self.clone();
553
554 pyo3_async_runtimes::tokio::future_into_py(py, async move {
555 client
556 .subscribe_ticker_raw(instrument_id)
557 .await
558 .map_err(to_pyvalue_err)
559 })
560 }
561
562 #[pyo3(name = "unsubscribe_ticker")]
564 #[pyo3(signature = (instrument_id, interval=None))]
565 fn py_unsubscribe_ticker<'py>(
566 &self,
567 py: Python<'py>,
568 instrument_id: InstrumentId,
569 interval: Option<DeribitUpdateInterval>,
570 ) -> PyResult<Bound<'py, PyAny>> {
571 let client = self.clone();
572
573 pyo3_async_runtimes::tokio::future_into_py(py, async move {
574 client
575 .unsubscribe_ticker(instrument_id, interval)
576 .await
577 .map_err(to_pyvalue_err)
578 })
579 }
580
581 #[pyo3(name = "subscribe_quotes")]
583 fn py_subscribe_quotes<'py>(
584 &self,
585 py: Python<'py>,
586 instrument_id: InstrumentId,
587 ) -> PyResult<Bound<'py, PyAny>> {
588 let client = self.clone();
589
590 pyo3_async_runtimes::tokio::future_into_py(py, async move {
591 client
592 .subscribe_quotes(instrument_id)
593 .await
594 .map_err(to_pyvalue_err)
595 })
596 }
597
598 #[pyo3(name = "unsubscribe_quotes")]
600 fn py_unsubscribe_quotes<'py>(
601 &self,
602 py: Python<'py>,
603 instrument_id: InstrumentId,
604 ) -> PyResult<Bound<'py, PyAny>> {
605 let client = self.clone();
606
607 pyo3_async_runtimes::tokio::future_into_py(py, async move {
608 client
609 .unsubscribe_quotes(instrument_id)
610 .await
611 .map_err(to_pyvalue_err)
612 })
613 }
614
615 #[pyo3(name = "subscribe")]
617 fn py_subscribe<'py>(
618 &self,
619 py: Python<'py>,
620 channels: Vec<String>,
621 ) -> PyResult<Bound<'py, PyAny>> {
622 let client = self.clone();
623
624 pyo3_async_runtimes::tokio::future_into_py(py, async move {
625 client.subscribe(channels).await.map_err(to_pyvalue_err)
626 })
627 }
628
629 #[pyo3(name = "unsubscribe")]
631 fn py_unsubscribe<'py>(
632 &self,
633 py: Python<'py>,
634 channels: Vec<String>,
635 ) -> PyResult<Bound<'py, PyAny>> {
636 let client = self.clone();
637
638 pyo3_async_runtimes::tokio::future_into_py(py, async move {
639 client.unsubscribe(channels).await.map_err(to_pyvalue_err)
640 })
641 }
642
643 #[pyo3(name = "subscribe_instrument_state")]
650 fn py_subscribe_instrument_state<'py>(
651 &self,
652 py: Python<'py>,
653 kind: String,
654 currency: String,
655 ) -> PyResult<Bound<'py, PyAny>> {
656 let client = self.clone();
657
658 pyo3_async_runtimes::tokio::future_into_py(py, async move {
659 client
660 .subscribe_instrument_state(&kind, ¤cy)
661 .await
662 .map_err(to_pyvalue_err)
663 })
664 }
665
666 #[pyo3(name = "unsubscribe_instrument_state")]
673 fn py_unsubscribe_instrument_state<'py>(
674 &self,
675 py: Python<'py>,
676 kind: String,
677 currency: String,
678 ) -> PyResult<Bound<'py, PyAny>> {
679 let client = self.clone();
680
681 pyo3_async_runtimes::tokio::future_into_py(py, async move {
682 client
683 .unsubscribe_instrument_state(&kind, ¤cy)
684 .await
685 .map_err(to_pyvalue_err)
686 })
687 }
688
689 #[pyo3(name = "subscribe_perpetual_interest_rates")]
699 #[pyo3(signature = (instrument_id, interval=None))]
700 fn py_subscribe_perpetual_interest_rates<'py>(
701 &self,
702 py: Python<'py>,
703 instrument_id: InstrumentId,
704 interval: Option<DeribitUpdateInterval>,
705 ) -> PyResult<Bound<'py, PyAny>> {
706 let client = self.clone();
707
708 pyo3_async_runtimes::tokio::future_into_py(py, async move {
709 client
710 .subscribe_perpetual_interests_rates_updates(instrument_id, interval)
711 .await
712 .map_err(to_pyvalue_err)
713 })
714 }
715
716 #[pyo3(name = "unsubscribe_perpetual_interest_rates")]
723 #[pyo3(signature = (instrument_id, interval=None))]
724 fn py_unsubscribe_perpetual_interest_rates<'py>(
725 &self,
726 py: Python<'py>,
727 instrument_id: InstrumentId,
728 interval: Option<DeribitUpdateInterval>,
729 ) -> PyResult<Bound<'py, PyAny>> {
730 let client = self.clone();
731
732 pyo3_async_runtimes::tokio::future_into_py(py, async move {
733 client
734 .unsubscribe_perpetual_interest_rates_updates(instrument_id, interval)
735 .await
736 .map_err(to_pyvalue_err)
737 })
738 }
739
740 #[pyo3(name = "subscribe_chart")]
748 fn py_subscribe_chart<'py>(
749 &self,
750 py: Python<'py>,
751 instrument_id: InstrumentId,
752 resolution: String,
753 ) -> PyResult<Bound<'py, PyAny>> {
754 let client = self.clone();
755
756 pyo3_async_runtimes::tokio::future_into_py(py, async move {
757 client
758 .subscribe_chart(instrument_id, &resolution)
759 .await
760 .map_err(to_pyvalue_err)
761 })
762 }
763
764 #[pyo3(name = "unsubscribe_chart")]
766 fn py_unsubscribe_chart<'py>(
767 &self,
768 py: Python<'py>,
769 instrument_id: InstrumentId,
770 resolution: String,
771 ) -> PyResult<Bound<'py, PyAny>> {
772 let client = self.clone();
773
774 pyo3_async_runtimes::tokio::future_into_py(py, async move {
775 client
776 .unsubscribe_chart(instrument_id, &resolution)
777 .await
778 .map_err(to_pyvalue_err)
779 })
780 }
781}