nautilus_hyperliquid/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Hyperliquid WebSocket client.
17
18use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21    data::{BarType, Data, OrderBookDeltas_API},
22    identifiers::{AccountId, InstrumentId},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
26
27use crate::{
28    common::HyperliquidProductType,
29    websocket::{
30        HyperliquidWebSocketClient,
31        messages::{ExecutionReport, NautilusWsMessage},
32    },
33};
34
35#[pymethods]
36impl HyperliquidWebSocketClient {
37    #[new]
38    #[pyo3(signature = (url=None, testnet=false, product_type=HyperliquidProductType::Perp, account_id=None))]
39    fn py_new(
40        url: Option<String>,
41        testnet: bool,
42        product_type: HyperliquidProductType,
43        account_id: Option<String>,
44    ) -> PyResult<Self> {
45        let account_id = account_id.map(|s| AccountId::from(s.as_str()));
46        Ok(Self::new(url, testnet, product_type, account_id))
47    }
48
49    #[getter]
50    #[pyo3(name = "url")]
51    #[must_use]
52    pub fn py_url(&self) -> String {
53        self.url().to_string()
54    }
55
56    #[pyo3(name = "is_active")]
57    fn py_is_active(&self) -> bool {
58        self.is_active()
59    }
60
61    #[pyo3(name = "is_closed")]
62    fn py_is_closed(&self) -> bool {
63        !self.is_active()
64    }
65
66    #[pyo3(name = "connect")]
67    fn py_connect<'py>(
68        &self,
69        py: Python<'py>,
70        instruments: Vec<Py<PyAny>>,
71        callback: Py<PyAny>,
72    ) -> PyResult<Bound<'py, PyAny>> {
73        for inst in instruments {
74            let inst_any = pyobject_to_instrument_any(py, inst)?;
75            self.cache_instrument(inst_any);
76        }
77
78        let mut client = self.clone();
79
80        pyo3_async_runtimes::tokio::future_into_py(py, async move {
81            client.connect().await.map_err(to_pyruntime_err)?;
82
83            get_runtime().spawn(async move {
84                loop {
85                    let event = client.next_event().await;
86
87                    match event {
88                        Some(msg) => {
89                            tracing::trace!("Received WebSocket message: {msg:?}");
90
91                            match msg {
92                                NautilusWsMessage::Trades(trade_ticks) => {
93                                    Python::attach(|py| {
94                                        for tick in trade_ticks {
95                                            let py_obj = data_to_pycapsule(py, Data::Trade(tick));
96                                            if let Err(e) = callback.bind(py).call1((py_obj,)) {
97                                                tracing::error!(
98                                                    "Error calling Python callback: {}",
99                                                    e
100                                                );
101                                            }
102                                        }
103                                    });
104                                }
105                                NautilusWsMessage::Quote(quote_tick) => {
106                                    Python::attach(|py| {
107                                        let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
108                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
109                                            tracing::error!("Error calling Python callback: {}", e);
110                                        }
111                                    });
112                                }
113                                NautilusWsMessage::Deltas(deltas) => {
114                                    Python::attach(|py| {
115                                        let py_obj = data_to_pycapsule(
116                                            py,
117                                            Data::Deltas(OrderBookDeltas_API::new(deltas)),
118                                        );
119                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
120                                            tracing::error!("Error calling Python callback: {}", e);
121                                        }
122                                    });
123                                }
124                                NautilusWsMessage::Candle(bar) => {
125                                    Python::attach(|py| {
126                                        let py_obj = data_to_pycapsule(py, Data::Bar(bar));
127                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
128                                            tracing::error!("Error calling Python callback: {}", e);
129                                        }
130                                    });
131                                }
132                                NautilusWsMessage::MarkPrice(mark_price) => {
133                                    Python::attach(|py| {
134                                        let py_obj = data_to_pycapsule(
135                                            py,
136                                            Data::MarkPriceUpdate(mark_price),
137                                        );
138                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
139                                            tracing::error!("Error calling Python callback: {}", e);
140                                        }
141                                    });
142                                }
143                                NautilusWsMessage::IndexPrice(index_price) => {
144                                    Python::attach(|py| {
145                                        let py_obj = data_to_pycapsule(
146                                            py,
147                                            Data::IndexPriceUpdate(index_price),
148                                        );
149                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
150                                            tracing::error!("Error calling Python callback: {}", e);
151                                        }
152                                    });
153                                }
154                                NautilusWsMessage::FundingRate(funding_rate) => {
155                                    Python::attach(|py| {
156                                        if let Ok(py_obj) = funding_rate.into_py_any(py)
157                                            && let Err(e) = callback.bind(py).call1((py_obj,))
158                                        {
159                                            tracing::error!("Error calling Python callback: {}", e);
160                                        }
161                                    });
162                                }
163                                NautilusWsMessage::ExecutionReports(reports) => {
164                                    Python::attach(|py| {
165                                        for report in reports {
166                                            match report {
167                                                ExecutionReport::Order(order_report) => {
168                                                    tracing::debug!(
169                                                        "Forwarding order status report: order_id={}, status={:?}",
170                                                        order_report.venue_order_id,
171                                                        order_report.order_status
172                                                    );
173                                                    match Py::new(py, order_report) {
174                                                        Ok(py_obj) => {
175                                                            if let Err(e) =
176                                                                callback.bind(py).call1((py_obj,))
177                                                            {
178                                                                tracing::error!(
179                                                                    "Error calling Python callback: {}",
180                                                                    e
181                                                                );
182                                                            }
183                                                        }
184                                                        Err(e) => {
185                                                            tracing::error!(
186                                                                "Error converting OrderStatusReport to Python: {}",
187                                                                e
188                                                            );
189                                                        }
190                                                    }
191                                                }
192                                                ExecutionReport::Fill(fill_report) => {
193                                                    tracing::debug!(
194                                                        "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
195                                                        fill_report.trade_id,
196                                                        fill_report.order_side,
197                                                        fill_report.last_qty,
198                                                        fill_report.last_px
199                                                    );
200                                                    match Py::new(py, fill_report) {
201                                                        Ok(py_obj) => {
202                                                            if let Err(e) =
203                                                                callback.bind(py).call1((py_obj,))
204                                                            {
205                                                                tracing::error!(
206                                                                    "Error calling Python callback: {}",
207                                                                    e
208                                                                );
209                                                            }
210                                                        }
211                                                        Err(e) => {
212                                                            tracing::error!(
213                                                                "Error converting FillReport to Python: {}",
214                                                                e
215                                                            );
216                                                        }
217                                                    }
218                                                }
219                                            }
220                                        }
221                                    });
222                                }
223                                _ => {
224                                    tracing::debug!("Unhandled message type: {:?}", msg);
225                                }
226                            }
227                        }
228                        None => {
229                            tracing::info!("WebSocket connection closed");
230                            break;
231                        }
232                    }
233                }
234            });
235
236            Ok(())
237        })
238    }
239
240    #[pyo3(name = "wait_until_active")]
241    fn py_wait_until_active<'py>(
242        &self,
243        py: Python<'py>,
244        timeout_secs: f64,
245    ) -> PyResult<Bound<'py, PyAny>> {
246        let client = self.clone();
247
248        pyo3_async_runtimes::tokio::future_into_py(py, async move {
249            let start = std::time::Instant::now();
250            loop {
251                if client.is_active() {
252                    return Ok(());
253                }
254
255                if start.elapsed().as_secs_f64() >= timeout_secs {
256                    return Err(PyRuntimeError::new_err(format!(
257                        "WebSocket connection did not become active within {timeout_secs} seconds"
258                    )));
259                }
260
261                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
262            }
263        })
264    }
265
266    #[pyo3(name = "close")]
267    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
268        let mut client = self.clone();
269
270        pyo3_async_runtimes::tokio::future_into_py(py, async move {
271            if let Err(e) = client.disconnect().await {
272                tracing::error!("Error on close: {e}");
273            }
274            Ok(())
275        })
276    }
277
278    #[pyo3(name = "subscribe_trades")]
279    fn py_subscribe_trades<'py>(
280        &self,
281        py: Python<'py>,
282        instrument_id: InstrumentId,
283    ) -> PyResult<Bound<'py, PyAny>> {
284        let client = self.clone();
285
286        pyo3_async_runtimes::tokio::future_into_py(py, async move {
287            client
288                .subscribe_trades(instrument_id)
289                .await
290                .map_err(to_pyruntime_err)?;
291            Ok(())
292        })
293    }
294
295    #[pyo3(name = "unsubscribe_trades")]
296    fn py_unsubscribe_trades<'py>(
297        &self,
298        py: Python<'py>,
299        instrument_id: InstrumentId,
300    ) -> PyResult<Bound<'py, PyAny>> {
301        let client = self.clone();
302
303        pyo3_async_runtimes::tokio::future_into_py(py, async move {
304            client
305                .unsubscribe_trades(instrument_id)
306                .await
307                .map_err(to_pyruntime_err)?;
308            Ok(())
309        })
310    }
311
312    #[pyo3(name = "subscribe_book")]
313    fn py_subscribe_book<'py>(
314        &self,
315        py: Python<'py>,
316        instrument_id: InstrumentId,
317    ) -> PyResult<Bound<'py, PyAny>> {
318        let client = self.clone();
319
320        pyo3_async_runtimes::tokio::future_into_py(py, async move {
321            client
322                .subscribe_book(instrument_id)
323                .await
324                .map_err(to_pyruntime_err)?;
325            Ok(())
326        })
327    }
328
329    #[pyo3(name = "unsubscribe_book")]
330    fn py_unsubscribe_book<'py>(
331        &self,
332        py: Python<'py>,
333        instrument_id: InstrumentId,
334    ) -> PyResult<Bound<'py, PyAny>> {
335        let client = self.clone();
336
337        pyo3_async_runtimes::tokio::future_into_py(py, async move {
338            client
339                .unsubscribe_book(instrument_id)
340                .await
341                .map_err(to_pyruntime_err)?;
342            Ok(())
343        })
344    }
345
346    #[pyo3(name = "subscribe_book_deltas")]
347    fn py_subscribe_book_deltas<'py>(
348        &self,
349        py: Python<'py>,
350        instrument_id: InstrumentId,
351        _book_type: u8,
352        _depth: u64,
353    ) -> PyResult<Bound<'py, PyAny>> {
354        let client = self.clone();
355
356        pyo3_async_runtimes::tokio::future_into_py(py, async move {
357            client
358                .subscribe_book(instrument_id)
359                .await
360                .map_err(to_pyruntime_err)?;
361            Ok(())
362        })
363    }
364
365    #[pyo3(name = "unsubscribe_book_deltas")]
366    fn py_unsubscribe_book_deltas<'py>(
367        &self,
368        py: Python<'py>,
369        instrument_id: InstrumentId,
370    ) -> PyResult<Bound<'py, PyAny>> {
371        let client = self.clone();
372
373        pyo3_async_runtimes::tokio::future_into_py(py, async move {
374            client
375                .unsubscribe_book(instrument_id)
376                .await
377                .map_err(to_pyruntime_err)?;
378            Ok(())
379        })
380    }
381
382    #[pyo3(name = "subscribe_book_snapshots")]
383    fn py_subscribe_book_snapshots<'py>(
384        &self,
385        py: Python<'py>,
386        instrument_id: InstrumentId,
387        _book_type: u8,
388        _depth: u64,
389    ) -> PyResult<Bound<'py, PyAny>> {
390        let client = self.clone();
391
392        pyo3_async_runtimes::tokio::future_into_py(py, async move {
393            client
394                .subscribe_book(instrument_id)
395                .await
396                .map_err(to_pyruntime_err)?;
397            Ok(())
398        })
399    }
400
401    #[pyo3(name = "subscribe_quotes")]
402    fn py_subscribe_quotes<'py>(
403        &self,
404        py: Python<'py>,
405        instrument_id: InstrumentId,
406    ) -> PyResult<Bound<'py, PyAny>> {
407        let client = self.clone();
408
409        pyo3_async_runtimes::tokio::future_into_py(py, async move {
410            client
411                .subscribe_quotes(instrument_id)
412                .await
413                .map_err(to_pyruntime_err)?;
414            Ok(())
415        })
416    }
417
418    #[pyo3(name = "unsubscribe_quotes")]
419    fn py_unsubscribe_quotes<'py>(
420        &self,
421        py: Python<'py>,
422        instrument_id: InstrumentId,
423    ) -> PyResult<Bound<'py, PyAny>> {
424        let client = self.clone();
425
426        pyo3_async_runtimes::tokio::future_into_py(py, async move {
427            client
428                .unsubscribe_quotes(instrument_id)
429                .await
430                .map_err(to_pyruntime_err)?;
431            Ok(())
432        })
433    }
434
435    #[pyo3(name = "subscribe_bars")]
436    fn py_subscribe_bars<'py>(
437        &self,
438        py: Python<'py>,
439        bar_type: BarType,
440    ) -> PyResult<Bound<'py, PyAny>> {
441        let client = self.clone();
442
443        pyo3_async_runtimes::tokio::future_into_py(py, async move {
444            client
445                .subscribe_bars(bar_type)
446                .await
447                .map_err(to_pyruntime_err)?;
448            Ok(())
449        })
450    }
451
452    #[pyo3(name = "unsubscribe_bars")]
453    fn py_unsubscribe_bars<'py>(
454        &self,
455        py: Python<'py>,
456        bar_type: BarType,
457    ) -> PyResult<Bound<'py, PyAny>> {
458        let client = self.clone();
459
460        pyo3_async_runtimes::tokio::future_into_py(py, async move {
461            client
462                .unsubscribe_bars(bar_type)
463                .await
464                .map_err(to_pyruntime_err)?;
465            Ok(())
466        })
467    }
468
469    #[pyo3(name = "subscribe_order_updates")]
470    fn py_subscribe_order_updates<'py>(
471        &self,
472        py: Python<'py>,
473        user: String,
474    ) -> PyResult<Bound<'py, PyAny>> {
475        let client = self.clone();
476
477        pyo3_async_runtimes::tokio::future_into_py(py, async move {
478            client
479                .subscribe_order_updates(&user)
480                .await
481                .map_err(to_pyruntime_err)?;
482            Ok(())
483        })
484    }
485
486    #[pyo3(name = "subscribe_user_events")]
487    fn py_subscribe_user_events<'py>(
488        &self,
489        py: Python<'py>,
490        user: String,
491    ) -> PyResult<Bound<'py, PyAny>> {
492        let client = self.clone();
493
494        pyo3_async_runtimes::tokio::future_into_py(py, async move {
495            client
496                .subscribe_user_events(&user)
497                .await
498                .map_err(to_pyruntime_err)?;
499            Ok(())
500        })
501    }
502
503    #[pyo3(name = "subscribe_mark_prices")]
504    fn py_subscribe_mark_prices<'py>(
505        &self,
506        py: Python<'py>,
507        instrument_id: InstrumentId,
508    ) -> PyResult<Bound<'py, PyAny>> {
509        let client = self.clone();
510
511        pyo3_async_runtimes::tokio::future_into_py(py, async move {
512            client
513                .subscribe_mark_prices(instrument_id)
514                .await
515                .map_err(to_pyruntime_err)?;
516            Ok(())
517        })
518    }
519
520    #[pyo3(name = "unsubscribe_mark_prices")]
521    fn py_unsubscribe_mark_prices<'py>(
522        &self,
523        py: Python<'py>,
524        instrument_id: InstrumentId,
525    ) -> PyResult<Bound<'py, PyAny>> {
526        let client = self.clone();
527
528        pyo3_async_runtimes::tokio::future_into_py(py, async move {
529            client
530                .unsubscribe_mark_prices(instrument_id)
531                .await
532                .map_err(to_pyruntime_err)?;
533            Ok(())
534        })
535    }
536
537    #[pyo3(name = "subscribe_index_prices")]
538    fn py_subscribe_index_prices<'py>(
539        &self,
540        py: Python<'py>,
541        instrument_id: InstrumentId,
542    ) -> PyResult<Bound<'py, PyAny>> {
543        let client = self.clone();
544
545        pyo3_async_runtimes::tokio::future_into_py(py, async move {
546            client
547                .subscribe_index_prices(instrument_id)
548                .await
549                .map_err(to_pyruntime_err)?;
550            Ok(())
551        })
552    }
553
554    #[pyo3(name = "unsubscribe_index_prices")]
555    fn py_unsubscribe_index_prices<'py>(
556        &self,
557        py: Python<'py>,
558        instrument_id: InstrumentId,
559    ) -> PyResult<Bound<'py, PyAny>> {
560        let client = self.clone();
561
562        pyo3_async_runtimes::tokio::future_into_py(py, async move {
563            client
564                .unsubscribe_index_prices(instrument_id)
565                .await
566                .map_err(to_pyruntime_err)?;
567            Ok(())
568        })
569    }
570
571    #[pyo3(name = "subscribe_funding_rates")]
572    fn py_subscribe_funding_rates<'py>(
573        &self,
574        py: Python<'py>,
575        instrument_id: InstrumentId,
576    ) -> PyResult<Bound<'py, PyAny>> {
577        let client = self.clone();
578
579        pyo3_async_runtimes::tokio::future_into_py(py, async move {
580            client
581                .subscribe_funding_rates(instrument_id)
582                .await
583                .map_err(to_pyruntime_err)?;
584            Ok(())
585        })
586    }
587
588    #[pyo3(name = "unsubscribe_funding_rates")]
589    fn py_unsubscribe_funding_rates<'py>(
590        &self,
591        py: Python<'py>,
592        instrument_id: InstrumentId,
593    ) -> PyResult<Bound<'py, PyAny>> {
594        let client = self.clone();
595
596        pyo3_async_runtimes::tokio::future_into_py(py, async move {
597            client
598                .unsubscribe_funding_rates(instrument_id)
599                .await
600                .map_err(to_pyruntime_err)?;
601            Ok(())
602        })
603    }
604}