nautilus_dydx/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the dYdX WebSocket client.
17
18use std::{
19    sync::atomic::Ordering,
20    time::{Duration, Instant},
21};
22
23use nautilus_common::live::get_runtime;
24use nautilus_core::python::to_pyvalue_err;
25use nautilus_model::{
26    data::BarType,
27    identifiers::{AccountId, InstrumentId},
28    python::instruments::pyobject_to_instrument_any,
29};
30use nautilus_network::mode::ConnectionMode;
31use pyo3::prelude::*;
32
33use crate::{
34    common::{credential::DydxCredential, parse::extract_raw_symbol},
35    websocket::{client::DydxWebSocketClient, error::DydxWsError, handler::HandlerCommand},
36};
37
38fn to_pyvalue_err_dydx(e: DydxWsError) -> PyErr {
39    pyo3::exceptions::PyValueError::new_err(e.to_string())
40}
41
42#[pymethods]
43impl DydxWebSocketClient {
44    #[staticmethod]
45    #[pyo3(name = "new_public")]
46    fn py_new_public(url: String, heartbeat: Option<u64>) -> Self {
47        Self::new_public(url, heartbeat)
48    }
49
50    #[staticmethod]
51    #[pyo3(name = "new_private")]
52    fn py_new_private(
53        url: String,
54        mnemonic: String,
55        account_index: u32,
56        authenticator_ids: Vec<u64>,
57        account_id: AccountId,
58        heartbeat: Option<u64>,
59    ) -> PyResult<Self> {
60        let credential = DydxCredential::from_mnemonic(&mnemonic, account_index, authenticator_ids)
61            .map_err(to_pyvalue_err)?;
62        Ok(Self::new_private(url, credential, account_id, heartbeat))
63    }
64
65    #[pyo3(name = "is_connected")]
66    fn py_is_connected(&self) -> bool {
67        self.is_connected()
68    }
69
70    #[pyo3(name = "set_account_id")]
71    fn py_set_account_id(&mut self, account_id: AccountId) {
72        self.set_account_id(account_id);
73    }
74
75    #[pyo3(name = "account_id")]
76    fn py_account_id(&self) -> Option<AccountId> {
77        self.account_id()
78    }
79
80    #[getter]
81    fn py_url(&self) -> String {
82        self.url().to_string()
83    }
84
85    #[pyo3(name = "connect")]
86    fn py_connect<'py>(
87        &mut self,
88        py: Python<'py>,
89        instruments: Vec<Py<PyAny>>,
90        callback: Py<PyAny>,
91    ) -> PyResult<Bound<'py, PyAny>> {
92        // Convert Python instruments to Rust InstrumentAny
93        let mut instruments_any = Vec::new();
94        for inst in instruments {
95            let inst_any = pyobject_to_instrument_any(py, inst)?;
96            instruments_any.push(inst_any);
97        }
98
99        // Cache instruments first
100        self.cache_instruments(instruments_any);
101
102        let mut client = self.clone();
103
104        pyo3_async_runtimes::tokio::future_into_py(py, async move {
105            // Connect the WebSocket client
106            client.connect().await.map_err(to_pyvalue_err_dydx)?;
107
108            // Take the receiver for messages
109            if let Some(mut rx) = client.take_receiver() {
110                // Spawn task to process messages and call Python callback
111                get_runtime().spawn(async move {
112                    let _client = client; // Keep client alive in spawned task
113
114                    while let Some(msg) = rx.recv().await {
115                        match msg {
116                            crate::websocket::enums::NautilusWsMessage::Data(items) => {
117                                Python::attach(|py| {
118                                    for data in items {
119                                        use nautilus_model::python::data::data_to_pycapsule;
120                                        let py_obj = data_to_pycapsule(py, data);
121                                        if let Err(e) = callback.call1(py, (py_obj,)) {
122                                            log::error!("Error calling Python callback: {e}");
123                                        }
124                                    }
125                                });
126                            }
127                            crate::websocket::enums::NautilusWsMessage::Deltas(deltas) => {
128                                Python::attach(|py| {
129                                    use nautilus_model::{
130                                        data::{Data, OrderBookDeltas_API},
131                                        python::data::data_to_pycapsule,
132                                    };
133                                    let data = Data::Deltas(OrderBookDeltas_API::new(*deltas));
134                                    let py_obj = data_to_pycapsule(py, data);
135                                    if let Err(e) = callback.call1(py, (py_obj,)) {
136                                        log::error!("Error calling Python callback: {e}");
137                                    }
138                                });
139                            }
140                            crate::websocket::enums::NautilusWsMessage::Error(err) => {
141                                log::error!("dYdX WebSocket error: {err}");
142                            }
143                            crate::websocket::enums::NautilusWsMessage::Reconnected => {
144                                log::info!("dYdX WebSocket reconnected");
145                            }
146                            _ => {
147                                // Handle other message types if needed
148                            }
149                        }
150                    }
151                });
152            }
153
154            Ok(())
155        })
156    }
157
158    #[pyo3(name = "disconnect")]
159    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
160        let mut client = self.clone();
161        pyo3_async_runtimes::tokio::future_into_py(py, async move {
162            client.disconnect().await.map_err(to_pyvalue_err_dydx)?;
163            Ok(())
164        })
165    }
166
167    #[pyo3(name = "wait_until_active")]
168    fn py_wait_until_active<'py>(
169        &self,
170        py: Python<'py>,
171        timeout_secs: f64,
172    ) -> PyResult<Bound<'py, PyAny>> {
173        let connection_mode = self.connection_mode_atomic();
174
175        pyo3_async_runtimes::tokio::future_into_py(py, async move {
176            let timeout = Duration::from_secs_f64(timeout_secs);
177            let start = Instant::now();
178
179            loop {
180                let mode = connection_mode.load();
181                let mode_u8 = mode.load(Ordering::Relaxed);
182                let is_connected = matches!(
183                    mode_u8,
184                    x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
185                );
186
187                if is_connected {
188                    break;
189                }
190
191                if start.elapsed() > timeout {
192                    return Err(to_pyvalue_err(std::io::Error::new(
193                        std::io::ErrorKind::TimedOut,
194                        format!("Client did not become active within {timeout_secs}s"),
195                    )));
196                }
197                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
198            }
199
200            Ok(())
201        })
202    }
203
204    #[pyo3(name = "cache_instrument")]
205    fn py_cache_instrument(&self, instrument: Py<PyAny>, py: Python<'_>) -> PyResult<()> {
206        let inst_any = pyobject_to_instrument_any(py, instrument)?;
207        self.cache_instrument(inst_any);
208        Ok(())
209    }
210
211    #[pyo3(name = "cache_instruments")]
212    fn py_cache_instruments(&self, instruments: Vec<Py<PyAny>>, py: Python<'_>) -> PyResult<()> {
213        let mut instruments_any = Vec::new();
214        for inst in instruments {
215            let inst_any = pyobject_to_instrument_any(py, inst)?;
216            instruments_any.push(inst_any);
217        }
218        self.cache_instruments(instruments_any);
219        Ok(())
220    }
221
222    #[pyo3(name = "is_closed")]
223    fn py_is_closed(&self) -> bool {
224        !self.is_connected()
225    }
226
227    #[pyo3(name = "subscribe_trades")]
228    fn py_subscribe_trades<'py>(
229        &self,
230        py: Python<'py>,
231        instrument_id: InstrumentId,
232    ) -> PyResult<Bound<'py, PyAny>> {
233        let client = self.clone();
234        pyo3_async_runtimes::tokio::future_into_py(py, async move {
235            client
236                .subscribe_trades(instrument_id)
237                .await
238                .map_err(to_pyvalue_err_dydx)?;
239            Ok(())
240        })
241    }
242
243    #[pyo3(name = "unsubscribe_trades")]
244    fn py_unsubscribe_trades<'py>(
245        &self,
246        py: Python<'py>,
247        instrument_id: InstrumentId,
248    ) -> PyResult<Bound<'py, PyAny>> {
249        let client = self.clone();
250        pyo3_async_runtimes::tokio::future_into_py(py, async move {
251            client
252                .unsubscribe_trades(instrument_id)
253                .await
254                .map_err(to_pyvalue_err_dydx)?;
255            Ok(())
256        })
257    }
258
259    #[pyo3(name = "subscribe_orderbook")]
260    fn py_subscribe_orderbook<'py>(
261        &self,
262        py: Python<'py>,
263        instrument_id: InstrumentId,
264    ) -> PyResult<Bound<'py, PyAny>> {
265        let client = self.clone();
266        pyo3_async_runtimes::tokio::future_into_py(py, async move {
267            client
268                .subscribe_orderbook(instrument_id)
269                .await
270                .map_err(to_pyvalue_err_dydx)?;
271            Ok(())
272        })
273    }
274
275    #[pyo3(name = "unsubscribe_orderbook")]
276    fn py_unsubscribe_orderbook<'py>(
277        &self,
278        py: Python<'py>,
279        instrument_id: InstrumentId,
280    ) -> PyResult<Bound<'py, PyAny>> {
281        let client = self.clone();
282        pyo3_async_runtimes::tokio::future_into_py(py, async move {
283            client
284                .unsubscribe_orderbook(instrument_id)
285                .await
286                .map_err(to_pyvalue_err_dydx)?;
287            Ok(())
288        })
289    }
290
291    #[pyo3(name = "subscribe_bars")]
292    fn py_subscribe_bars<'py>(
293        &self,
294        py: Python<'py>,
295        bar_type: BarType,
296        resolution: String,
297    ) -> PyResult<Bound<'py, PyAny>> {
298        let client = self.clone();
299        let instrument_id = bar_type.instrument_id();
300
301        // Build topic for bar type registration (e.g., "ETH-USD/1MIN")
302        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
303        let topic = format!("{ticker}/{resolution}");
304
305        pyo3_async_runtimes::tokio::future_into_py(py, async move {
306            // Register bar type in handler before subscribing
307            client
308                .send_command(HandlerCommand::RegisterBarType { topic, bar_type })
309                .map_err(to_pyvalue_err_dydx)?;
310
311            // Brief delay to ensure handler processes registration
312            tokio::time::sleep(Duration::from_millis(50)).await;
313
314            client
315                .subscribe_candles(instrument_id, &resolution)
316                .await
317                .map_err(to_pyvalue_err_dydx)?;
318            Ok(())
319        })
320    }
321
322    #[pyo3(name = "unsubscribe_bars")]
323    fn py_unsubscribe_bars<'py>(
324        &self,
325        py: Python<'py>,
326        bar_type: BarType,
327        resolution: String,
328    ) -> PyResult<Bound<'py, PyAny>> {
329        let client = self.clone();
330        let instrument_id = bar_type.instrument_id();
331
332        // Build topic for unregistration
333        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
334        let topic = format!("{ticker}/{resolution}");
335
336        pyo3_async_runtimes::tokio::future_into_py(py, async move {
337            client
338                .unsubscribe_candles(instrument_id, &resolution)
339                .await
340                .map_err(to_pyvalue_err_dydx)?;
341
342            // Unregister bar type after unsubscribing
343            client
344                .send_command(HandlerCommand::UnregisterBarType { topic })
345                .map_err(to_pyvalue_err_dydx)?;
346
347            Ok(())
348        })
349    }
350
351    #[pyo3(name = "subscribe_markets")]
352    fn py_subscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
353        let client = self.clone();
354        pyo3_async_runtimes::tokio::future_into_py(py, async move {
355            client
356                .subscribe_markets()
357                .await
358                .map_err(to_pyvalue_err_dydx)?;
359            Ok(())
360        })
361    }
362
363    #[pyo3(name = "unsubscribe_markets")]
364    fn py_unsubscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
365        let client = self.clone();
366        pyo3_async_runtimes::tokio::future_into_py(py, async move {
367            client
368                .unsubscribe_markets()
369                .await
370                .map_err(to_pyvalue_err_dydx)?;
371            Ok(())
372        })
373    }
374
375    #[pyo3(name = "subscribe_subaccount")]
376    fn py_subscribe_subaccount<'py>(
377        &self,
378        py: Python<'py>,
379        address: String,
380        subaccount_number: u32,
381    ) -> PyResult<Bound<'py, PyAny>> {
382        let client = self.clone();
383        pyo3_async_runtimes::tokio::future_into_py(py, async move {
384            client
385                .subscribe_subaccount(&address, subaccount_number)
386                .await
387                .map_err(to_pyvalue_err_dydx)?;
388            Ok(())
389        })
390    }
391
392    #[pyo3(name = "unsubscribe_subaccount")]
393    fn py_unsubscribe_subaccount<'py>(
394        &self,
395        py: Python<'py>,
396        address: String,
397        subaccount_number: u32,
398    ) -> PyResult<Bound<'py, PyAny>> {
399        let client = self.clone();
400        pyo3_async_runtimes::tokio::future_into_py(py, async move {
401            client
402                .unsubscribe_subaccount(&address, subaccount_number)
403                .await
404                .map_err(to_pyvalue_err_dydx)?;
405            Ok(())
406        })
407    }
408
409    #[pyo3(name = "subscribe_block_height")]
410    fn py_subscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
411        let client = self.clone();
412        pyo3_async_runtimes::tokio::future_into_py(py, async move {
413            client
414                .subscribe_block_height()
415                .await
416                .map_err(to_pyvalue_err_dydx)?;
417            Ok(())
418        })
419    }
420
421    #[pyo3(name = "unsubscribe_block_height")]
422    fn py_unsubscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
423        let client = self.clone();
424        pyo3_async_runtimes::tokio::future_into_py(py, async move {
425            client
426                .unsubscribe_block_height()
427                .await
428                .map_err(to_pyvalue_err_dydx)?;
429            Ok(())
430        })
431    }
432}