1use std::{
19 sync::atomic::Ordering,
20 time::{Duration, Instant},
21};
22
23use nautilus_common::live::get_runtime;
24use nautilus_core::python::to_pyvalue_err;
25use nautilus_model::{
26 data::BarType,
27 identifiers::{AccountId, InstrumentId},
28 python::instruments::pyobject_to_instrument_any,
29};
30use nautilus_network::mode::ConnectionMode;
31use pyo3::prelude::*;
32
33use crate::{
34 common::{credential::DydxCredential, parse::extract_raw_symbol},
35 websocket::{client::DydxWebSocketClient, error::DydxWsError, handler::HandlerCommand},
36};
37
38fn to_pyvalue_err_dydx(e: DydxWsError) -> PyErr {
39 pyo3::exceptions::PyValueError::new_err(e.to_string())
40}
41
42#[pymethods]
43impl DydxWebSocketClient {
44 #[staticmethod]
45 #[pyo3(name = "new_public")]
46 fn py_new_public(url: String, heartbeat: Option<u64>) -> Self {
47 Self::new_public(url, heartbeat)
48 }
49
50 #[staticmethod]
51 #[pyo3(name = "new_private")]
52 fn py_new_private(
53 url: String,
54 mnemonic: String,
55 account_index: u32,
56 authenticator_ids: Vec<u64>,
57 account_id: AccountId,
58 heartbeat: Option<u64>,
59 ) -> PyResult<Self> {
60 let credential = DydxCredential::from_mnemonic(&mnemonic, account_index, authenticator_ids)
61 .map_err(to_pyvalue_err)?;
62 Ok(Self::new_private(url, credential, account_id, heartbeat))
63 }
64
65 #[pyo3(name = "is_connected")]
66 fn py_is_connected(&self) -> bool {
67 self.is_connected()
68 }
69
70 #[pyo3(name = "set_account_id")]
71 fn py_set_account_id(&mut self, account_id: AccountId) {
72 self.set_account_id(account_id);
73 }
74
75 #[pyo3(name = "account_id")]
76 fn py_account_id(&self) -> Option<AccountId> {
77 self.account_id()
78 }
79
80 #[getter]
81 fn py_url(&self) -> String {
82 self.url().to_string()
83 }
84
85 #[pyo3(name = "connect")]
86 fn py_connect<'py>(
87 &mut self,
88 py: Python<'py>,
89 instruments: Vec<Py<PyAny>>,
90 callback: Py<PyAny>,
91 ) -> PyResult<Bound<'py, PyAny>> {
92 let mut instruments_any = Vec::new();
94 for inst in instruments {
95 let inst_any = pyobject_to_instrument_any(py, inst)?;
96 instruments_any.push(inst_any);
97 }
98
99 self.cache_instruments(instruments_any);
101
102 let mut client = self.clone();
103
104 pyo3_async_runtimes::tokio::future_into_py(py, async move {
105 client.connect().await.map_err(to_pyvalue_err_dydx)?;
107
108 if let Some(mut rx) = client.take_receiver() {
110 get_runtime().spawn(async move {
112 let _client = client; while let Some(msg) = rx.recv().await {
115 match msg {
116 crate::websocket::enums::NautilusWsMessage::Data(items) => {
117 Python::attach(|py| {
118 for data in items {
119 use nautilus_model::python::data::data_to_pycapsule;
120 let py_obj = data_to_pycapsule(py, data);
121 if let Err(e) = callback.call1(py, (py_obj,)) {
122 log::error!("Error calling Python callback: {e}");
123 }
124 }
125 });
126 }
127 crate::websocket::enums::NautilusWsMessage::Deltas(deltas) => {
128 Python::attach(|py| {
129 use nautilus_model::{
130 data::{Data, OrderBookDeltas_API},
131 python::data::data_to_pycapsule,
132 };
133 let data = Data::Deltas(OrderBookDeltas_API::new(*deltas));
134 let py_obj = data_to_pycapsule(py, data);
135 if let Err(e) = callback.call1(py, (py_obj,)) {
136 log::error!("Error calling Python callback: {e}");
137 }
138 });
139 }
140 crate::websocket::enums::NautilusWsMessage::Error(err) => {
141 log::error!("dYdX WebSocket error: {err}");
142 }
143 crate::websocket::enums::NautilusWsMessage::Reconnected => {
144 log::info!("dYdX WebSocket reconnected");
145 }
146 _ => {
147 }
149 }
150 }
151 });
152 }
153
154 Ok(())
155 })
156 }
157
158 #[pyo3(name = "disconnect")]
159 fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
160 let mut client = self.clone();
161 pyo3_async_runtimes::tokio::future_into_py(py, async move {
162 client.disconnect().await.map_err(to_pyvalue_err_dydx)?;
163 Ok(())
164 })
165 }
166
167 #[pyo3(name = "wait_until_active")]
168 fn py_wait_until_active<'py>(
169 &self,
170 py: Python<'py>,
171 timeout_secs: f64,
172 ) -> PyResult<Bound<'py, PyAny>> {
173 let connection_mode = self.connection_mode_atomic();
174
175 pyo3_async_runtimes::tokio::future_into_py(py, async move {
176 let timeout = Duration::from_secs_f64(timeout_secs);
177 let start = Instant::now();
178
179 loop {
180 let mode = connection_mode.load();
181 let mode_u8 = mode.load(Ordering::Relaxed);
182 let is_connected = matches!(
183 mode_u8,
184 x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
185 );
186
187 if is_connected {
188 break;
189 }
190
191 if start.elapsed() > timeout {
192 return Err(to_pyvalue_err(std::io::Error::new(
193 std::io::ErrorKind::TimedOut,
194 format!("Client did not become active within {timeout_secs}s"),
195 )));
196 }
197 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
198 }
199
200 Ok(())
201 })
202 }
203
204 #[pyo3(name = "cache_instrument")]
205 fn py_cache_instrument(&self, instrument: Py<PyAny>, py: Python<'_>) -> PyResult<()> {
206 let inst_any = pyobject_to_instrument_any(py, instrument)?;
207 self.cache_instrument(inst_any);
208 Ok(())
209 }
210
211 #[pyo3(name = "cache_instruments")]
212 fn py_cache_instruments(&self, instruments: Vec<Py<PyAny>>, py: Python<'_>) -> PyResult<()> {
213 let mut instruments_any = Vec::new();
214 for inst in instruments {
215 let inst_any = pyobject_to_instrument_any(py, inst)?;
216 instruments_any.push(inst_any);
217 }
218 self.cache_instruments(instruments_any);
219 Ok(())
220 }
221
222 #[pyo3(name = "is_closed")]
223 fn py_is_closed(&self) -> bool {
224 !self.is_connected()
225 }
226
227 #[pyo3(name = "subscribe_trades")]
228 fn py_subscribe_trades<'py>(
229 &self,
230 py: Python<'py>,
231 instrument_id: InstrumentId,
232 ) -> PyResult<Bound<'py, PyAny>> {
233 let client = self.clone();
234 pyo3_async_runtimes::tokio::future_into_py(py, async move {
235 client
236 .subscribe_trades(instrument_id)
237 .await
238 .map_err(to_pyvalue_err_dydx)?;
239 Ok(())
240 })
241 }
242
243 #[pyo3(name = "unsubscribe_trades")]
244 fn py_unsubscribe_trades<'py>(
245 &self,
246 py: Python<'py>,
247 instrument_id: InstrumentId,
248 ) -> PyResult<Bound<'py, PyAny>> {
249 let client = self.clone();
250 pyo3_async_runtimes::tokio::future_into_py(py, async move {
251 client
252 .unsubscribe_trades(instrument_id)
253 .await
254 .map_err(to_pyvalue_err_dydx)?;
255 Ok(())
256 })
257 }
258
259 #[pyo3(name = "subscribe_orderbook")]
260 fn py_subscribe_orderbook<'py>(
261 &self,
262 py: Python<'py>,
263 instrument_id: InstrumentId,
264 ) -> PyResult<Bound<'py, PyAny>> {
265 let client = self.clone();
266 pyo3_async_runtimes::tokio::future_into_py(py, async move {
267 client
268 .subscribe_orderbook(instrument_id)
269 .await
270 .map_err(to_pyvalue_err_dydx)?;
271 Ok(())
272 })
273 }
274
275 #[pyo3(name = "unsubscribe_orderbook")]
276 fn py_unsubscribe_orderbook<'py>(
277 &self,
278 py: Python<'py>,
279 instrument_id: InstrumentId,
280 ) -> PyResult<Bound<'py, PyAny>> {
281 let client = self.clone();
282 pyo3_async_runtimes::tokio::future_into_py(py, async move {
283 client
284 .unsubscribe_orderbook(instrument_id)
285 .await
286 .map_err(to_pyvalue_err_dydx)?;
287 Ok(())
288 })
289 }
290
291 #[pyo3(name = "subscribe_bars")]
292 fn py_subscribe_bars<'py>(
293 &self,
294 py: Python<'py>,
295 bar_type: BarType,
296 resolution: String,
297 ) -> PyResult<Bound<'py, PyAny>> {
298 let client = self.clone();
299 let instrument_id = bar_type.instrument_id();
300
301 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
303 let topic = format!("{ticker}/{resolution}");
304
305 pyo3_async_runtimes::tokio::future_into_py(py, async move {
306 client
308 .send_command(HandlerCommand::RegisterBarType { topic, bar_type })
309 .map_err(to_pyvalue_err_dydx)?;
310
311 tokio::time::sleep(Duration::from_millis(50)).await;
313
314 client
315 .subscribe_candles(instrument_id, &resolution)
316 .await
317 .map_err(to_pyvalue_err_dydx)?;
318 Ok(())
319 })
320 }
321
322 #[pyo3(name = "unsubscribe_bars")]
323 fn py_unsubscribe_bars<'py>(
324 &self,
325 py: Python<'py>,
326 bar_type: BarType,
327 resolution: String,
328 ) -> PyResult<Bound<'py, PyAny>> {
329 let client = self.clone();
330 let instrument_id = bar_type.instrument_id();
331
332 let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
334 let topic = format!("{ticker}/{resolution}");
335
336 pyo3_async_runtimes::tokio::future_into_py(py, async move {
337 client
338 .unsubscribe_candles(instrument_id, &resolution)
339 .await
340 .map_err(to_pyvalue_err_dydx)?;
341
342 client
344 .send_command(HandlerCommand::UnregisterBarType { topic })
345 .map_err(to_pyvalue_err_dydx)?;
346
347 Ok(())
348 })
349 }
350
351 #[pyo3(name = "subscribe_markets")]
352 fn py_subscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
353 let client = self.clone();
354 pyo3_async_runtimes::tokio::future_into_py(py, async move {
355 client
356 .subscribe_markets()
357 .await
358 .map_err(to_pyvalue_err_dydx)?;
359 Ok(())
360 })
361 }
362
363 #[pyo3(name = "unsubscribe_markets")]
364 fn py_unsubscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
365 let client = self.clone();
366 pyo3_async_runtimes::tokio::future_into_py(py, async move {
367 client
368 .unsubscribe_markets()
369 .await
370 .map_err(to_pyvalue_err_dydx)?;
371 Ok(())
372 })
373 }
374
375 #[pyo3(name = "subscribe_subaccount")]
376 fn py_subscribe_subaccount<'py>(
377 &self,
378 py: Python<'py>,
379 address: String,
380 subaccount_number: u32,
381 ) -> PyResult<Bound<'py, PyAny>> {
382 let client = self.clone();
383 pyo3_async_runtimes::tokio::future_into_py(py, async move {
384 client
385 .subscribe_subaccount(&address, subaccount_number)
386 .await
387 .map_err(to_pyvalue_err_dydx)?;
388 Ok(())
389 })
390 }
391
392 #[pyo3(name = "unsubscribe_subaccount")]
393 fn py_unsubscribe_subaccount<'py>(
394 &self,
395 py: Python<'py>,
396 address: String,
397 subaccount_number: u32,
398 ) -> PyResult<Bound<'py, PyAny>> {
399 let client = self.clone();
400 pyo3_async_runtimes::tokio::future_into_py(py, async move {
401 client
402 .unsubscribe_subaccount(&address, subaccount_number)
403 .await
404 .map_err(to_pyvalue_err_dydx)?;
405 Ok(())
406 })
407 }
408
409 #[pyo3(name = "subscribe_block_height")]
410 fn py_subscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
411 let client = self.clone();
412 pyo3_async_runtimes::tokio::future_into_py(py, async move {
413 client
414 .subscribe_block_height()
415 .await
416 .map_err(to_pyvalue_err_dydx)?;
417 Ok(())
418 })
419 }
420
421 #[pyo3(name = "unsubscribe_block_height")]
422 fn py_unsubscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
423 let client = self.clone();
424 pyo3_async_runtimes::tokio::future_into_py(py, async move {
425 client
426 .unsubscribe_block_height()
427 .await
428 .map_err(to_pyvalue_err_dydx)?;
429 Ok(())
430 })
431 }
432}