1use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21 data::{BarType, Data, OrderBookDeltas_API},
22 identifiers::{AccountId, ClientOrderId, InstrumentId},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{conversion::IntoPyObjectExt, prelude::*};
26
27use crate::websocket::{
28 HyperliquidWebSocketClient,
29 messages::{ExecutionReport, NautilusWsMessage},
30};
31
32#[pymethods]
33impl HyperliquidWebSocketClient {
34 #[new]
35 #[pyo3(signature = (url=None, testnet=false, account_id=None))]
36 fn py_new(url: Option<String>, testnet: bool, account_id: Option<String>) -> PyResult<Self> {
37 let account_id = account_id.map(|s| AccountId::from(s.as_str()));
38 Ok(Self::new(url, testnet, account_id))
39 }
40
41 #[getter]
42 #[pyo3(name = "url")]
43 #[must_use]
44 pub fn py_url(&self) -> String {
45 self.url().to_string()
46 }
47
48 #[pyo3(name = "is_active")]
49 fn py_is_active(&self) -> bool {
50 self.is_active()
51 }
52
53 #[pyo3(name = "is_closed")]
54 fn py_is_closed(&self) -> bool {
55 !self.is_active()
56 }
57
58 #[pyo3(name = "cache_spot_fill_coins")]
59 fn py_cache_spot_fill_coins(&self, mapping: std::collections::HashMap<String, String>) {
60 let ahash_mapping: ahash::AHashMap<ustr::Ustr, ustr::Ustr> = mapping
61 .into_iter()
62 .map(|(k, v)| (ustr::Ustr::from(&k), ustr::Ustr::from(&v)))
63 .collect();
64 self.cache_spot_fill_coins(ahash_mapping);
65 }
66
67 #[pyo3(name = "cache_cloid_mapping")]
68 fn py_cache_cloid_mapping(&self, cloid: String, client_order_id: ClientOrderId) {
69 self.cache_cloid_mapping(ustr::Ustr::from(&cloid), client_order_id);
70 }
71
72 #[pyo3(name = "remove_cloid_mapping")]
73 fn py_remove_cloid_mapping(&self, cloid: String) {
74 self.remove_cloid_mapping(&ustr::Ustr::from(&cloid));
75 }
76
77 #[pyo3(name = "clear_cloid_cache")]
78 fn py_clear_cloid_cache(&self) {
79 self.clear_cloid_cache();
80 }
81
82 #[pyo3(name = "cloid_cache_len")]
83 fn py_cloid_cache_len(&self) -> usize {
84 self.cloid_cache_len()
85 }
86
87 #[pyo3(name = "get_cloid_mapping")]
88 fn py_get_cloid_mapping(&self, cloid: String) -> Option<ClientOrderId> {
89 self.get_cloid_mapping(&ustr::Ustr::from(&cloid))
90 }
91
92 #[pyo3(name = "connect")]
93 fn py_connect<'py>(
94 &self,
95 py: Python<'py>,
96 instruments: Vec<Py<PyAny>>,
97 callback: Py<PyAny>,
98 ) -> PyResult<Bound<'py, PyAny>> {
99 for inst in instruments {
100 let inst_any = pyobject_to_instrument_any(py, inst)?;
101 self.cache_instrument(inst_any);
102 }
103
104 let mut client = self.clone();
105
106 pyo3_async_runtimes::tokio::future_into_py(py, async move {
107 client.connect().await.map_err(to_pyruntime_err)?;
108
109 get_runtime().spawn(async move {
110 loop {
111 let event = client.next_event().await;
112
113 match event {
114 Some(msg) => {
115 log::trace!("Received WebSocket message: {msg:?}");
116
117 match msg {
118 NautilusWsMessage::Trades(trade_ticks) => {
119 Python::attach(|py| {
120 for tick in trade_ticks {
121 let py_obj = data_to_pycapsule(py, Data::Trade(tick));
122 if let Err(e) = callback.bind(py).call1((py_obj,)) {
123 log::error!(
124 "Error calling Python callback: {e}"
125 );
126 }
127 }
128 });
129 }
130 NautilusWsMessage::Quote(quote_tick) => {
131 Python::attach(|py| {
132 let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
133 if let Err(e) = callback.bind(py).call1((py_obj,)) {
134 log::error!("Error calling Python callback: {e}");
135 }
136 });
137 }
138 NautilusWsMessage::Deltas(deltas) => {
139 Python::attach(|py| {
140 let py_obj = data_to_pycapsule(
141 py,
142 Data::Deltas(OrderBookDeltas_API::new(deltas)),
143 );
144 if let Err(e) = callback.bind(py).call1((py_obj,)) {
145 log::error!("Error calling Python callback: {e}");
146 }
147 });
148 }
149 NautilusWsMessage::Candle(bar) => {
150 Python::attach(|py| {
151 let py_obj = data_to_pycapsule(py, Data::Bar(bar));
152 if let Err(e) = callback.bind(py).call1((py_obj,)) {
153 log::error!("Error calling Python callback: {e}");
154 }
155 });
156 }
157 NautilusWsMessage::MarkPrice(mark_price) => {
158 Python::attach(|py| {
159 let py_obj = data_to_pycapsule(
160 py,
161 Data::MarkPriceUpdate(mark_price),
162 );
163 if let Err(e) = callback.bind(py).call1((py_obj,)) {
164 log::error!("Error calling Python callback: {e}");
165 }
166 });
167 }
168 NautilusWsMessage::IndexPrice(index_price) => {
169 Python::attach(|py| {
170 let py_obj = data_to_pycapsule(
171 py,
172 Data::IndexPriceUpdate(index_price),
173 );
174 if let Err(e) = callback.bind(py).call1((py_obj,)) {
175 log::error!("Error calling Python callback: {e}");
176 }
177 });
178 }
179 NautilusWsMessage::FundingRate(funding_rate) => {
180 Python::attach(|py| {
181 if let Ok(py_obj) = funding_rate.into_py_any(py)
182 && let Err(e) = callback.bind(py).call1((py_obj,))
183 {
184 log::error!("Error calling Python callback: {e}");
185 }
186 });
187 }
188 NautilusWsMessage::ExecutionReports(reports) => {
189 Python::attach(|py| {
190 for report in reports {
191 match report {
192 ExecutionReport::Order(order_report) => {
193 log::debug!(
194 "Forwarding order status report: order_id={}, status={:?}",
195 order_report.venue_order_id,
196 order_report.order_status
197 );
198 match Py::new(py, order_report) {
199 Ok(py_obj) => {
200 if let Err(e) =
201 callback.bind(py).call1((py_obj,))
202 {
203 log::error!("Error calling Python callback: {e}");
204 }
205 }
206 Err(e) => {
207 log::error!("Error converting OrderStatusReport to Python: {e}");
208 }
209 }
210 }
211 ExecutionReport::Fill(fill_report) => {
212 log::debug!(
213 "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
214 fill_report.trade_id,
215 fill_report.order_side,
216 fill_report.last_qty,
217 fill_report.last_px
218 );
219 match Py::new(py, fill_report) {
220 Ok(py_obj) => {
221 if let Err(e) =
222 callback.bind(py).call1((py_obj,))
223 {
224 log::error!("Error calling Python callback: {e}");
225 }
226 }
227 Err(e) => {
228 log::error!("Error converting FillReport to Python: {e}");
229 }
230 }
231 }
232 }
233 }
234 });
235 }
236 _ => {
237 log::debug!("Unhandled message type: {msg:?}");
238 }
239 }
240 }
241 None => {
242 log::debug!("WebSocket connection closed");
243 break;
244 }
245 }
246 }
247 });
248
249 Ok(())
250 })
251 }
252
253 #[pyo3(name = "wait_until_active")]
254 fn py_wait_until_active<'py>(
255 &self,
256 py: Python<'py>,
257 timeout_secs: f64,
258 ) -> PyResult<Bound<'py, PyAny>> {
259 let client = self.clone();
260
261 pyo3_async_runtimes::tokio::future_into_py(py, async move {
262 let start = std::time::Instant::now();
263 loop {
264 if client.is_active() {
265 return Ok(());
266 }
267
268 if start.elapsed().as_secs_f64() >= timeout_secs {
269 return Err(to_pyruntime_err(format!(
270 "WebSocket connection did not become active within {timeout_secs} seconds"
271 )));
272 }
273
274 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
275 }
276 })
277 }
278
279 #[pyo3(name = "close")]
280 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
281 let mut client = self.clone();
282
283 pyo3_async_runtimes::tokio::future_into_py(py, async move {
284 if let Err(e) = client.disconnect().await {
285 log::error!("Error on close: {e}");
286 }
287 Ok(())
288 })
289 }
290
291 #[pyo3(name = "subscribe_trades")]
292 fn py_subscribe_trades<'py>(
293 &self,
294 py: Python<'py>,
295 instrument_id: InstrumentId,
296 ) -> PyResult<Bound<'py, PyAny>> {
297 let client = self.clone();
298
299 pyo3_async_runtimes::tokio::future_into_py(py, async move {
300 client
301 .subscribe_trades(instrument_id)
302 .await
303 .map_err(to_pyruntime_err)?;
304 Ok(())
305 })
306 }
307
308 #[pyo3(name = "unsubscribe_trades")]
309 fn py_unsubscribe_trades<'py>(
310 &self,
311 py: Python<'py>,
312 instrument_id: InstrumentId,
313 ) -> PyResult<Bound<'py, PyAny>> {
314 let client = self.clone();
315
316 pyo3_async_runtimes::tokio::future_into_py(py, async move {
317 client
318 .unsubscribe_trades(instrument_id)
319 .await
320 .map_err(to_pyruntime_err)?;
321 Ok(())
322 })
323 }
324
325 #[pyo3(name = "subscribe_book")]
326 fn py_subscribe_book<'py>(
327 &self,
328 py: Python<'py>,
329 instrument_id: InstrumentId,
330 ) -> PyResult<Bound<'py, PyAny>> {
331 let client = self.clone();
332
333 pyo3_async_runtimes::tokio::future_into_py(py, async move {
334 client
335 .subscribe_book(instrument_id)
336 .await
337 .map_err(to_pyruntime_err)?;
338 Ok(())
339 })
340 }
341
342 #[pyo3(name = "unsubscribe_book")]
343 fn py_unsubscribe_book<'py>(
344 &self,
345 py: Python<'py>,
346 instrument_id: InstrumentId,
347 ) -> PyResult<Bound<'py, PyAny>> {
348 let client = self.clone();
349
350 pyo3_async_runtimes::tokio::future_into_py(py, async move {
351 client
352 .unsubscribe_book(instrument_id)
353 .await
354 .map_err(to_pyruntime_err)?;
355 Ok(())
356 })
357 }
358
359 #[pyo3(name = "subscribe_book_deltas")]
360 fn py_subscribe_book_deltas<'py>(
361 &self,
362 py: Python<'py>,
363 instrument_id: InstrumentId,
364 _book_type: u8,
365 _depth: u64,
366 ) -> PyResult<Bound<'py, PyAny>> {
367 let client = self.clone();
368
369 pyo3_async_runtimes::tokio::future_into_py(py, async move {
370 client
371 .subscribe_book(instrument_id)
372 .await
373 .map_err(to_pyruntime_err)?;
374 Ok(())
375 })
376 }
377
378 #[pyo3(name = "unsubscribe_book_deltas")]
379 fn py_unsubscribe_book_deltas<'py>(
380 &self,
381 py: Python<'py>,
382 instrument_id: InstrumentId,
383 ) -> PyResult<Bound<'py, PyAny>> {
384 let client = self.clone();
385
386 pyo3_async_runtimes::tokio::future_into_py(py, async move {
387 client
388 .unsubscribe_book(instrument_id)
389 .await
390 .map_err(to_pyruntime_err)?;
391 Ok(())
392 })
393 }
394
395 #[pyo3(name = "subscribe_book_snapshots")]
396 fn py_subscribe_book_snapshots<'py>(
397 &self,
398 py: Python<'py>,
399 instrument_id: InstrumentId,
400 _book_type: u8,
401 _depth: u64,
402 ) -> PyResult<Bound<'py, PyAny>> {
403 let client = self.clone();
404
405 pyo3_async_runtimes::tokio::future_into_py(py, async move {
406 client
407 .subscribe_book(instrument_id)
408 .await
409 .map_err(to_pyruntime_err)?;
410 Ok(())
411 })
412 }
413
414 #[pyo3(name = "subscribe_quotes")]
415 fn py_subscribe_quotes<'py>(
416 &self,
417 py: Python<'py>,
418 instrument_id: InstrumentId,
419 ) -> PyResult<Bound<'py, PyAny>> {
420 let client = self.clone();
421
422 pyo3_async_runtimes::tokio::future_into_py(py, async move {
423 client
424 .subscribe_quotes(instrument_id)
425 .await
426 .map_err(to_pyruntime_err)?;
427 Ok(())
428 })
429 }
430
431 #[pyo3(name = "unsubscribe_quotes")]
432 fn py_unsubscribe_quotes<'py>(
433 &self,
434 py: Python<'py>,
435 instrument_id: InstrumentId,
436 ) -> PyResult<Bound<'py, PyAny>> {
437 let client = self.clone();
438
439 pyo3_async_runtimes::tokio::future_into_py(py, async move {
440 client
441 .unsubscribe_quotes(instrument_id)
442 .await
443 .map_err(to_pyruntime_err)?;
444 Ok(())
445 })
446 }
447
448 #[pyo3(name = "subscribe_bars")]
449 fn py_subscribe_bars<'py>(
450 &self,
451 py: Python<'py>,
452 bar_type: BarType,
453 ) -> PyResult<Bound<'py, PyAny>> {
454 let client = self.clone();
455
456 pyo3_async_runtimes::tokio::future_into_py(py, async move {
457 client
458 .subscribe_bars(bar_type)
459 .await
460 .map_err(to_pyruntime_err)?;
461 Ok(())
462 })
463 }
464
465 #[pyo3(name = "unsubscribe_bars")]
466 fn py_unsubscribe_bars<'py>(
467 &self,
468 py: Python<'py>,
469 bar_type: BarType,
470 ) -> PyResult<Bound<'py, PyAny>> {
471 let client = self.clone();
472
473 pyo3_async_runtimes::tokio::future_into_py(py, async move {
474 client
475 .unsubscribe_bars(bar_type)
476 .await
477 .map_err(to_pyruntime_err)?;
478 Ok(())
479 })
480 }
481
482 #[pyo3(name = "subscribe_order_updates")]
483 fn py_subscribe_order_updates<'py>(
484 &self,
485 py: Python<'py>,
486 user: String,
487 ) -> PyResult<Bound<'py, PyAny>> {
488 let client = self.clone();
489
490 pyo3_async_runtimes::tokio::future_into_py(py, async move {
491 client
492 .subscribe_order_updates(&user)
493 .await
494 .map_err(to_pyruntime_err)?;
495 Ok(())
496 })
497 }
498
499 #[pyo3(name = "subscribe_user_events")]
500 fn py_subscribe_user_events<'py>(
501 &self,
502 py: Python<'py>,
503 user: String,
504 ) -> PyResult<Bound<'py, PyAny>> {
505 let client = self.clone();
506
507 pyo3_async_runtimes::tokio::future_into_py(py, async move {
508 client
509 .subscribe_user_events(&user)
510 .await
511 .map_err(to_pyruntime_err)?;
512 Ok(())
513 })
514 }
515
516 #[pyo3(name = "subscribe_user_fills")]
517 fn py_subscribe_user_fills<'py>(
518 &self,
519 py: Python<'py>,
520 user: String,
521 ) -> PyResult<Bound<'py, PyAny>> {
522 let client = self.clone();
523
524 pyo3_async_runtimes::tokio::future_into_py(py, async move {
525 client
526 .subscribe_user_fills(&user)
527 .await
528 .map_err(to_pyruntime_err)?;
529 Ok(())
530 })
531 }
532
533 #[pyo3(name = "subscribe_mark_prices")]
534 fn py_subscribe_mark_prices<'py>(
535 &self,
536 py: Python<'py>,
537 instrument_id: InstrumentId,
538 ) -> PyResult<Bound<'py, PyAny>> {
539 let client = self.clone();
540
541 pyo3_async_runtimes::tokio::future_into_py(py, async move {
542 client
543 .subscribe_mark_prices(instrument_id)
544 .await
545 .map_err(to_pyruntime_err)?;
546 Ok(())
547 })
548 }
549
550 #[pyo3(name = "unsubscribe_mark_prices")]
551 fn py_unsubscribe_mark_prices<'py>(
552 &self,
553 py: Python<'py>,
554 instrument_id: InstrumentId,
555 ) -> PyResult<Bound<'py, PyAny>> {
556 let client = self.clone();
557
558 pyo3_async_runtimes::tokio::future_into_py(py, async move {
559 client
560 .unsubscribe_mark_prices(instrument_id)
561 .await
562 .map_err(to_pyruntime_err)?;
563 Ok(())
564 })
565 }
566
567 #[pyo3(name = "subscribe_index_prices")]
568 fn py_subscribe_index_prices<'py>(
569 &self,
570 py: Python<'py>,
571 instrument_id: InstrumentId,
572 ) -> PyResult<Bound<'py, PyAny>> {
573 let client = self.clone();
574
575 pyo3_async_runtimes::tokio::future_into_py(py, async move {
576 client
577 .subscribe_index_prices(instrument_id)
578 .await
579 .map_err(to_pyruntime_err)?;
580 Ok(())
581 })
582 }
583
584 #[pyo3(name = "unsubscribe_index_prices")]
585 fn py_unsubscribe_index_prices<'py>(
586 &self,
587 py: Python<'py>,
588 instrument_id: InstrumentId,
589 ) -> PyResult<Bound<'py, PyAny>> {
590 let client = self.clone();
591
592 pyo3_async_runtimes::tokio::future_into_py(py, async move {
593 client
594 .unsubscribe_index_prices(instrument_id)
595 .await
596 .map_err(to_pyruntime_err)?;
597 Ok(())
598 })
599 }
600
601 #[pyo3(name = "subscribe_funding_rates")]
602 fn py_subscribe_funding_rates<'py>(
603 &self,
604 py: Python<'py>,
605 instrument_id: InstrumentId,
606 ) -> PyResult<Bound<'py, PyAny>> {
607 let client = self.clone();
608
609 pyo3_async_runtimes::tokio::future_into_py(py, async move {
610 client
611 .subscribe_funding_rates(instrument_id)
612 .await
613 .map_err(to_pyruntime_err)?;
614 Ok(())
615 })
616 }
617
618 #[pyo3(name = "unsubscribe_funding_rates")]
619 fn py_unsubscribe_funding_rates<'py>(
620 &self,
621 py: Python<'py>,
622 instrument_id: InstrumentId,
623 ) -> PyResult<Bound<'py, PyAny>> {
624 let client = self.clone();
625
626 pyo3_async_runtimes::tokio::future_into_py(py, async move {
627 client
628 .unsubscribe_funding_rates(instrument_id)
629 .await
630 .map_err(to_pyruntime_err)?;
631 Ok(())
632 })
633 }
634}