1use nautilus_core::python::to_pyruntime_err;
19use nautilus_model::{
20 data::{BarType, Data, OrderBookDeltas_API},
21 identifiers::{AccountId, InstrumentId},
22 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
23};
24use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
25
26use crate::{
27 common::HyperliquidProductType,
28 websocket::{
29 HyperliquidWebSocketClient,
30 messages::{ExecutionReport, NautilusWsMessage},
31 },
32};
33
34#[pymethods]
35impl HyperliquidWebSocketClient {
36 #[new]
37 #[pyo3(signature = (url=None, testnet=false, product_type=HyperliquidProductType::Perp, account_id=None))]
38 fn py_new(
39 url: Option<String>,
40 testnet: bool,
41 product_type: HyperliquidProductType,
42 account_id: Option<String>,
43 ) -> PyResult<Self> {
44 let account_id = account_id.map(|s| AccountId::from(s.as_str()));
45 Ok(Self::new(url, testnet, product_type, account_id))
46 }
47
48 #[getter]
49 #[pyo3(name = "url")]
50 #[must_use]
51 pub fn py_url(&self) -> String {
52 self.url().to_string()
53 }
54
55 #[pyo3(name = "is_active")]
56 fn py_is_active(&self) -> bool {
57 self.is_active()
58 }
59
60 #[pyo3(name = "is_closed")]
61 fn py_is_closed(&self) -> bool {
62 !self.is_active()
63 }
64
65 #[pyo3(name = "connect")]
66 fn py_connect<'py>(
67 &self,
68 py: Python<'py>,
69 instruments: Vec<Py<PyAny>>,
70 callback: Py<PyAny>,
71 ) -> PyResult<Bound<'py, PyAny>> {
72 for inst in instruments {
73 let inst_any = pyobject_to_instrument_any(py, inst)?;
74 self.cache_instrument(inst_any);
75 }
76
77 let mut client = self.clone();
78
79 pyo3_async_runtimes::tokio::future_into_py(py, async move {
80 client.connect().await.map_err(to_pyruntime_err)?;
81
82 tokio::spawn(async move {
83 loop {
84 let event = client.next_event().await;
85
86 match event {
87 Some(msg) => {
88 tracing::trace!("Received WebSocket message: {msg:?}");
89
90 match msg {
91 NautilusWsMessage::Trades(trade_ticks) => {
92 Python::attach(|py| {
93 for tick in trade_ticks {
94 let py_obj = data_to_pycapsule(py, Data::Trade(tick));
95 if let Err(e) = callback.bind(py).call1((py_obj,)) {
96 tracing::error!(
97 "Error calling Python callback: {}",
98 e
99 );
100 }
101 }
102 });
103 }
104 NautilusWsMessage::Quote(quote_tick) => {
105 Python::attach(|py| {
106 let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
107 if let Err(e) = callback.bind(py).call1((py_obj,)) {
108 tracing::error!("Error calling Python callback: {}", e);
109 }
110 });
111 }
112 NautilusWsMessage::Deltas(deltas) => {
113 Python::attach(|py| {
114 let py_obj = data_to_pycapsule(
115 py,
116 Data::Deltas(OrderBookDeltas_API::new(deltas)),
117 );
118 if let Err(e) = callback.bind(py).call1((py_obj,)) {
119 tracing::error!("Error calling Python callback: {}", e);
120 }
121 });
122 }
123 NautilusWsMessage::Candle(bar) => {
124 Python::attach(|py| {
125 let py_obj = data_to_pycapsule(py, Data::Bar(bar));
126 if let Err(e) = callback.bind(py).call1((py_obj,)) {
127 tracing::error!("Error calling Python callback: {}", e);
128 }
129 });
130 }
131 NautilusWsMessage::MarkPrice(mark_price) => {
132 Python::attach(|py| {
133 let py_obj = data_to_pycapsule(
134 py,
135 Data::MarkPriceUpdate(mark_price),
136 );
137 if let Err(e) = callback.bind(py).call1((py_obj,)) {
138 tracing::error!("Error calling Python callback: {}", e);
139 }
140 });
141 }
142 NautilusWsMessage::IndexPrice(index_price) => {
143 Python::attach(|py| {
144 let py_obj = data_to_pycapsule(
145 py,
146 Data::IndexPriceUpdate(index_price),
147 );
148 if let Err(e) = callback.bind(py).call1((py_obj,)) {
149 tracing::error!("Error calling Python callback: {}", e);
150 }
151 });
152 }
153 NautilusWsMessage::FundingRate(funding_rate) => {
154 Python::attach(|py| {
155 if let Ok(py_obj) = funding_rate.into_py_any(py)
156 && let Err(e) = callback.bind(py).call1((py_obj,))
157 {
158 tracing::error!("Error calling Python callback: {}", e);
159 }
160 });
161 }
162 NautilusWsMessage::ExecutionReports(reports) => {
163 Python::attach(|py| {
164 for report in reports {
165 match report {
166 ExecutionReport::Order(order_report) => {
167 tracing::debug!(
168 "Forwarding order status report: order_id={}, status={:?}",
169 order_report.venue_order_id,
170 order_report.order_status
171 );
172 match Py::new(py, order_report) {
173 Ok(py_obj) => {
174 if let Err(e) =
175 callback.bind(py).call1((py_obj,))
176 {
177 tracing::error!(
178 "Error calling Python callback: {}",
179 e
180 );
181 }
182 }
183 Err(e) => {
184 tracing::error!(
185 "Error converting OrderStatusReport to Python: {}",
186 e
187 );
188 }
189 }
190 }
191 ExecutionReport::Fill(fill_report) => {
192 tracing::debug!(
193 "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
194 fill_report.trade_id,
195 fill_report.order_side,
196 fill_report.last_qty,
197 fill_report.last_px
198 );
199 match Py::new(py, fill_report) {
200 Ok(py_obj) => {
201 if let Err(e) =
202 callback.bind(py).call1((py_obj,))
203 {
204 tracing::error!(
205 "Error calling Python callback: {}",
206 e
207 );
208 }
209 }
210 Err(e) => {
211 tracing::error!(
212 "Error converting FillReport to Python: {}",
213 e
214 );
215 }
216 }
217 }
218 }
219 }
220 });
221 }
222 _ => {
223 tracing::debug!("Unhandled message type: {:?}", msg);
224 }
225 }
226 }
227 None => {
228 tracing::info!("WebSocket connection closed");
229 break;
230 }
231 }
232 }
233 });
234
235 Ok(())
236 })
237 }
238
239 #[pyo3(name = "wait_until_active")]
240 fn py_wait_until_active<'py>(
241 &self,
242 py: Python<'py>,
243 timeout_secs: f64,
244 ) -> PyResult<Bound<'py, PyAny>> {
245 let client = self.clone();
246
247 pyo3_async_runtimes::tokio::future_into_py(py, async move {
248 let start = std::time::Instant::now();
249 loop {
250 if client.is_active() {
251 return Ok(());
252 }
253
254 if start.elapsed().as_secs_f64() >= timeout_secs {
255 return Err(PyRuntimeError::new_err(format!(
256 "WebSocket connection did not become active within {} seconds",
257 timeout_secs
258 )));
259 }
260
261 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
262 }
263 })
264 }
265
266 #[pyo3(name = "close")]
267 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
268 let mut client = self.clone();
269
270 pyo3_async_runtimes::tokio::future_into_py(py, async move {
271 if let Err(e) = client.disconnect().await {
272 tracing::error!("Error on close: {e}");
273 }
274 Ok(())
275 })
276 }
277
278 #[pyo3(name = "subscribe_trades")]
279 fn py_subscribe_trades<'py>(
280 &self,
281 py: Python<'py>,
282 instrument_id: InstrumentId,
283 ) -> PyResult<Bound<'py, PyAny>> {
284 let client = self.clone();
285
286 pyo3_async_runtimes::tokio::future_into_py(py, async move {
287 client
288 .subscribe_trades(instrument_id)
289 .await
290 .map_err(to_pyruntime_err)?;
291 Ok(())
292 })
293 }
294
295 #[pyo3(name = "unsubscribe_trades")]
296 fn py_unsubscribe_trades<'py>(
297 &self,
298 py: Python<'py>,
299 instrument_id: InstrumentId,
300 ) -> PyResult<Bound<'py, PyAny>> {
301 let client = self.clone();
302
303 pyo3_async_runtimes::tokio::future_into_py(py, async move {
304 client
305 .unsubscribe_trades(instrument_id)
306 .await
307 .map_err(to_pyruntime_err)?;
308 Ok(())
309 })
310 }
311
312 #[pyo3(name = "subscribe_book")]
313 fn py_subscribe_book<'py>(
314 &self,
315 py: Python<'py>,
316 instrument_id: InstrumentId,
317 ) -> PyResult<Bound<'py, PyAny>> {
318 let client = self.clone();
319
320 pyo3_async_runtimes::tokio::future_into_py(py, async move {
321 client
322 .subscribe_book(instrument_id)
323 .await
324 .map_err(to_pyruntime_err)?;
325 Ok(())
326 })
327 }
328
329 #[pyo3(name = "unsubscribe_book")]
330 fn py_unsubscribe_book<'py>(
331 &self,
332 py: Python<'py>,
333 instrument_id: InstrumentId,
334 ) -> PyResult<Bound<'py, PyAny>> {
335 let client = self.clone();
336
337 pyo3_async_runtimes::tokio::future_into_py(py, async move {
338 client
339 .unsubscribe_book(instrument_id)
340 .await
341 .map_err(to_pyruntime_err)?;
342 Ok(())
343 })
344 }
345
346 #[pyo3(name = "subscribe_book_deltas")]
347 fn py_subscribe_book_deltas<'py>(
348 &self,
349 py: Python<'py>,
350 instrument_id: InstrumentId,
351 _book_type: u8,
352 _depth: u64,
353 ) -> PyResult<Bound<'py, PyAny>> {
354 let client = self.clone();
355
356 pyo3_async_runtimes::tokio::future_into_py(py, async move {
357 client
358 .subscribe_book(instrument_id)
359 .await
360 .map_err(to_pyruntime_err)?;
361 Ok(())
362 })
363 }
364
365 #[pyo3(name = "unsubscribe_book_deltas")]
366 fn py_unsubscribe_book_deltas<'py>(
367 &self,
368 py: Python<'py>,
369 instrument_id: InstrumentId,
370 ) -> PyResult<Bound<'py, PyAny>> {
371 let client = self.clone();
372
373 pyo3_async_runtimes::tokio::future_into_py(py, async move {
374 client
375 .unsubscribe_book(instrument_id)
376 .await
377 .map_err(to_pyruntime_err)?;
378 Ok(())
379 })
380 }
381
382 #[pyo3(name = "subscribe_book_snapshots")]
383 fn py_subscribe_book_snapshots<'py>(
384 &self,
385 py: Python<'py>,
386 instrument_id: InstrumentId,
387 _book_type: u8,
388 _depth: u64,
389 ) -> PyResult<Bound<'py, PyAny>> {
390 let client = self.clone();
391
392 pyo3_async_runtimes::tokio::future_into_py(py, async move {
393 client
394 .subscribe_book(instrument_id)
395 .await
396 .map_err(to_pyruntime_err)?;
397 Ok(())
398 })
399 }
400
401 #[pyo3(name = "subscribe_quotes")]
402 fn py_subscribe_quotes<'py>(
403 &self,
404 py: Python<'py>,
405 instrument_id: InstrumentId,
406 ) -> PyResult<Bound<'py, PyAny>> {
407 let client = self.clone();
408
409 pyo3_async_runtimes::tokio::future_into_py(py, async move {
410 client
411 .subscribe_quotes(instrument_id)
412 .await
413 .map_err(to_pyruntime_err)?;
414 Ok(())
415 })
416 }
417
418 #[pyo3(name = "unsubscribe_quotes")]
419 fn py_unsubscribe_quotes<'py>(
420 &self,
421 py: Python<'py>,
422 instrument_id: InstrumentId,
423 ) -> PyResult<Bound<'py, PyAny>> {
424 let client = self.clone();
425
426 pyo3_async_runtimes::tokio::future_into_py(py, async move {
427 client
428 .unsubscribe_quotes(instrument_id)
429 .await
430 .map_err(to_pyruntime_err)?;
431 Ok(())
432 })
433 }
434
435 #[pyo3(name = "subscribe_bars")]
436 fn py_subscribe_bars<'py>(
437 &self,
438 py: Python<'py>,
439 bar_type: BarType,
440 ) -> PyResult<Bound<'py, PyAny>> {
441 let client = self.clone();
442
443 pyo3_async_runtimes::tokio::future_into_py(py, async move {
444 client
445 .subscribe_bars(bar_type)
446 .await
447 .map_err(to_pyruntime_err)?;
448 Ok(())
449 })
450 }
451
452 #[pyo3(name = "unsubscribe_bars")]
453 fn py_unsubscribe_bars<'py>(
454 &self,
455 py: Python<'py>,
456 bar_type: BarType,
457 ) -> PyResult<Bound<'py, PyAny>> {
458 let client = self.clone();
459
460 pyo3_async_runtimes::tokio::future_into_py(py, async move {
461 client
462 .unsubscribe_bars(bar_type)
463 .await
464 .map_err(to_pyruntime_err)?;
465 Ok(())
466 })
467 }
468
469 #[pyo3(name = "subscribe_order_updates")]
470 fn py_subscribe_order_updates<'py>(
471 &self,
472 py: Python<'py>,
473 user: String,
474 ) -> PyResult<Bound<'py, PyAny>> {
475 let client = self.clone();
476
477 pyo3_async_runtimes::tokio::future_into_py(py, async move {
478 client
479 .subscribe_order_updates(&user)
480 .await
481 .map_err(to_pyruntime_err)?;
482 Ok(())
483 })
484 }
485
486 #[pyo3(name = "subscribe_user_events")]
487 fn py_subscribe_user_events<'py>(
488 &self,
489 py: Python<'py>,
490 user: String,
491 ) -> PyResult<Bound<'py, PyAny>> {
492 let client = self.clone();
493
494 pyo3_async_runtimes::tokio::future_into_py(py, async move {
495 client
496 .subscribe_user_events(&user)
497 .await
498 .map_err(to_pyruntime_err)?;
499 Ok(())
500 })
501 }
502
503 #[pyo3(name = "subscribe_mark_prices")]
504 fn py_subscribe_mark_prices<'py>(
505 &self,
506 py: Python<'py>,
507 instrument_id: InstrumentId,
508 ) -> PyResult<Bound<'py, PyAny>> {
509 let client = self.clone();
510
511 pyo3_async_runtimes::tokio::future_into_py(py, async move {
512 client
513 .subscribe_mark_prices(instrument_id)
514 .await
515 .map_err(to_pyruntime_err)?;
516 Ok(())
517 })
518 }
519
520 #[pyo3(name = "unsubscribe_mark_prices")]
521 fn py_unsubscribe_mark_prices<'py>(
522 &self,
523 py: Python<'py>,
524 instrument_id: InstrumentId,
525 ) -> PyResult<Bound<'py, PyAny>> {
526 let client = self.clone();
527
528 pyo3_async_runtimes::tokio::future_into_py(py, async move {
529 client
530 .unsubscribe_mark_prices(instrument_id)
531 .await
532 .map_err(to_pyruntime_err)?;
533 Ok(())
534 })
535 }
536
537 #[pyo3(name = "subscribe_index_prices")]
538 fn py_subscribe_index_prices<'py>(
539 &self,
540 py: Python<'py>,
541 instrument_id: InstrumentId,
542 ) -> PyResult<Bound<'py, PyAny>> {
543 let client = self.clone();
544
545 pyo3_async_runtimes::tokio::future_into_py(py, async move {
546 client
547 .subscribe_index_prices(instrument_id)
548 .await
549 .map_err(to_pyruntime_err)?;
550 Ok(())
551 })
552 }
553
554 #[pyo3(name = "unsubscribe_index_prices")]
555 fn py_unsubscribe_index_prices<'py>(
556 &self,
557 py: Python<'py>,
558 instrument_id: InstrumentId,
559 ) -> PyResult<Bound<'py, PyAny>> {
560 let client = self.clone();
561
562 pyo3_async_runtimes::tokio::future_into_py(py, async move {
563 client
564 .unsubscribe_index_prices(instrument_id)
565 .await
566 .map_err(to_pyruntime_err)?;
567 Ok(())
568 })
569 }
570
571 #[pyo3(name = "subscribe_funding_rates")]
572 fn py_subscribe_funding_rates<'py>(
573 &self,
574 py: Python<'py>,
575 instrument_id: InstrumentId,
576 ) -> PyResult<Bound<'py, PyAny>> {
577 let client = self.clone();
578
579 pyo3_async_runtimes::tokio::future_into_py(py, async move {
580 client
581 .subscribe_funding_rates(instrument_id)
582 .await
583 .map_err(to_pyruntime_err)?;
584 Ok(())
585 })
586 }
587
588 #[pyo3(name = "unsubscribe_funding_rates")]
589 fn py_unsubscribe_funding_rates<'py>(
590 &self,
591 py: Python<'py>,
592 instrument_id: InstrumentId,
593 ) -> PyResult<Bound<'py, PyAny>> {
594 let client = self.clone();
595
596 pyo3_async_runtimes::tokio::future_into_py(py, async move {
597 client
598 .unsubscribe_funding_rates(instrument_id)
599 .await
600 .map_err(to_pyruntime_err)?;
601 Ok(())
602 })
603 }
604}