1use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21 data::{BarType, Data, OrderBookDeltas_API},
22 identifiers::{AccountId, InstrumentId},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
26
27use crate::{
28 common::HyperliquidProductType,
29 websocket::{
30 HyperliquidWebSocketClient,
31 messages::{ExecutionReport, NautilusWsMessage},
32 },
33};
34
35#[pymethods]
36impl HyperliquidWebSocketClient {
37 #[new]
38 #[pyo3(signature = (url=None, testnet=false, product_type=HyperliquidProductType::Perp, account_id=None))]
39 fn py_new(
40 url: Option<String>,
41 testnet: bool,
42 product_type: HyperliquidProductType,
43 account_id: Option<String>,
44 ) -> PyResult<Self> {
45 let account_id = account_id.map(|s| AccountId::from(s.as_str()));
46 Ok(Self::new(url, testnet, product_type, account_id))
47 }
48
49 #[getter]
50 #[pyo3(name = "url")]
51 #[must_use]
52 pub fn py_url(&self) -> String {
53 self.url().to_string()
54 }
55
56 #[pyo3(name = "is_active")]
57 fn py_is_active(&self) -> bool {
58 self.is_active()
59 }
60
61 #[pyo3(name = "is_closed")]
62 fn py_is_closed(&self) -> bool {
63 !self.is_active()
64 }
65
66 #[pyo3(name = "connect")]
67 fn py_connect<'py>(
68 &self,
69 py: Python<'py>,
70 instruments: Vec<Py<PyAny>>,
71 callback: Py<PyAny>,
72 ) -> PyResult<Bound<'py, PyAny>> {
73 for inst in instruments {
74 let inst_any = pyobject_to_instrument_any(py, inst)?;
75 self.cache_instrument(inst_any);
76 }
77
78 let mut client = self.clone();
79
80 pyo3_async_runtimes::tokio::future_into_py(py, async move {
81 client.connect().await.map_err(to_pyruntime_err)?;
82
83 get_runtime().spawn(async move {
84 loop {
85 let event = client.next_event().await;
86
87 match event {
88 Some(msg) => {
89 tracing::trace!("Received WebSocket message: {msg:?}");
90
91 match msg {
92 NautilusWsMessage::Trades(trade_ticks) => {
93 Python::attach(|py| {
94 for tick in trade_ticks {
95 let py_obj = data_to_pycapsule(py, Data::Trade(tick));
96 if let Err(e) = callback.bind(py).call1((py_obj,)) {
97 tracing::error!(
98 "Error calling Python callback: {}",
99 e
100 );
101 }
102 }
103 });
104 }
105 NautilusWsMessage::Quote(quote_tick) => {
106 Python::attach(|py| {
107 let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
108 if let Err(e) = callback.bind(py).call1((py_obj,)) {
109 tracing::error!("Error calling Python callback: {}", e);
110 }
111 });
112 }
113 NautilusWsMessage::Deltas(deltas) => {
114 Python::attach(|py| {
115 let py_obj = data_to_pycapsule(
116 py,
117 Data::Deltas(OrderBookDeltas_API::new(deltas)),
118 );
119 if let Err(e) = callback.bind(py).call1((py_obj,)) {
120 tracing::error!("Error calling Python callback: {}", e);
121 }
122 });
123 }
124 NautilusWsMessage::Candle(bar) => {
125 Python::attach(|py| {
126 let py_obj = data_to_pycapsule(py, Data::Bar(bar));
127 if let Err(e) = callback.bind(py).call1((py_obj,)) {
128 tracing::error!("Error calling Python callback: {}", e);
129 }
130 });
131 }
132 NautilusWsMessage::MarkPrice(mark_price) => {
133 Python::attach(|py| {
134 let py_obj = data_to_pycapsule(
135 py,
136 Data::MarkPriceUpdate(mark_price),
137 );
138 if let Err(e) = callback.bind(py).call1((py_obj,)) {
139 tracing::error!("Error calling Python callback: {}", e);
140 }
141 });
142 }
143 NautilusWsMessage::IndexPrice(index_price) => {
144 Python::attach(|py| {
145 let py_obj = data_to_pycapsule(
146 py,
147 Data::IndexPriceUpdate(index_price),
148 );
149 if let Err(e) = callback.bind(py).call1((py_obj,)) {
150 tracing::error!("Error calling Python callback: {}", e);
151 }
152 });
153 }
154 NautilusWsMessage::FundingRate(funding_rate) => {
155 Python::attach(|py| {
156 if let Ok(py_obj) = funding_rate.into_py_any(py)
157 && let Err(e) = callback.bind(py).call1((py_obj,))
158 {
159 tracing::error!("Error calling Python callback: {}", e);
160 }
161 });
162 }
163 NautilusWsMessage::ExecutionReports(reports) => {
164 Python::attach(|py| {
165 for report in reports {
166 match report {
167 ExecutionReport::Order(order_report) => {
168 tracing::debug!(
169 "Forwarding order status report: order_id={}, status={:?}",
170 order_report.venue_order_id,
171 order_report.order_status
172 );
173 match Py::new(py, order_report) {
174 Ok(py_obj) => {
175 if let Err(e) =
176 callback.bind(py).call1((py_obj,))
177 {
178 tracing::error!(
179 "Error calling Python callback: {}",
180 e
181 );
182 }
183 }
184 Err(e) => {
185 tracing::error!(
186 "Error converting OrderStatusReport to Python: {}",
187 e
188 );
189 }
190 }
191 }
192 ExecutionReport::Fill(fill_report) => {
193 tracing::debug!(
194 "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
195 fill_report.trade_id,
196 fill_report.order_side,
197 fill_report.last_qty,
198 fill_report.last_px
199 );
200 match Py::new(py, fill_report) {
201 Ok(py_obj) => {
202 if let Err(e) =
203 callback.bind(py).call1((py_obj,))
204 {
205 tracing::error!(
206 "Error calling Python callback: {}",
207 e
208 );
209 }
210 }
211 Err(e) => {
212 tracing::error!(
213 "Error converting FillReport to Python: {}",
214 e
215 );
216 }
217 }
218 }
219 }
220 }
221 });
222 }
223 _ => {
224 tracing::debug!("Unhandled message type: {:?}", msg);
225 }
226 }
227 }
228 None => {
229 tracing::info!("WebSocket connection closed");
230 break;
231 }
232 }
233 }
234 });
235
236 Ok(())
237 })
238 }
239
240 #[pyo3(name = "wait_until_active")]
241 fn py_wait_until_active<'py>(
242 &self,
243 py: Python<'py>,
244 timeout_secs: f64,
245 ) -> PyResult<Bound<'py, PyAny>> {
246 let client = self.clone();
247
248 pyo3_async_runtimes::tokio::future_into_py(py, async move {
249 let start = std::time::Instant::now();
250 loop {
251 if client.is_active() {
252 return Ok(());
253 }
254
255 if start.elapsed().as_secs_f64() >= timeout_secs {
256 return Err(PyRuntimeError::new_err(format!(
257 "WebSocket connection did not become active within {timeout_secs} seconds"
258 )));
259 }
260
261 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
262 }
263 })
264 }
265
266 #[pyo3(name = "close")]
267 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
268 let mut client = self.clone();
269
270 pyo3_async_runtimes::tokio::future_into_py(py, async move {
271 if let Err(e) = client.disconnect().await {
272 tracing::error!("Error on close: {e}");
273 }
274 Ok(())
275 })
276 }
277
278 #[pyo3(name = "subscribe_trades")]
279 fn py_subscribe_trades<'py>(
280 &self,
281 py: Python<'py>,
282 instrument_id: InstrumentId,
283 ) -> PyResult<Bound<'py, PyAny>> {
284 let client = self.clone();
285
286 pyo3_async_runtimes::tokio::future_into_py(py, async move {
287 client
288 .subscribe_trades(instrument_id)
289 .await
290 .map_err(to_pyruntime_err)?;
291 Ok(())
292 })
293 }
294
295 #[pyo3(name = "unsubscribe_trades")]
296 fn py_unsubscribe_trades<'py>(
297 &self,
298 py: Python<'py>,
299 instrument_id: InstrumentId,
300 ) -> PyResult<Bound<'py, PyAny>> {
301 let client = self.clone();
302
303 pyo3_async_runtimes::tokio::future_into_py(py, async move {
304 client
305 .unsubscribe_trades(instrument_id)
306 .await
307 .map_err(to_pyruntime_err)?;
308 Ok(())
309 })
310 }
311
312 #[pyo3(name = "subscribe_book")]
313 fn py_subscribe_book<'py>(
314 &self,
315 py: Python<'py>,
316 instrument_id: InstrumentId,
317 ) -> PyResult<Bound<'py, PyAny>> {
318 let client = self.clone();
319
320 pyo3_async_runtimes::tokio::future_into_py(py, async move {
321 client
322 .subscribe_book(instrument_id)
323 .await
324 .map_err(to_pyruntime_err)?;
325 Ok(())
326 })
327 }
328
329 #[pyo3(name = "unsubscribe_book")]
330 fn py_unsubscribe_book<'py>(
331 &self,
332 py: Python<'py>,
333 instrument_id: InstrumentId,
334 ) -> PyResult<Bound<'py, PyAny>> {
335 let client = self.clone();
336
337 pyo3_async_runtimes::tokio::future_into_py(py, async move {
338 client
339 .unsubscribe_book(instrument_id)
340 .await
341 .map_err(to_pyruntime_err)?;
342 Ok(())
343 })
344 }
345
346 #[pyo3(name = "subscribe_book_deltas")]
347 fn py_subscribe_book_deltas<'py>(
348 &self,
349 py: Python<'py>,
350 instrument_id: InstrumentId,
351 _book_type: u8,
352 _depth: u64,
353 ) -> PyResult<Bound<'py, PyAny>> {
354 let client = self.clone();
355
356 pyo3_async_runtimes::tokio::future_into_py(py, async move {
357 client
358 .subscribe_book(instrument_id)
359 .await
360 .map_err(to_pyruntime_err)?;
361 Ok(())
362 })
363 }
364
365 #[pyo3(name = "unsubscribe_book_deltas")]
366 fn py_unsubscribe_book_deltas<'py>(
367 &self,
368 py: Python<'py>,
369 instrument_id: InstrumentId,
370 ) -> PyResult<Bound<'py, PyAny>> {
371 let client = self.clone();
372
373 pyo3_async_runtimes::tokio::future_into_py(py, async move {
374 client
375 .unsubscribe_book(instrument_id)
376 .await
377 .map_err(to_pyruntime_err)?;
378 Ok(())
379 })
380 }
381
382 #[pyo3(name = "subscribe_book_snapshots")]
383 fn py_subscribe_book_snapshots<'py>(
384 &self,
385 py: Python<'py>,
386 instrument_id: InstrumentId,
387 _book_type: u8,
388 _depth: u64,
389 ) -> PyResult<Bound<'py, PyAny>> {
390 let client = self.clone();
391
392 pyo3_async_runtimes::tokio::future_into_py(py, async move {
393 client
394 .subscribe_book(instrument_id)
395 .await
396 .map_err(to_pyruntime_err)?;
397 Ok(())
398 })
399 }
400
401 #[pyo3(name = "subscribe_quotes")]
402 fn py_subscribe_quotes<'py>(
403 &self,
404 py: Python<'py>,
405 instrument_id: InstrumentId,
406 ) -> PyResult<Bound<'py, PyAny>> {
407 let client = self.clone();
408
409 pyo3_async_runtimes::tokio::future_into_py(py, async move {
410 client
411 .subscribe_quotes(instrument_id)
412 .await
413 .map_err(to_pyruntime_err)?;
414 Ok(())
415 })
416 }
417
418 #[pyo3(name = "unsubscribe_quotes")]
419 fn py_unsubscribe_quotes<'py>(
420 &self,
421 py: Python<'py>,
422 instrument_id: InstrumentId,
423 ) -> PyResult<Bound<'py, PyAny>> {
424 let client = self.clone();
425
426 pyo3_async_runtimes::tokio::future_into_py(py, async move {
427 client
428 .unsubscribe_quotes(instrument_id)
429 .await
430 .map_err(to_pyruntime_err)?;
431 Ok(())
432 })
433 }
434
435 #[pyo3(name = "subscribe_bars")]
436 fn py_subscribe_bars<'py>(
437 &self,
438 py: Python<'py>,
439 bar_type: BarType,
440 ) -> PyResult<Bound<'py, PyAny>> {
441 let client = self.clone();
442
443 pyo3_async_runtimes::tokio::future_into_py(py, async move {
444 client
445 .subscribe_bars(bar_type)
446 .await
447 .map_err(to_pyruntime_err)?;
448 Ok(())
449 })
450 }
451
452 #[pyo3(name = "unsubscribe_bars")]
453 fn py_unsubscribe_bars<'py>(
454 &self,
455 py: Python<'py>,
456 bar_type: BarType,
457 ) -> PyResult<Bound<'py, PyAny>> {
458 let client = self.clone();
459
460 pyo3_async_runtimes::tokio::future_into_py(py, async move {
461 client
462 .unsubscribe_bars(bar_type)
463 .await
464 .map_err(to_pyruntime_err)?;
465 Ok(())
466 })
467 }
468
469 #[pyo3(name = "subscribe_order_updates")]
470 fn py_subscribe_order_updates<'py>(
471 &self,
472 py: Python<'py>,
473 user: String,
474 ) -> PyResult<Bound<'py, PyAny>> {
475 let client = self.clone();
476
477 pyo3_async_runtimes::tokio::future_into_py(py, async move {
478 client
479 .subscribe_order_updates(&user)
480 .await
481 .map_err(to_pyruntime_err)?;
482 Ok(())
483 })
484 }
485
486 #[pyo3(name = "subscribe_user_events")]
487 fn py_subscribe_user_events<'py>(
488 &self,
489 py: Python<'py>,
490 user: String,
491 ) -> PyResult<Bound<'py, PyAny>> {
492 let client = self.clone();
493
494 pyo3_async_runtimes::tokio::future_into_py(py, async move {
495 client
496 .subscribe_user_events(&user)
497 .await
498 .map_err(to_pyruntime_err)?;
499 Ok(())
500 })
501 }
502
503 #[pyo3(name = "subscribe_mark_prices")]
504 fn py_subscribe_mark_prices<'py>(
505 &self,
506 py: Python<'py>,
507 instrument_id: InstrumentId,
508 ) -> PyResult<Bound<'py, PyAny>> {
509 let client = self.clone();
510
511 pyo3_async_runtimes::tokio::future_into_py(py, async move {
512 client
513 .subscribe_mark_prices(instrument_id)
514 .await
515 .map_err(to_pyruntime_err)?;
516 Ok(())
517 })
518 }
519
520 #[pyo3(name = "unsubscribe_mark_prices")]
521 fn py_unsubscribe_mark_prices<'py>(
522 &self,
523 py: Python<'py>,
524 instrument_id: InstrumentId,
525 ) -> PyResult<Bound<'py, PyAny>> {
526 let client = self.clone();
527
528 pyo3_async_runtimes::tokio::future_into_py(py, async move {
529 client
530 .unsubscribe_mark_prices(instrument_id)
531 .await
532 .map_err(to_pyruntime_err)?;
533 Ok(())
534 })
535 }
536
537 #[pyo3(name = "subscribe_index_prices")]
538 fn py_subscribe_index_prices<'py>(
539 &self,
540 py: Python<'py>,
541 instrument_id: InstrumentId,
542 ) -> PyResult<Bound<'py, PyAny>> {
543 let client = self.clone();
544
545 pyo3_async_runtimes::tokio::future_into_py(py, async move {
546 client
547 .subscribe_index_prices(instrument_id)
548 .await
549 .map_err(to_pyruntime_err)?;
550 Ok(())
551 })
552 }
553
554 #[pyo3(name = "unsubscribe_index_prices")]
555 fn py_unsubscribe_index_prices<'py>(
556 &self,
557 py: Python<'py>,
558 instrument_id: InstrumentId,
559 ) -> PyResult<Bound<'py, PyAny>> {
560 let client = self.clone();
561
562 pyo3_async_runtimes::tokio::future_into_py(py, async move {
563 client
564 .unsubscribe_index_prices(instrument_id)
565 .await
566 .map_err(to_pyruntime_err)?;
567 Ok(())
568 })
569 }
570
571 #[pyo3(name = "subscribe_funding_rates")]
572 fn py_subscribe_funding_rates<'py>(
573 &self,
574 py: Python<'py>,
575 instrument_id: InstrumentId,
576 ) -> PyResult<Bound<'py, PyAny>> {
577 let client = self.clone();
578
579 pyo3_async_runtimes::tokio::future_into_py(py, async move {
580 client
581 .subscribe_funding_rates(instrument_id)
582 .await
583 .map_err(to_pyruntime_err)?;
584 Ok(())
585 })
586 }
587
588 #[pyo3(name = "unsubscribe_funding_rates")]
589 fn py_unsubscribe_funding_rates<'py>(
590 &self,
591 py: Python<'py>,
592 instrument_id: InstrumentId,
593 ) -> PyResult<Bound<'py, PyAny>> {
594 let client = self.clone();
595
596 pyo3_async_runtimes::tokio::future_into_py(py, async move {
597 client
598 .unsubscribe_funding_rates(instrument_id)
599 .await
600 .map_err(to_pyruntime_err)?;
601 Ok(())
602 })
603 }
604}