1use nautilus_core::python::to_pyruntime_err;
19use nautilus_model::{
20 data::{BarType, Data, OrderBookDeltas_API},
21 identifiers::{AccountId, InstrumentId},
22 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
23};
24use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
25
26use crate::{
27 common::HyperliquidProductType,
28 websocket::{
29 HyperliquidWebSocketClient,
30 messages::{ExecutionReport, NautilusWsMessage},
31 },
32};
33
34#[pymethods]
35impl HyperliquidWebSocketClient {
36 #[new]
37 #[pyo3(signature = (url=None, testnet=false, product_type=HyperliquidProductType::Perp, account_id=None))]
38 fn py_new(
39 url: Option<String>,
40 testnet: bool,
41 product_type: HyperliquidProductType,
42 account_id: Option<String>,
43 ) -> PyResult<Self> {
44 let account_id = account_id.map(|s| AccountId::from(s.as_str()));
45 Ok(Self::new(url, testnet, product_type, account_id))
46 }
47
48 #[getter]
49 #[pyo3(name = "url")]
50 #[must_use]
51 pub fn py_url(&self) -> String {
52 self.url().to_string()
53 }
54
55 #[pyo3(name = "is_active")]
56 fn py_is_active(&self) -> bool {
57 self.is_active()
58 }
59
60 #[pyo3(name = "is_closed")]
61 fn py_is_closed(&self) -> bool {
62 !self.is_active()
63 }
64
65 #[pyo3(name = "connect")]
66 fn py_connect<'py>(
67 &self,
68 py: Python<'py>,
69 instruments: Vec<Py<PyAny>>,
70 callback: Py<PyAny>,
71 ) -> PyResult<Bound<'py, PyAny>> {
72 for inst in instruments {
73 let inst_any = pyobject_to_instrument_any(py, inst)?;
74 self.cache_instrument(inst_any);
75 }
76
77 let mut client = self.clone();
78
79 pyo3_async_runtimes::tokio::future_into_py(py, async move {
80 client.connect().await.map_err(to_pyruntime_err)?;
81
82 tokio::spawn(async move {
83 loop {
84 let event = client.next_event().await;
85
86 match event {
87 Some(msg) => {
88 tracing::trace!("Received WebSocket message: {msg:?}");
89
90 match msg {
91 NautilusWsMessage::Trades(trade_ticks) => {
92 Python::attach(|py| {
93 for tick in trade_ticks {
94 let py_obj = data_to_pycapsule(py, Data::Trade(tick));
95 if let Err(e) = callback.bind(py).call1((py_obj,)) {
96 tracing::error!(
97 "Error calling Python callback: {}",
98 e
99 );
100 }
101 }
102 });
103 }
104 NautilusWsMessage::Quote(quote_tick) => {
105 Python::attach(|py| {
106 let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
107 if let Err(e) = callback.bind(py).call1((py_obj,)) {
108 tracing::error!("Error calling Python callback: {}", e);
109 }
110 });
111 }
112 NautilusWsMessage::Deltas(deltas) => {
113 Python::attach(|py| {
114 let py_obj = data_to_pycapsule(
115 py,
116 Data::Deltas(OrderBookDeltas_API::new(deltas)),
117 );
118 if let Err(e) = callback.bind(py).call1((py_obj,)) {
119 tracing::error!("Error calling Python callback: {}", e);
120 }
121 });
122 }
123 NautilusWsMessage::Candle(bar) => {
124 Python::attach(|py| {
125 let py_obj = data_to_pycapsule(py, Data::Bar(bar));
126 if let Err(e) = callback.bind(py).call1((py_obj,)) {
127 tracing::error!("Error calling Python callback: {}", e);
128 }
129 });
130 }
131 NautilusWsMessage::MarkPrice(mark_price) => {
132 Python::attach(|py| {
133 let py_obj = data_to_pycapsule(
134 py,
135 Data::MarkPriceUpdate(mark_price),
136 );
137 if let Err(e) = callback.bind(py).call1((py_obj,)) {
138 tracing::error!("Error calling Python callback: {}", e);
139 }
140 });
141 }
142 NautilusWsMessage::IndexPrice(index_price) => {
143 Python::attach(|py| {
144 let py_obj = data_to_pycapsule(
145 py,
146 Data::IndexPriceUpdate(index_price),
147 );
148 if let Err(e) = callback.bind(py).call1((py_obj,)) {
149 tracing::error!("Error calling Python callback: {}", e);
150 }
151 });
152 }
153 NautilusWsMessage::FundingRate(funding_rate) => {
154 Python::attach(|py| {
155 if let Ok(py_obj) = funding_rate.into_py_any(py)
156 && let Err(e) = callback.bind(py).call1((py_obj,))
157 {
158 tracing::error!("Error calling Python callback: {}", e);
159 }
160 });
161 }
162 NautilusWsMessage::ExecutionReports(reports) => {
163 Python::attach(|py| {
164 for report in reports {
165 match report {
166 ExecutionReport::Order(order_report) => {
167 tracing::debug!(
168 "Forwarding order status report: order_id={}, status={:?}",
169 order_report.venue_order_id,
170 order_report.order_status
171 );
172 match Py::new(py, order_report) {
173 Ok(py_obj) => {
174 if let Err(e) =
175 callback.bind(py).call1((py_obj,))
176 {
177 tracing::error!(
178 "Error calling Python callback: {}",
179 e
180 );
181 }
182 }
183 Err(e) => {
184 tracing::error!(
185 "Error converting OrderStatusReport to Python: {}",
186 e
187 );
188 }
189 }
190 }
191 ExecutionReport::Fill(fill_report) => {
192 tracing::debug!(
193 "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
194 fill_report.trade_id,
195 fill_report.order_side,
196 fill_report.last_qty,
197 fill_report.last_px
198 );
199 match Py::new(py, fill_report) {
200 Ok(py_obj) => {
201 if let Err(e) =
202 callback.bind(py).call1((py_obj,))
203 {
204 tracing::error!(
205 "Error calling Python callback: {}",
206 e
207 );
208 }
209 }
210 Err(e) => {
211 tracing::error!(
212 "Error converting FillReport to Python: {}",
213 e
214 );
215 }
216 }
217 }
218 }
219 }
220 });
221 }
222 _ => {
223 tracing::debug!("Unhandled message type: {:?}", msg);
224 }
225 }
226 }
227 None => {
228 tracing::info!("WebSocket connection closed");
229 break;
230 }
231 }
232 }
233 });
234
235 Ok(())
236 })
237 }
238
239 #[pyo3(name = "wait_until_active")]
240 fn py_wait_until_active<'py>(
241 &self,
242 py: Python<'py>,
243 timeout_secs: f64,
244 ) -> PyResult<Bound<'py, PyAny>> {
245 let client = self.clone();
246
247 pyo3_async_runtimes::tokio::future_into_py(py, async move {
248 let start = std::time::Instant::now();
249 loop {
250 if client.is_active() {
251 return Ok(());
252 }
253
254 if start.elapsed().as_secs_f64() >= timeout_secs {
255 return Err(PyRuntimeError::new_err(format!(
256 "WebSocket connection did not become active within {timeout_secs} seconds"
257 )));
258 }
259
260 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
261 }
262 })
263 }
264
265 #[pyo3(name = "close")]
266 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
267 let mut client = self.clone();
268
269 pyo3_async_runtimes::tokio::future_into_py(py, async move {
270 if let Err(e) = client.disconnect().await {
271 tracing::error!("Error on close: {e}");
272 }
273 Ok(())
274 })
275 }
276
277 #[pyo3(name = "subscribe_trades")]
278 fn py_subscribe_trades<'py>(
279 &self,
280 py: Python<'py>,
281 instrument_id: InstrumentId,
282 ) -> PyResult<Bound<'py, PyAny>> {
283 let client = self.clone();
284
285 pyo3_async_runtimes::tokio::future_into_py(py, async move {
286 client
287 .subscribe_trades(instrument_id)
288 .await
289 .map_err(to_pyruntime_err)?;
290 Ok(())
291 })
292 }
293
294 #[pyo3(name = "unsubscribe_trades")]
295 fn py_unsubscribe_trades<'py>(
296 &self,
297 py: Python<'py>,
298 instrument_id: InstrumentId,
299 ) -> PyResult<Bound<'py, PyAny>> {
300 let client = self.clone();
301
302 pyo3_async_runtimes::tokio::future_into_py(py, async move {
303 client
304 .unsubscribe_trades(instrument_id)
305 .await
306 .map_err(to_pyruntime_err)?;
307 Ok(())
308 })
309 }
310
311 #[pyo3(name = "subscribe_book")]
312 fn py_subscribe_book<'py>(
313 &self,
314 py: Python<'py>,
315 instrument_id: InstrumentId,
316 ) -> PyResult<Bound<'py, PyAny>> {
317 let client = self.clone();
318
319 pyo3_async_runtimes::tokio::future_into_py(py, async move {
320 client
321 .subscribe_book(instrument_id)
322 .await
323 .map_err(to_pyruntime_err)?;
324 Ok(())
325 })
326 }
327
328 #[pyo3(name = "unsubscribe_book")]
329 fn py_unsubscribe_book<'py>(
330 &self,
331 py: Python<'py>,
332 instrument_id: InstrumentId,
333 ) -> PyResult<Bound<'py, PyAny>> {
334 let client = self.clone();
335
336 pyo3_async_runtimes::tokio::future_into_py(py, async move {
337 client
338 .unsubscribe_book(instrument_id)
339 .await
340 .map_err(to_pyruntime_err)?;
341 Ok(())
342 })
343 }
344
345 #[pyo3(name = "subscribe_book_deltas")]
346 fn py_subscribe_book_deltas<'py>(
347 &self,
348 py: Python<'py>,
349 instrument_id: InstrumentId,
350 _book_type: u8,
351 _depth: u64,
352 ) -> PyResult<Bound<'py, PyAny>> {
353 let client = self.clone();
354
355 pyo3_async_runtimes::tokio::future_into_py(py, async move {
356 client
357 .subscribe_book(instrument_id)
358 .await
359 .map_err(to_pyruntime_err)?;
360 Ok(())
361 })
362 }
363
364 #[pyo3(name = "unsubscribe_book_deltas")]
365 fn py_unsubscribe_book_deltas<'py>(
366 &self,
367 py: Python<'py>,
368 instrument_id: InstrumentId,
369 ) -> PyResult<Bound<'py, PyAny>> {
370 let client = self.clone();
371
372 pyo3_async_runtimes::tokio::future_into_py(py, async move {
373 client
374 .unsubscribe_book(instrument_id)
375 .await
376 .map_err(to_pyruntime_err)?;
377 Ok(())
378 })
379 }
380
381 #[pyo3(name = "subscribe_book_snapshots")]
382 fn py_subscribe_book_snapshots<'py>(
383 &self,
384 py: Python<'py>,
385 instrument_id: InstrumentId,
386 _book_type: u8,
387 _depth: u64,
388 ) -> PyResult<Bound<'py, PyAny>> {
389 let client = self.clone();
390
391 pyo3_async_runtimes::tokio::future_into_py(py, async move {
392 client
393 .subscribe_book(instrument_id)
394 .await
395 .map_err(to_pyruntime_err)?;
396 Ok(())
397 })
398 }
399
400 #[pyo3(name = "subscribe_quotes")]
401 fn py_subscribe_quotes<'py>(
402 &self,
403 py: Python<'py>,
404 instrument_id: InstrumentId,
405 ) -> PyResult<Bound<'py, PyAny>> {
406 let client = self.clone();
407
408 pyo3_async_runtimes::tokio::future_into_py(py, async move {
409 client
410 .subscribe_quotes(instrument_id)
411 .await
412 .map_err(to_pyruntime_err)?;
413 Ok(())
414 })
415 }
416
417 #[pyo3(name = "unsubscribe_quotes")]
418 fn py_unsubscribe_quotes<'py>(
419 &self,
420 py: Python<'py>,
421 instrument_id: InstrumentId,
422 ) -> PyResult<Bound<'py, PyAny>> {
423 let client = self.clone();
424
425 pyo3_async_runtimes::tokio::future_into_py(py, async move {
426 client
427 .unsubscribe_quotes(instrument_id)
428 .await
429 .map_err(to_pyruntime_err)?;
430 Ok(())
431 })
432 }
433
434 #[pyo3(name = "subscribe_bars")]
435 fn py_subscribe_bars<'py>(
436 &self,
437 py: Python<'py>,
438 bar_type: BarType,
439 ) -> PyResult<Bound<'py, PyAny>> {
440 let client = self.clone();
441
442 pyo3_async_runtimes::tokio::future_into_py(py, async move {
443 client
444 .subscribe_bars(bar_type)
445 .await
446 .map_err(to_pyruntime_err)?;
447 Ok(())
448 })
449 }
450
451 #[pyo3(name = "unsubscribe_bars")]
452 fn py_unsubscribe_bars<'py>(
453 &self,
454 py: Python<'py>,
455 bar_type: BarType,
456 ) -> PyResult<Bound<'py, PyAny>> {
457 let client = self.clone();
458
459 pyo3_async_runtimes::tokio::future_into_py(py, async move {
460 client
461 .unsubscribe_bars(bar_type)
462 .await
463 .map_err(to_pyruntime_err)?;
464 Ok(())
465 })
466 }
467
468 #[pyo3(name = "subscribe_order_updates")]
469 fn py_subscribe_order_updates<'py>(
470 &self,
471 py: Python<'py>,
472 user: String,
473 ) -> PyResult<Bound<'py, PyAny>> {
474 let client = self.clone();
475
476 pyo3_async_runtimes::tokio::future_into_py(py, async move {
477 client
478 .subscribe_order_updates(&user)
479 .await
480 .map_err(to_pyruntime_err)?;
481 Ok(())
482 })
483 }
484
485 #[pyo3(name = "subscribe_user_events")]
486 fn py_subscribe_user_events<'py>(
487 &self,
488 py: Python<'py>,
489 user: String,
490 ) -> PyResult<Bound<'py, PyAny>> {
491 let client = self.clone();
492
493 pyo3_async_runtimes::tokio::future_into_py(py, async move {
494 client
495 .subscribe_user_events(&user)
496 .await
497 .map_err(to_pyruntime_err)?;
498 Ok(())
499 })
500 }
501
502 #[pyo3(name = "subscribe_mark_prices")]
503 fn py_subscribe_mark_prices<'py>(
504 &self,
505 py: Python<'py>,
506 instrument_id: InstrumentId,
507 ) -> PyResult<Bound<'py, PyAny>> {
508 let client = self.clone();
509
510 pyo3_async_runtimes::tokio::future_into_py(py, async move {
511 client
512 .subscribe_mark_prices(instrument_id)
513 .await
514 .map_err(to_pyruntime_err)?;
515 Ok(())
516 })
517 }
518
519 #[pyo3(name = "unsubscribe_mark_prices")]
520 fn py_unsubscribe_mark_prices<'py>(
521 &self,
522 py: Python<'py>,
523 instrument_id: InstrumentId,
524 ) -> PyResult<Bound<'py, PyAny>> {
525 let client = self.clone();
526
527 pyo3_async_runtimes::tokio::future_into_py(py, async move {
528 client
529 .unsubscribe_mark_prices(instrument_id)
530 .await
531 .map_err(to_pyruntime_err)?;
532 Ok(())
533 })
534 }
535
536 #[pyo3(name = "subscribe_index_prices")]
537 fn py_subscribe_index_prices<'py>(
538 &self,
539 py: Python<'py>,
540 instrument_id: InstrumentId,
541 ) -> PyResult<Bound<'py, PyAny>> {
542 let client = self.clone();
543
544 pyo3_async_runtimes::tokio::future_into_py(py, async move {
545 client
546 .subscribe_index_prices(instrument_id)
547 .await
548 .map_err(to_pyruntime_err)?;
549 Ok(())
550 })
551 }
552
553 #[pyo3(name = "unsubscribe_index_prices")]
554 fn py_unsubscribe_index_prices<'py>(
555 &self,
556 py: Python<'py>,
557 instrument_id: InstrumentId,
558 ) -> PyResult<Bound<'py, PyAny>> {
559 let client = self.clone();
560
561 pyo3_async_runtimes::tokio::future_into_py(py, async move {
562 client
563 .unsubscribe_index_prices(instrument_id)
564 .await
565 .map_err(to_pyruntime_err)?;
566 Ok(())
567 })
568 }
569
570 #[pyo3(name = "subscribe_funding_rates")]
571 fn py_subscribe_funding_rates<'py>(
572 &self,
573 py: Python<'py>,
574 instrument_id: InstrumentId,
575 ) -> PyResult<Bound<'py, PyAny>> {
576 let client = self.clone();
577
578 pyo3_async_runtimes::tokio::future_into_py(py, async move {
579 client
580 .subscribe_funding_rates(instrument_id)
581 .await
582 .map_err(to_pyruntime_err)?;
583 Ok(())
584 })
585 }
586
587 #[pyo3(name = "unsubscribe_funding_rates")]
588 fn py_unsubscribe_funding_rates<'py>(
589 &self,
590 py: Python<'py>,
591 instrument_id: InstrumentId,
592 ) -> PyResult<Bound<'py, PyAny>> {
593 let client = self.clone();
594
595 pyo3_async_runtimes::tokio::future_into_py(py, async move {
596 client
597 .unsubscribe_funding_rates(instrument_id)
598 .await
599 .map_err(to_pyruntime_err)?;
600 Ok(())
601 })
602 }
603}