1use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21 data::{BarType, Data, OrderBookDeltas_API},
22 identifiers::{AccountId, InstrumentId},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
26
27use crate::{
28 common::HyperliquidProductType,
29 websocket::{
30 HyperliquidWebSocketClient,
31 messages::{ExecutionReport, NautilusWsMessage},
32 },
33};
34
35#[pymethods]
36impl HyperliquidWebSocketClient {
37 #[new]
38 #[pyo3(signature = (url=None, testnet=false, product_type=HyperliquidProductType::Perp, account_id=None))]
39 fn py_new(
40 url: Option<String>,
41 testnet: bool,
42 product_type: HyperliquidProductType,
43 account_id: Option<String>,
44 ) -> PyResult<Self> {
45 let account_id = account_id.map(|s| AccountId::from(s.as_str()));
46 Ok(Self::new(url, testnet, product_type, account_id))
47 }
48
49 #[getter]
50 #[pyo3(name = "url")]
51 #[must_use]
52 pub fn py_url(&self) -> String {
53 self.url().to_string()
54 }
55
56 #[pyo3(name = "is_active")]
57 fn py_is_active(&self) -> bool {
58 self.is_active()
59 }
60
61 #[pyo3(name = "is_closed")]
62 fn py_is_closed(&self) -> bool {
63 !self.is_active()
64 }
65
66 #[pyo3(name = "connect")]
67 fn py_connect<'py>(
68 &self,
69 py: Python<'py>,
70 instruments: Vec<Py<PyAny>>,
71 callback: Py<PyAny>,
72 ) -> PyResult<Bound<'py, PyAny>> {
73 for inst in instruments {
74 let inst_any = pyobject_to_instrument_any(py, inst)?;
75 self.cache_instrument(inst_any);
76 }
77
78 let mut client = self.clone();
79
80 pyo3_async_runtimes::tokio::future_into_py(py, async move {
81 client.connect().await.map_err(to_pyruntime_err)?;
82
83 get_runtime().spawn(async move {
84 loop {
85 let event = client.next_event().await;
86
87 match event {
88 Some(msg) => {
89 log::trace!("Received WebSocket message: {msg:?}");
90
91 match msg {
92 NautilusWsMessage::Trades(trade_ticks) => {
93 Python::attach(|py| {
94 for tick in trade_ticks {
95 let py_obj = data_to_pycapsule(py, Data::Trade(tick));
96 if let Err(e) = callback.bind(py).call1((py_obj,)) {
97 log::error!(
98 "Error calling Python callback: {e}"
99 );
100 }
101 }
102 });
103 }
104 NautilusWsMessage::Quote(quote_tick) => {
105 Python::attach(|py| {
106 let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
107 if let Err(e) = callback.bind(py).call1((py_obj,)) {
108 log::error!("Error calling Python callback: {e}");
109 }
110 });
111 }
112 NautilusWsMessage::Deltas(deltas) => {
113 Python::attach(|py| {
114 let py_obj = data_to_pycapsule(
115 py,
116 Data::Deltas(OrderBookDeltas_API::new(deltas)),
117 );
118 if let Err(e) = callback.bind(py).call1((py_obj,)) {
119 log::error!("Error calling Python callback: {e}");
120 }
121 });
122 }
123 NautilusWsMessage::Candle(bar) => {
124 Python::attach(|py| {
125 let py_obj = data_to_pycapsule(py, Data::Bar(bar));
126 if let Err(e) = callback.bind(py).call1((py_obj,)) {
127 log::error!("Error calling Python callback: {e}");
128 }
129 });
130 }
131 NautilusWsMessage::MarkPrice(mark_price) => {
132 Python::attach(|py| {
133 let py_obj = data_to_pycapsule(
134 py,
135 Data::MarkPriceUpdate(mark_price),
136 );
137 if let Err(e) = callback.bind(py).call1((py_obj,)) {
138 log::error!("Error calling Python callback: {e}");
139 }
140 });
141 }
142 NautilusWsMessage::IndexPrice(index_price) => {
143 Python::attach(|py| {
144 let py_obj = data_to_pycapsule(
145 py,
146 Data::IndexPriceUpdate(index_price),
147 );
148 if let Err(e) = callback.bind(py).call1((py_obj,)) {
149 log::error!("Error calling Python callback: {e}");
150 }
151 });
152 }
153 NautilusWsMessage::FundingRate(funding_rate) => {
154 Python::attach(|py| {
155 if let Ok(py_obj) = funding_rate.into_py_any(py)
156 && let Err(e) = callback.bind(py).call1((py_obj,))
157 {
158 log::error!("Error calling Python callback: {e}");
159 }
160 });
161 }
162 NautilusWsMessage::ExecutionReports(reports) => {
163 Python::attach(|py| {
164 for report in reports {
165 match report {
166 ExecutionReport::Order(order_report) => {
167 log::debug!(
168 "Forwarding order status report: order_id={}, status={:?}",
169 order_report.venue_order_id,
170 order_report.order_status
171 );
172 match Py::new(py, order_report) {
173 Ok(py_obj) => {
174 if let Err(e) =
175 callback.bind(py).call1((py_obj,))
176 {
177 log::error!("Error calling Python callback: {e}");
178 }
179 }
180 Err(e) => {
181 log::error!("Error converting OrderStatusReport to Python: {e}");
182 }
183 }
184 }
185 ExecutionReport::Fill(fill_report) => {
186 log::debug!(
187 "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
188 fill_report.trade_id,
189 fill_report.order_side,
190 fill_report.last_qty,
191 fill_report.last_px
192 );
193 match Py::new(py, fill_report) {
194 Ok(py_obj) => {
195 if let Err(e) =
196 callback.bind(py).call1((py_obj,))
197 {
198 log::error!("Error calling Python callback: {e}");
199 }
200 }
201 Err(e) => {
202 log::error!("Error converting FillReport to Python: {e}");
203 }
204 }
205 }
206 }
207 }
208 });
209 }
210 _ => {
211 log::debug!("Unhandled message type: {msg:?}");
212 }
213 }
214 }
215 None => {
216 log::info!("WebSocket connection closed");
217 break;
218 }
219 }
220 }
221 });
222
223 Ok(())
224 })
225 }
226
227 #[pyo3(name = "wait_until_active")]
228 fn py_wait_until_active<'py>(
229 &self,
230 py: Python<'py>,
231 timeout_secs: f64,
232 ) -> PyResult<Bound<'py, PyAny>> {
233 let client = self.clone();
234
235 pyo3_async_runtimes::tokio::future_into_py(py, async move {
236 let start = std::time::Instant::now();
237 loop {
238 if client.is_active() {
239 return Ok(());
240 }
241
242 if start.elapsed().as_secs_f64() >= timeout_secs {
243 return Err(PyRuntimeError::new_err(format!(
244 "WebSocket connection did not become active within {timeout_secs} seconds"
245 )));
246 }
247
248 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
249 }
250 })
251 }
252
253 #[pyo3(name = "close")]
254 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
255 let mut client = self.clone();
256
257 pyo3_async_runtimes::tokio::future_into_py(py, async move {
258 if let Err(e) = client.disconnect().await {
259 log::error!("Error on close: {e}");
260 }
261 Ok(())
262 })
263 }
264
265 #[pyo3(name = "subscribe_trades")]
266 fn py_subscribe_trades<'py>(
267 &self,
268 py: Python<'py>,
269 instrument_id: InstrumentId,
270 ) -> PyResult<Bound<'py, PyAny>> {
271 let client = self.clone();
272
273 pyo3_async_runtimes::tokio::future_into_py(py, async move {
274 client
275 .subscribe_trades(instrument_id)
276 .await
277 .map_err(to_pyruntime_err)?;
278 Ok(())
279 })
280 }
281
282 #[pyo3(name = "unsubscribe_trades")]
283 fn py_unsubscribe_trades<'py>(
284 &self,
285 py: Python<'py>,
286 instrument_id: InstrumentId,
287 ) -> PyResult<Bound<'py, PyAny>> {
288 let client = self.clone();
289
290 pyo3_async_runtimes::tokio::future_into_py(py, async move {
291 client
292 .unsubscribe_trades(instrument_id)
293 .await
294 .map_err(to_pyruntime_err)?;
295 Ok(())
296 })
297 }
298
299 #[pyo3(name = "subscribe_book")]
300 fn py_subscribe_book<'py>(
301 &self,
302 py: Python<'py>,
303 instrument_id: InstrumentId,
304 ) -> PyResult<Bound<'py, PyAny>> {
305 let client = self.clone();
306
307 pyo3_async_runtimes::tokio::future_into_py(py, async move {
308 client
309 .subscribe_book(instrument_id)
310 .await
311 .map_err(to_pyruntime_err)?;
312 Ok(())
313 })
314 }
315
316 #[pyo3(name = "unsubscribe_book")]
317 fn py_unsubscribe_book<'py>(
318 &self,
319 py: Python<'py>,
320 instrument_id: InstrumentId,
321 ) -> PyResult<Bound<'py, PyAny>> {
322 let client = self.clone();
323
324 pyo3_async_runtimes::tokio::future_into_py(py, async move {
325 client
326 .unsubscribe_book(instrument_id)
327 .await
328 .map_err(to_pyruntime_err)?;
329 Ok(())
330 })
331 }
332
333 #[pyo3(name = "subscribe_book_deltas")]
334 fn py_subscribe_book_deltas<'py>(
335 &self,
336 py: Python<'py>,
337 instrument_id: InstrumentId,
338 _book_type: u8,
339 _depth: u64,
340 ) -> PyResult<Bound<'py, PyAny>> {
341 let client = self.clone();
342
343 pyo3_async_runtimes::tokio::future_into_py(py, async move {
344 client
345 .subscribe_book(instrument_id)
346 .await
347 .map_err(to_pyruntime_err)?;
348 Ok(())
349 })
350 }
351
352 #[pyo3(name = "unsubscribe_book_deltas")]
353 fn py_unsubscribe_book_deltas<'py>(
354 &self,
355 py: Python<'py>,
356 instrument_id: InstrumentId,
357 ) -> PyResult<Bound<'py, PyAny>> {
358 let client = self.clone();
359
360 pyo3_async_runtimes::tokio::future_into_py(py, async move {
361 client
362 .unsubscribe_book(instrument_id)
363 .await
364 .map_err(to_pyruntime_err)?;
365 Ok(())
366 })
367 }
368
369 #[pyo3(name = "subscribe_book_snapshots")]
370 fn py_subscribe_book_snapshots<'py>(
371 &self,
372 py: Python<'py>,
373 instrument_id: InstrumentId,
374 _book_type: u8,
375 _depth: u64,
376 ) -> PyResult<Bound<'py, PyAny>> {
377 let client = self.clone();
378
379 pyo3_async_runtimes::tokio::future_into_py(py, async move {
380 client
381 .subscribe_book(instrument_id)
382 .await
383 .map_err(to_pyruntime_err)?;
384 Ok(())
385 })
386 }
387
388 #[pyo3(name = "subscribe_quotes")]
389 fn py_subscribe_quotes<'py>(
390 &self,
391 py: Python<'py>,
392 instrument_id: InstrumentId,
393 ) -> PyResult<Bound<'py, PyAny>> {
394 let client = self.clone();
395
396 pyo3_async_runtimes::tokio::future_into_py(py, async move {
397 client
398 .subscribe_quotes(instrument_id)
399 .await
400 .map_err(to_pyruntime_err)?;
401 Ok(())
402 })
403 }
404
405 #[pyo3(name = "unsubscribe_quotes")]
406 fn py_unsubscribe_quotes<'py>(
407 &self,
408 py: Python<'py>,
409 instrument_id: InstrumentId,
410 ) -> PyResult<Bound<'py, PyAny>> {
411 let client = self.clone();
412
413 pyo3_async_runtimes::tokio::future_into_py(py, async move {
414 client
415 .unsubscribe_quotes(instrument_id)
416 .await
417 .map_err(to_pyruntime_err)?;
418 Ok(())
419 })
420 }
421
422 #[pyo3(name = "subscribe_bars")]
423 fn py_subscribe_bars<'py>(
424 &self,
425 py: Python<'py>,
426 bar_type: BarType,
427 ) -> PyResult<Bound<'py, PyAny>> {
428 let client = self.clone();
429
430 pyo3_async_runtimes::tokio::future_into_py(py, async move {
431 client
432 .subscribe_bars(bar_type)
433 .await
434 .map_err(to_pyruntime_err)?;
435 Ok(())
436 })
437 }
438
439 #[pyo3(name = "unsubscribe_bars")]
440 fn py_unsubscribe_bars<'py>(
441 &self,
442 py: Python<'py>,
443 bar_type: BarType,
444 ) -> PyResult<Bound<'py, PyAny>> {
445 let client = self.clone();
446
447 pyo3_async_runtimes::tokio::future_into_py(py, async move {
448 client
449 .unsubscribe_bars(bar_type)
450 .await
451 .map_err(to_pyruntime_err)?;
452 Ok(())
453 })
454 }
455
456 #[pyo3(name = "subscribe_order_updates")]
457 fn py_subscribe_order_updates<'py>(
458 &self,
459 py: Python<'py>,
460 user: String,
461 ) -> PyResult<Bound<'py, PyAny>> {
462 let client = self.clone();
463
464 pyo3_async_runtimes::tokio::future_into_py(py, async move {
465 client
466 .subscribe_order_updates(&user)
467 .await
468 .map_err(to_pyruntime_err)?;
469 Ok(())
470 })
471 }
472
473 #[pyo3(name = "subscribe_user_events")]
474 fn py_subscribe_user_events<'py>(
475 &self,
476 py: Python<'py>,
477 user: String,
478 ) -> PyResult<Bound<'py, PyAny>> {
479 let client = self.clone();
480
481 pyo3_async_runtimes::tokio::future_into_py(py, async move {
482 client
483 .subscribe_user_events(&user)
484 .await
485 .map_err(to_pyruntime_err)?;
486 Ok(())
487 })
488 }
489
490 #[pyo3(name = "subscribe_mark_prices")]
491 fn py_subscribe_mark_prices<'py>(
492 &self,
493 py: Python<'py>,
494 instrument_id: InstrumentId,
495 ) -> PyResult<Bound<'py, PyAny>> {
496 let client = self.clone();
497
498 pyo3_async_runtimes::tokio::future_into_py(py, async move {
499 client
500 .subscribe_mark_prices(instrument_id)
501 .await
502 .map_err(to_pyruntime_err)?;
503 Ok(())
504 })
505 }
506
507 #[pyo3(name = "unsubscribe_mark_prices")]
508 fn py_unsubscribe_mark_prices<'py>(
509 &self,
510 py: Python<'py>,
511 instrument_id: InstrumentId,
512 ) -> PyResult<Bound<'py, PyAny>> {
513 let client = self.clone();
514
515 pyo3_async_runtimes::tokio::future_into_py(py, async move {
516 client
517 .unsubscribe_mark_prices(instrument_id)
518 .await
519 .map_err(to_pyruntime_err)?;
520 Ok(())
521 })
522 }
523
524 #[pyo3(name = "subscribe_index_prices")]
525 fn py_subscribe_index_prices<'py>(
526 &self,
527 py: Python<'py>,
528 instrument_id: InstrumentId,
529 ) -> PyResult<Bound<'py, PyAny>> {
530 let client = self.clone();
531
532 pyo3_async_runtimes::tokio::future_into_py(py, async move {
533 client
534 .subscribe_index_prices(instrument_id)
535 .await
536 .map_err(to_pyruntime_err)?;
537 Ok(())
538 })
539 }
540
541 #[pyo3(name = "unsubscribe_index_prices")]
542 fn py_unsubscribe_index_prices<'py>(
543 &self,
544 py: Python<'py>,
545 instrument_id: InstrumentId,
546 ) -> PyResult<Bound<'py, PyAny>> {
547 let client = self.clone();
548
549 pyo3_async_runtimes::tokio::future_into_py(py, async move {
550 client
551 .unsubscribe_index_prices(instrument_id)
552 .await
553 .map_err(to_pyruntime_err)?;
554 Ok(())
555 })
556 }
557
558 #[pyo3(name = "subscribe_funding_rates")]
559 fn py_subscribe_funding_rates<'py>(
560 &self,
561 py: Python<'py>,
562 instrument_id: InstrumentId,
563 ) -> PyResult<Bound<'py, PyAny>> {
564 let client = self.clone();
565
566 pyo3_async_runtimes::tokio::future_into_py(py, async move {
567 client
568 .subscribe_funding_rates(instrument_id)
569 .await
570 .map_err(to_pyruntime_err)?;
571 Ok(())
572 })
573 }
574
575 #[pyo3(name = "unsubscribe_funding_rates")]
576 fn py_unsubscribe_funding_rates<'py>(
577 &self,
578 py: Python<'py>,
579 instrument_id: InstrumentId,
580 ) -> PyResult<Bound<'py, PyAny>> {
581 let client = self.clone();
582
583 pyo3_async_runtimes::tokio::future_into_py(py, async move {
584 client
585 .unsubscribe_funding_rates(instrument_id)
586 .await
587 .map_err(to_pyruntime_err)?;
588 Ok(())
589 })
590 }
591}