nautilus_hyperliquid/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Hyperliquid WebSocket client.
17
18use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21    data::{BarType, Data, OrderBookDeltas_API},
22    identifiers::{AccountId, InstrumentId},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
26
27use crate::{
28    common::HyperliquidProductType,
29    websocket::{
30        HyperliquidWebSocketClient,
31        messages::{ExecutionReport, NautilusWsMessage},
32    },
33};
34
35#[pymethods]
36impl HyperliquidWebSocketClient {
37    #[new]
38    #[pyo3(signature = (url=None, testnet=false, product_type=HyperliquidProductType::Perp, account_id=None))]
39    fn py_new(
40        url: Option<String>,
41        testnet: bool,
42        product_type: HyperliquidProductType,
43        account_id: Option<String>,
44    ) -> PyResult<Self> {
45        let account_id = account_id.map(|s| AccountId::from(s.as_str()));
46        Ok(Self::new(url, testnet, product_type, account_id))
47    }
48
49    #[getter]
50    #[pyo3(name = "url")]
51    #[must_use]
52    pub fn py_url(&self) -> String {
53        self.url().to_string()
54    }
55
56    #[pyo3(name = "is_active")]
57    fn py_is_active(&self) -> bool {
58        self.is_active()
59    }
60
61    #[pyo3(name = "is_closed")]
62    fn py_is_closed(&self) -> bool {
63        !self.is_active()
64    }
65
66    #[pyo3(name = "connect")]
67    fn py_connect<'py>(
68        &self,
69        py: Python<'py>,
70        instruments: Vec<Py<PyAny>>,
71        callback: Py<PyAny>,
72    ) -> PyResult<Bound<'py, PyAny>> {
73        for inst in instruments {
74            let inst_any = pyobject_to_instrument_any(py, inst)?;
75            self.cache_instrument(inst_any);
76        }
77
78        let mut client = self.clone();
79
80        pyo3_async_runtimes::tokio::future_into_py(py, async move {
81            client.connect().await.map_err(to_pyruntime_err)?;
82
83            get_runtime().spawn(async move {
84                loop {
85                    let event = client.next_event().await;
86
87                    match event {
88                        Some(msg) => {
89                            log::trace!("Received WebSocket message: {msg:?}");
90
91                            match msg {
92                                NautilusWsMessage::Trades(trade_ticks) => {
93                                    Python::attach(|py| {
94                                        for tick in trade_ticks {
95                                            let py_obj = data_to_pycapsule(py, Data::Trade(tick));
96                                            if let Err(e) = callback.bind(py).call1((py_obj,)) {
97                                                log::error!(
98                                                    "Error calling Python callback: {e}"
99                                                );
100                                            }
101                                        }
102                                    });
103                                }
104                                NautilusWsMessage::Quote(quote_tick) => {
105                                    Python::attach(|py| {
106                                        let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
107                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
108                                            log::error!("Error calling Python callback: {e}");
109                                        }
110                                    });
111                                }
112                                NautilusWsMessage::Deltas(deltas) => {
113                                    Python::attach(|py| {
114                                        let py_obj = data_to_pycapsule(
115                                            py,
116                                            Data::Deltas(OrderBookDeltas_API::new(deltas)),
117                                        );
118                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
119                                            log::error!("Error calling Python callback: {e}");
120                                        }
121                                    });
122                                }
123                                NautilusWsMessage::Candle(bar) => {
124                                    Python::attach(|py| {
125                                        let py_obj = data_to_pycapsule(py, Data::Bar(bar));
126                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
127                                            log::error!("Error calling Python callback: {e}");
128                                        }
129                                    });
130                                }
131                                NautilusWsMessage::MarkPrice(mark_price) => {
132                                    Python::attach(|py| {
133                                        let py_obj = data_to_pycapsule(
134                                            py,
135                                            Data::MarkPriceUpdate(mark_price),
136                                        );
137                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
138                                            log::error!("Error calling Python callback: {e}");
139                                        }
140                                    });
141                                }
142                                NautilusWsMessage::IndexPrice(index_price) => {
143                                    Python::attach(|py| {
144                                        let py_obj = data_to_pycapsule(
145                                            py,
146                                            Data::IndexPriceUpdate(index_price),
147                                        );
148                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
149                                            log::error!("Error calling Python callback: {e}");
150                                        }
151                                    });
152                                }
153                                NautilusWsMessage::FundingRate(funding_rate) => {
154                                    Python::attach(|py| {
155                                        if let Ok(py_obj) = funding_rate.into_py_any(py)
156                                            && let Err(e) = callback.bind(py).call1((py_obj,))
157                                        {
158                                            log::error!("Error calling Python callback: {e}");
159                                        }
160                                    });
161                                }
162                                NautilusWsMessage::ExecutionReports(reports) => {
163                                    Python::attach(|py| {
164                                        for report in reports {
165                                            match report {
166                                                ExecutionReport::Order(order_report) => {
167                                                    log::debug!(
168                                                        "Forwarding order status report: order_id={}, status={:?}",
169                                                        order_report.venue_order_id,
170                                                        order_report.order_status
171                                                    );
172                                                    match Py::new(py, order_report) {
173                                                        Ok(py_obj) => {
174                                                            if let Err(e) =
175                                                                callback.bind(py).call1((py_obj,))
176                                                            {
177                                                                log::error!("Error calling Python callback: {e}");
178                                                            }
179                                                        }
180                                                        Err(e) => {
181                                                            log::error!("Error converting OrderStatusReport to Python: {e}");
182                                                        }
183                                                    }
184                                                }
185                                                ExecutionReport::Fill(fill_report) => {
186                                                    log::debug!(
187                                                        "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
188                                                        fill_report.trade_id,
189                                                        fill_report.order_side,
190                                                        fill_report.last_qty,
191                                                        fill_report.last_px
192                                                    );
193                                                    match Py::new(py, fill_report) {
194                                                        Ok(py_obj) => {
195                                                            if let Err(e) =
196                                                                callback.bind(py).call1((py_obj,))
197                                                            {
198                                                                log::error!("Error calling Python callback: {e}");
199                                                            }
200                                                        }
201                                                        Err(e) => {
202                                                            log::error!("Error converting FillReport to Python: {e}");
203                                                        }
204                                                    }
205                                                }
206                                            }
207                                        }
208                                    });
209                                }
210                                _ => {
211                                    log::debug!("Unhandled message type: {msg:?}");
212                                }
213                            }
214                        }
215                        None => {
216                            log::info!("WebSocket connection closed");
217                            break;
218                        }
219                    }
220                }
221            });
222
223            Ok(())
224        })
225    }
226
227    #[pyo3(name = "wait_until_active")]
228    fn py_wait_until_active<'py>(
229        &self,
230        py: Python<'py>,
231        timeout_secs: f64,
232    ) -> PyResult<Bound<'py, PyAny>> {
233        let client = self.clone();
234
235        pyo3_async_runtimes::tokio::future_into_py(py, async move {
236            let start = std::time::Instant::now();
237            loop {
238                if client.is_active() {
239                    return Ok(());
240                }
241
242                if start.elapsed().as_secs_f64() >= timeout_secs {
243                    return Err(PyRuntimeError::new_err(format!(
244                        "WebSocket connection did not become active within {timeout_secs} seconds"
245                    )));
246                }
247
248                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
249            }
250        })
251    }
252
253    #[pyo3(name = "close")]
254    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
255        let mut client = self.clone();
256
257        pyo3_async_runtimes::tokio::future_into_py(py, async move {
258            if let Err(e) = client.disconnect().await {
259                log::error!("Error on close: {e}");
260            }
261            Ok(())
262        })
263    }
264
265    #[pyo3(name = "subscribe_trades")]
266    fn py_subscribe_trades<'py>(
267        &self,
268        py: Python<'py>,
269        instrument_id: InstrumentId,
270    ) -> PyResult<Bound<'py, PyAny>> {
271        let client = self.clone();
272
273        pyo3_async_runtimes::tokio::future_into_py(py, async move {
274            client
275                .subscribe_trades(instrument_id)
276                .await
277                .map_err(to_pyruntime_err)?;
278            Ok(())
279        })
280    }
281
282    #[pyo3(name = "unsubscribe_trades")]
283    fn py_unsubscribe_trades<'py>(
284        &self,
285        py: Python<'py>,
286        instrument_id: InstrumentId,
287    ) -> PyResult<Bound<'py, PyAny>> {
288        let client = self.clone();
289
290        pyo3_async_runtimes::tokio::future_into_py(py, async move {
291            client
292                .unsubscribe_trades(instrument_id)
293                .await
294                .map_err(to_pyruntime_err)?;
295            Ok(())
296        })
297    }
298
299    #[pyo3(name = "subscribe_book")]
300    fn py_subscribe_book<'py>(
301        &self,
302        py: Python<'py>,
303        instrument_id: InstrumentId,
304    ) -> PyResult<Bound<'py, PyAny>> {
305        let client = self.clone();
306
307        pyo3_async_runtimes::tokio::future_into_py(py, async move {
308            client
309                .subscribe_book(instrument_id)
310                .await
311                .map_err(to_pyruntime_err)?;
312            Ok(())
313        })
314    }
315
316    #[pyo3(name = "unsubscribe_book")]
317    fn py_unsubscribe_book<'py>(
318        &self,
319        py: Python<'py>,
320        instrument_id: InstrumentId,
321    ) -> PyResult<Bound<'py, PyAny>> {
322        let client = self.clone();
323
324        pyo3_async_runtimes::tokio::future_into_py(py, async move {
325            client
326                .unsubscribe_book(instrument_id)
327                .await
328                .map_err(to_pyruntime_err)?;
329            Ok(())
330        })
331    }
332
333    #[pyo3(name = "subscribe_book_deltas")]
334    fn py_subscribe_book_deltas<'py>(
335        &self,
336        py: Python<'py>,
337        instrument_id: InstrumentId,
338        _book_type: u8,
339        _depth: u64,
340    ) -> PyResult<Bound<'py, PyAny>> {
341        let client = self.clone();
342
343        pyo3_async_runtimes::tokio::future_into_py(py, async move {
344            client
345                .subscribe_book(instrument_id)
346                .await
347                .map_err(to_pyruntime_err)?;
348            Ok(())
349        })
350    }
351
352    #[pyo3(name = "unsubscribe_book_deltas")]
353    fn py_unsubscribe_book_deltas<'py>(
354        &self,
355        py: Python<'py>,
356        instrument_id: InstrumentId,
357    ) -> PyResult<Bound<'py, PyAny>> {
358        let client = self.clone();
359
360        pyo3_async_runtimes::tokio::future_into_py(py, async move {
361            client
362                .unsubscribe_book(instrument_id)
363                .await
364                .map_err(to_pyruntime_err)?;
365            Ok(())
366        })
367    }
368
369    #[pyo3(name = "subscribe_book_snapshots")]
370    fn py_subscribe_book_snapshots<'py>(
371        &self,
372        py: Python<'py>,
373        instrument_id: InstrumentId,
374        _book_type: u8,
375        _depth: u64,
376    ) -> PyResult<Bound<'py, PyAny>> {
377        let client = self.clone();
378
379        pyo3_async_runtimes::tokio::future_into_py(py, async move {
380            client
381                .subscribe_book(instrument_id)
382                .await
383                .map_err(to_pyruntime_err)?;
384            Ok(())
385        })
386    }
387
388    #[pyo3(name = "subscribe_quotes")]
389    fn py_subscribe_quotes<'py>(
390        &self,
391        py: Python<'py>,
392        instrument_id: InstrumentId,
393    ) -> PyResult<Bound<'py, PyAny>> {
394        let client = self.clone();
395
396        pyo3_async_runtimes::tokio::future_into_py(py, async move {
397            client
398                .subscribe_quotes(instrument_id)
399                .await
400                .map_err(to_pyruntime_err)?;
401            Ok(())
402        })
403    }
404
405    #[pyo3(name = "unsubscribe_quotes")]
406    fn py_unsubscribe_quotes<'py>(
407        &self,
408        py: Python<'py>,
409        instrument_id: InstrumentId,
410    ) -> PyResult<Bound<'py, PyAny>> {
411        let client = self.clone();
412
413        pyo3_async_runtimes::tokio::future_into_py(py, async move {
414            client
415                .unsubscribe_quotes(instrument_id)
416                .await
417                .map_err(to_pyruntime_err)?;
418            Ok(())
419        })
420    }
421
422    #[pyo3(name = "subscribe_bars")]
423    fn py_subscribe_bars<'py>(
424        &self,
425        py: Python<'py>,
426        bar_type: BarType,
427    ) -> PyResult<Bound<'py, PyAny>> {
428        let client = self.clone();
429
430        pyo3_async_runtimes::tokio::future_into_py(py, async move {
431            client
432                .subscribe_bars(bar_type)
433                .await
434                .map_err(to_pyruntime_err)?;
435            Ok(())
436        })
437    }
438
439    #[pyo3(name = "unsubscribe_bars")]
440    fn py_unsubscribe_bars<'py>(
441        &self,
442        py: Python<'py>,
443        bar_type: BarType,
444    ) -> PyResult<Bound<'py, PyAny>> {
445        let client = self.clone();
446
447        pyo3_async_runtimes::tokio::future_into_py(py, async move {
448            client
449                .unsubscribe_bars(bar_type)
450                .await
451                .map_err(to_pyruntime_err)?;
452            Ok(())
453        })
454    }
455
456    #[pyo3(name = "subscribe_order_updates")]
457    fn py_subscribe_order_updates<'py>(
458        &self,
459        py: Python<'py>,
460        user: String,
461    ) -> PyResult<Bound<'py, PyAny>> {
462        let client = self.clone();
463
464        pyo3_async_runtimes::tokio::future_into_py(py, async move {
465            client
466                .subscribe_order_updates(&user)
467                .await
468                .map_err(to_pyruntime_err)?;
469            Ok(())
470        })
471    }
472
473    #[pyo3(name = "subscribe_user_events")]
474    fn py_subscribe_user_events<'py>(
475        &self,
476        py: Python<'py>,
477        user: String,
478    ) -> PyResult<Bound<'py, PyAny>> {
479        let client = self.clone();
480
481        pyo3_async_runtimes::tokio::future_into_py(py, async move {
482            client
483                .subscribe_user_events(&user)
484                .await
485                .map_err(to_pyruntime_err)?;
486            Ok(())
487        })
488    }
489
490    #[pyo3(name = "subscribe_mark_prices")]
491    fn py_subscribe_mark_prices<'py>(
492        &self,
493        py: Python<'py>,
494        instrument_id: InstrumentId,
495    ) -> PyResult<Bound<'py, PyAny>> {
496        let client = self.clone();
497
498        pyo3_async_runtimes::tokio::future_into_py(py, async move {
499            client
500                .subscribe_mark_prices(instrument_id)
501                .await
502                .map_err(to_pyruntime_err)?;
503            Ok(())
504        })
505    }
506
507    #[pyo3(name = "unsubscribe_mark_prices")]
508    fn py_unsubscribe_mark_prices<'py>(
509        &self,
510        py: Python<'py>,
511        instrument_id: InstrumentId,
512    ) -> PyResult<Bound<'py, PyAny>> {
513        let client = self.clone();
514
515        pyo3_async_runtimes::tokio::future_into_py(py, async move {
516            client
517                .unsubscribe_mark_prices(instrument_id)
518                .await
519                .map_err(to_pyruntime_err)?;
520            Ok(())
521        })
522    }
523
524    #[pyo3(name = "subscribe_index_prices")]
525    fn py_subscribe_index_prices<'py>(
526        &self,
527        py: Python<'py>,
528        instrument_id: InstrumentId,
529    ) -> PyResult<Bound<'py, PyAny>> {
530        let client = self.clone();
531
532        pyo3_async_runtimes::tokio::future_into_py(py, async move {
533            client
534                .subscribe_index_prices(instrument_id)
535                .await
536                .map_err(to_pyruntime_err)?;
537            Ok(())
538        })
539    }
540
541    #[pyo3(name = "unsubscribe_index_prices")]
542    fn py_unsubscribe_index_prices<'py>(
543        &self,
544        py: Python<'py>,
545        instrument_id: InstrumentId,
546    ) -> PyResult<Bound<'py, PyAny>> {
547        let client = self.clone();
548
549        pyo3_async_runtimes::tokio::future_into_py(py, async move {
550            client
551                .unsubscribe_index_prices(instrument_id)
552                .await
553                .map_err(to_pyruntime_err)?;
554            Ok(())
555        })
556    }
557
558    #[pyo3(name = "subscribe_funding_rates")]
559    fn py_subscribe_funding_rates<'py>(
560        &self,
561        py: Python<'py>,
562        instrument_id: InstrumentId,
563    ) -> PyResult<Bound<'py, PyAny>> {
564        let client = self.clone();
565
566        pyo3_async_runtimes::tokio::future_into_py(py, async move {
567            client
568                .subscribe_funding_rates(instrument_id)
569                .await
570                .map_err(to_pyruntime_err)?;
571            Ok(())
572        })
573    }
574
575    #[pyo3(name = "unsubscribe_funding_rates")]
576    fn py_unsubscribe_funding_rates<'py>(
577        &self,
578        py: Python<'py>,
579        instrument_id: InstrumentId,
580    ) -> PyResult<Bound<'py, PyAny>> {
581        let client = self.clone();
582
583        pyo3_async_runtimes::tokio::future_into_py(py, async move {
584            client
585                .unsubscribe_funding_rates(instrument_id)
586                .await
587                .map_err(to_pyruntime_err)?;
588            Ok(())
589        })
590    }
591}