nautilus_deribit/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Deribit WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - Connection mode and signal are shared via Arc.
25//!
26//! ## Connection Flow
27//!
28//! 1. Clone the client for async operation.
29//! 2. Connect and populate shared state on the clone.
30//! 3. Spawn stream handler as background task.
31//! 4. Return immediately (non-blocking).
32//!
33//! ## Important Notes
34//!
35//! - Never use `block_on()` - it blocks the runtime.
36//! - Always clone before async blocks for lifetime requirements.
37
38use futures_util::StreamExt;
39use nautilus_common::live::get_runtime;
40use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
41use nautilus_model::{
42    data::{Data, OrderBookDeltas_API},
43    identifiers::InstrumentId,
44    python::{
45        data::data_to_pycapsule,
46        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
47    },
48};
49use pyo3::{exceptions::PyRuntimeError, prelude::*};
50
51use crate::websocket::{
52    client::DeribitWebSocketClient, enums::DeribitUpdateInterval, messages::NautilusWsMessage,
53};
54
55/// Helper function to call Python callback with data conversion.
56fn call_python_with_data<F>(callback: &Py<PyAny>, f: F)
57where
58    F: for<'py> FnOnce(Python<'py>) -> PyResult<Py<PyAny>>,
59{
60    Python::attach(|py| {
61        let result = f(py);
62        match result {
63            Ok(obj) => {
64                if let Err(e) = callback.call1(py, (obj,)) {
65                    tracing::error!("Error calling Python callback: {e}");
66                }
67            }
68            Err(e) => {
69                tracing::error!("Error converting to Python object: {e}");
70            }
71        }
72    });
73}
74
75/// Helper function to call Python callback with a PyObject.
76fn call_python(py: Python<'_>, callback: &Py<PyAny>, obj: Py<PyAny>) {
77    if let Err(e) = callback.call1(py, (obj,)) {
78        tracing::error!("Error calling Python callback: {e}");
79    }
80}
81
82#[pymethods]
83impl DeribitWebSocketClient {
84    #[new]
85    #[pyo3(signature = (
86        url=None,
87        api_key=None,
88        api_secret=None,
89        heartbeat_interval=None,
90        is_testnet=false,
91    ))]
92    fn py_new(
93        url: Option<String>,
94        api_key: Option<String>,
95        api_secret: Option<String>,
96        heartbeat_interval: Option<u64>,
97        is_testnet: bool,
98    ) -> PyResult<Self> {
99        Self::new(url, api_key, api_secret, heartbeat_interval, is_testnet).map_err(to_pyvalue_err)
100    }
101
102    #[staticmethod]
103    #[pyo3(name = "new_public")]
104    fn py_new_public(is_testnet: bool) -> PyResult<Self> {
105        Self::new_public(is_testnet).map_err(to_pyvalue_err)
106    }
107
108    #[staticmethod]
109    #[pyo3(name = "with_credentials")]
110    fn py_with_credentials(is_testnet: bool) -> PyResult<Self> {
111        Self::with_credentials(is_testnet).map_err(to_pyvalue_err)
112    }
113
114    #[getter]
115    #[pyo3(name = "url")]
116    #[must_use]
117    pub fn py_url(&self) -> String {
118        self.url().to_string()
119    }
120
121    #[getter]
122    #[pyo3(name = "is_testnet")]
123    #[must_use]
124    pub fn py_is_testnet(&self) -> bool {
125        // Check if the URL contains "test"
126        self.url().contains("test")
127    }
128
129    #[pyo3(name = "is_active")]
130    #[must_use]
131    fn py_is_active(&self) -> bool {
132        self.is_active()
133    }
134
135    #[pyo3(name = "is_closed")]
136    #[must_use]
137    fn py_is_closed(&self) -> bool {
138        self.is_closed()
139    }
140
141    #[pyo3(name = "has_credentials")]
142    #[must_use]
143    fn py_has_credentials(&self) -> bool {
144        self.has_credentials()
145    }
146
147    #[pyo3(name = "is_authenticated")]
148    #[must_use]
149    fn py_is_authenticated(&self) -> bool {
150        self.is_authenticated()
151    }
152
153    #[pyo3(name = "cancel_all_requests")]
154    pub fn py_cancel_all_requests(&self) {
155        self.cancel_all_requests();
156    }
157
158    /// Caches instruments for use during message parsing.
159    ///
160    /// # Errors
161    ///
162    /// Returns a Python exception if converting instruments fails.
163    #[pyo3(name = "cache_instruments")]
164    pub fn py_cache_instruments(
165        &self,
166        py: Python<'_>,
167        instruments: Vec<Py<PyAny>>,
168    ) -> PyResult<()> {
169        let instruments: Result<Vec<_>, _> = instruments
170            .into_iter()
171            .map(|inst| pyobject_to_instrument_any(py, inst))
172            .collect();
173        self.cache_instruments(instruments?);
174        Ok(())
175    }
176
177    /// Caches a single instrument.
178    ///
179    /// # Errors
180    ///
181    /// Returns a Python exception if converting the instrument fails.
182    #[pyo3(name = "cache_instrument")]
183    pub fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
184        let inst = pyobject_to_instrument_any(py, instrument)?;
185        self.cache_instrument(inst);
186        Ok(())
187    }
188
189    /// Connects to the Deribit WebSocket and starts processing messages.
190    ///
191    /// This is a non-blocking call that spawns a background task for message processing.
192    /// Messages are dispatched to the provided callback function.
193    #[pyo3(name = "connect")]
194    fn py_connect<'py>(
195        &mut self,
196        py: Python<'py>,
197        instruments: Vec<Py<PyAny>>,
198        callback: Py<PyAny>,
199    ) -> PyResult<Bound<'py, PyAny>> {
200        let mut instruments_any = Vec::new();
201        for inst in instruments {
202            let inst_any = pyobject_to_instrument_any(py, inst)?;
203            instruments_any.push(inst_any);
204        }
205
206        self.cache_instruments(instruments_any);
207
208        let mut client = self.clone();
209
210        pyo3_async_runtimes::tokio::future_into_py(py, async move {
211            client.connect().await.map_err(to_pyruntime_err)?;
212
213            let stream = client.stream();
214
215            // Keep client alive in the spawned task to prevent handler from dropping
216            get_runtime().spawn(async move {
217                let _client = client;
218                tokio::pin!(stream);
219
220                while let Some(msg) = stream.next().await {
221                    match msg {
222                        NautilusWsMessage::Instrument(msg) => {
223                            call_python_with_data(&callback, |py| {
224                                instrument_any_to_pyobject(py, *msg)
225                            });
226                        }
227                        NautilusWsMessage::Data(msg) => Python::attach(|py| {
228                            for data in msg {
229                                let py_obj = data_to_pycapsule(py, data);
230                                call_python(py, &callback, py_obj);
231                            }
232                        }),
233                        NautilusWsMessage::Deltas(msg) => Python::attach(|py| {
234                            let py_obj =
235                                data_to_pycapsule(py, Data::Deltas(OrderBookDeltas_API::new(msg)));
236                            call_python(py, &callback, py_obj);
237                        }),
238                        NautilusWsMessage::Error(err) => {
239                            tracing::error!("Deribit WebSocket error: {err}");
240                        }
241                        NautilusWsMessage::Reconnected => {
242                            tracing::info!("Deribit WebSocket reconnected");
243                        }
244                        NautilusWsMessage::Authenticated(auth_result) => {
245                            tracing::info!(
246                                "Deribit WebSocket authenticated (scope: {})",
247                                auth_result.scope
248                            );
249                        }
250                        NautilusWsMessage::Raw(msg) => {
251                            tracing::debug!("Received raw message, skipping: {msg}");
252                        }
253                    }
254                }
255            });
256
257            Ok(())
258        })
259    }
260
261    #[pyo3(name = "wait_until_active")]
262    fn py_wait_until_active<'py>(
263        &self,
264        py: Python<'py>,
265        timeout_secs: f64,
266    ) -> PyResult<Bound<'py, PyAny>> {
267        let client = self.clone();
268
269        pyo3_async_runtimes::tokio::future_into_py(py, async move {
270            client
271                .wait_until_active(timeout_secs)
272                .await
273                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
274            Ok(())
275        })
276    }
277
278    #[pyo3(name = "close")]
279    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
280        let client = self.clone();
281
282        pyo3_async_runtimes::tokio::future_into_py(py, async move {
283            if let Err(e) = client.close().await {
284                tracing::error!("Error on close: {e}");
285            }
286            Ok(())
287        })
288    }
289
290    /// Authenticates the WebSocket session with Deribit.
291    ///
292    /// Uses the `client_signature` grant type with HMAC-SHA256 signature.
293    /// This must be called before subscribing to raw data streams.
294    #[pyo3(name = "authenticate")]
295    #[pyo3(signature = (session_name=None))]
296    fn py_authenticate<'py>(
297        &self,
298        py: Python<'py>,
299        session_name: Option<String>,
300    ) -> PyResult<Bound<'py, PyAny>> {
301        let client = self.clone();
302
303        pyo3_async_runtimes::tokio::future_into_py(py, async move {
304            client
305                .authenticate(session_name.as_deref())
306                .await
307                .map_err(to_pyruntime_err)?;
308            Ok(())
309        })
310    }
311
312    /// Authenticates with session scope using default session name.
313    ///
314    /// Convenience method equivalent to `authenticate(Some("nautilus"))`.
315    #[pyo3(name = "authenticate_session")]
316    fn py_authenticate_session<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
317        let client = self.clone();
318
319        pyo3_async_runtimes::tokio::future_into_py(py, async move {
320            client
321                .authenticate_session()
322                .await
323                .map_err(to_pyruntime_err)?;
324            Ok(())
325        })
326    }
327
328    // ------------------------------------------------------------------------------------------------
329    // Subscription Methods
330    // ------------------------------------------------------------------------------------------------
331
332    /// Subscribes to trade updates for an instrument.
333    ///
334    /// # Arguments
335    ///
336    /// * `instrument_id` - The instrument to subscribe to.
337    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
338    #[pyo3(name = "subscribe_trades")]
339    #[pyo3(signature = (instrument_id, interval=None))]
340    fn py_subscribe_trades<'py>(
341        &self,
342        py: Python<'py>,
343        instrument_id: InstrumentId,
344        interval: Option<DeribitUpdateInterval>,
345    ) -> PyResult<Bound<'py, PyAny>> {
346        let client = self.clone();
347
348        pyo3_async_runtimes::tokio::future_into_py(py, async move {
349            client
350                .subscribe_trades(instrument_id, interval)
351                .await
352                .map_err(to_pyvalue_err)
353        })
354    }
355
356    /// Subscribes to raw trade updates (requires authentication).
357    #[pyo3(name = "subscribe_trades_raw")]
358    fn py_subscribe_trades_raw<'py>(
359        &self,
360        py: Python<'py>,
361        instrument_id: InstrumentId,
362    ) -> PyResult<Bound<'py, PyAny>> {
363        let client = self.clone();
364
365        pyo3_async_runtimes::tokio::future_into_py(py, async move {
366            client
367                .subscribe_trades_raw(instrument_id)
368                .await
369                .map_err(to_pyvalue_err)
370        })
371    }
372
373    /// Unsubscribes from trade updates for an instrument.
374    #[pyo3(name = "unsubscribe_trades")]
375    #[pyo3(signature = (instrument_id, interval=None))]
376    fn py_unsubscribe_trades<'py>(
377        &self,
378        py: Python<'py>,
379        instrument_id: InstrumentId,
380        interval: Option<DeribitUpdateInterval>,
381    ) -> PyResult<Bound<'py, PyAny>> {
382        let client = self.clone();
383
384        pyo3_async_runtimes::tokio::future_into_py(py, async move {
385            client
386                .unsubscribe_trades(instrument_id, interval)
387                .await
388                .map_err(to_pyvalue_err)
389        })
390    }
391
392    /// Subscribes to order book updates for an instrument.
393    ///
394    /// # Arguments
395    ///
396    /// * `instrument_id` - The instrument to subscribe to.
397    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
398    #[pyo3(name = "subscribe_book")]
399    #[pyo3(signature = (instrument_id, interval=None))]
400    fn py_subscribe_book<'py>(
401        &self,
402        py: Python<'py>,
403        instrument_id: InstrumentId,
404        interval: Option<DeribitUpdateInterval>,
405    ) -> PyResult<Bound<'py, PyAny>> {
406        let client = self.clone();
407
408        pyo3_async_runtimes::tokio::future_into_py(py, async move {
409            client
410                .subscribe_book(instrument_id, interval)
411                .await
412                .map_err(to_pyvalue_err)
413        })
414    }
415
416    /// Subscribes to raw order book updates (requires authentication).
417    #[pyo3(name = "subscribe_book_raw")]
418    fn py_subscribe_book_raw<'py>(
419        &self,
420        py: Python<'py>,
421        instrument_id: InstrumentId,
422    ) -> PyResult<Bound<'py, PyAny>> {
423        let client = self.clone();
424
425        pyo3_async_runtimes::tokio::future_into_py(py, async move {
426            client
427                .subscribe_book_raw(instrument_id)
428                .await
429                .map_err(to_pyvalue_err)
430        })
431    }
432
433    /// Unsubscribes from order book updates for an instrument.
434    #[pyo3(name = "unsubscribe_book")]
435    #[pyo3(signature = (instrument_id, interval=None))]
436    fn py_unsubscribe_book<'py>(
437        &self,
438        py: Python<'py>,
439        instrument_id: InstrumentId,
440        interval: Option<DeribitUpdateInterval>,
441    ) -> PyResult<Bound<'py, PyAny>> {
442        let client = self.clone();
443
444        pyo3_async_runtimes::tokio::future_into_py(py, async move {
445            client
446                .unsubscribe_book(instrument_id, interval)
447                .await
448                .map_err(to_pyvalue_err)
449        })
450    }
451
452    /// Subscribes to ticker updates for an instrument.
453    ///
454    /// # Arguments
455    ///
456    /// * `instrument_id` - The instrument to subscribe to.
457    /// * `interval` - Update interval. Defaults to `Ms100` (100ms). `Raw` requires authentication.
458    #[pyo3(name = "subscribe_ticker")]
459    #[pyo3(signature = (instrument_id, interval=None))]
460    fn py_subscribe_ticker<'py>(
461        &self,
462        py: Python<'py>,
463        instrument_id: InstrumentId,
464        interval: Option<DeribitUpdateInterval>,
465    ) -> PyResult<Bound<'py, PyAny>> {
466        let client = self.clone();
467
468        pyo3_async_runtimes::tokio::future_into_py(py, async move {
469            client
470                .subscribe_ticker(instrument_id, interval)
471                .await
472                .map_err(to_pyvalue_err)
473        })
474    }
475
476    /// Subscribes to raw ticker updates (requires authentication).
477    #[pyo3(name = "subscribe_ticker_raw")]
478    fn py_subscribe_ticker_raw<'py>(
479        &self,
480        py: Python<'py>,
481        instrument_id: InstrumentId,
482    ) -> PyResult<Bound<'py, PyAny>> {
483        let client = self.clone();
484
485        pyo3_async_runtimes::tokio::future_into_py(py, async move {
486            client
487                .subscribe_ticker_raw(instrument_id)
488                .await
489                .map_err(to_pyvalue_err)
490        })
491    }
492
493    /// Unsubscribes from ticker updates for an instrument.
494    #[pyo3(name = "unsubscribe_ticker")]
495    #[pyo3(signature = (instrument_id, interval=None))]
496    fn py_unsubscribe_ticker<'py>(
497        &self,
498        py: Python<'py>,
499        instrument_id: InstrumentId,
500        interval: Option<DeribitUpdateInterval>,
501    ) -> PyResult<Bound<'py, PyAny>> {
502        let client = self.clone();
503
504        pyo3_async_runtimes::tokio::future_into_py(py, async move {
505            client
506                .unsubscribe_ticker(instrument_id, interval)
507                .await
508                .map_err(to_pyvalue_err)
509        })
510    }
511
512    /// Subscribes to quote (best bid/ask) updates for an instrument.
513    #[pyo3(name = "subscribe_quotes")]
514    fn py_subscribe_quotes<'py>(
515        &self,
516        py: Python<'py>,
517        instrument_id: InstrumentId,
518    ) -> PyResult<Bound<'py, PyAny>> {
519        let client = self.clone();
520
521        pyo3_async_runtimes::tokio::future_into_py(py, async move {
522            client
523                .subscribe_quotes(instrument_id)
524                .await
525                .map_err(to_pyvalue_err)
526        })
527    }
528
529    /// Unsubscribes from quote updates for an instrument.
530    #[pyo3(name = "unsubscribe_quotes")]
531    fn py_unsubscribe_quotes<'py>(
532        &self,
533        py: Python<'py>,
534        instrument_id: InstrumentId,
535    ) -> PyResult<Bound<'py, PyAny>> {
536        let client = self.clone();
537
538        pyo3_async_runtimes::tokio::future_into_py(py, async move {
539            client
540                .unsubscribe_quotes(instrument_id)
541                .await
542                .map_err(to_pyvalue_err)
543        })
544    }
545
546    /// Subscribes to multiple channels at once.
547    #[pyo3(name = "subscribe")]
548    fn py_subscribe<'py>(
549        &self,
550        py: Python<'py>,
551        channels: Vec<String>,
552    ) -> PyResult<Bound<'py, PyAny>> {
553        let client = self.clone();
554
555        pyo3_async_runtimes::tokio::future_into_py(py, async move {
556            client.subscribe(channels).await.map_err(to_pyvalue_err)
557        })
558    }
559
560    /// Unsubscribes from multiple channels at once.
561    #[pyo3(name = "unsubscribe")]
562    fn py_unsubscribe<'py>(
563        &self,
564        py: Python<'py>,
565        channels: Vec<String>,
566    ) -> PyResult<Bound<'py, PyAny>> {
567        let client = self.clone();
568
569        pyo3_async_runtimes::tokio::future_into_py(py, async move {
570            client.unsubscribe(channels).await.map_err(to_pyvalue_err)
571        })
572    }
573}