nautilus_hyperliquid/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Hyperliquid WebSocket client.
17
18use nautilus_core::python::to_pyruntime_err;
19use nautilus_model::{
20    data::{BarType, Data, OrderBookDeltas_API},
21    identifiers::{AccountId, InstrumentId},
22    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
23};
24use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
25
26use crate::{
27    common::HyperliquidProductType,
28    websocket::{
29        HyperliquidWebSocketClient,
30        messages::{ExecutionReport, NautilusWsMessage},
31    },
32};
33
34#[pymethods]
35impl HyperliquidWebSocketClient {
36    #[new]
37    #[pyo3(signature = (url=None, testnet=false, product_type=HyperliquidProductType::Perp, account_id=None))]
38    fn py_new(
39        url: Option<String>,
40        testnet: bool,
41        product_type: HyperliquidProductType,
42        account_id: Option<String>,
43    ) -> PyResult<Self> {
44        let account_id = account_id.map(|s| AccountId::from(s.as_str()));
45        Ok(Self::new(url, testnet, product_type, account_id))
46    }
47
48    #[getter]
49    #[pyo3(name = "url")]
50    #[must_use]
51    pub fn py_url(&self) -> String {
52        self.url().to_string()
53    }
54
55    #[pyo3(name = "is_active")]
56    fn py_is_active(&self) -> bool {
57        self.is_active()
58    }
59
60    #[pyo3(name = "is_closed")]
61    fn py_is_closed(&self) -> bool {
62        !self.is_active()
63    }
64
65    #[pyo3(name = "connect")]
66    fn py_connect<'py>(
67        &self,
68        py: Python<'py>,
69        instruments: Vec<Py<PyAny>>,
70        callback: Py<PyAny>,
71    ) -> PyResult<Bound<'py, PyAny>> {
72        for inst in instruments {
73            let inst_any = pyobject_to_instrument_any(py, inst)?;
74            self.cache_instrument(inst_any);
75        }
76
77        let mut client = self.clone();
78
79        pyo3_async_runtimes::tokio::future_into_py(py, async move {
80            client.connect().await.map_err(to_pyruntime_err)?;
81
82            tokio::spawn(async move {
83                loop {
84                    let event = client.next_event().await;
85
86                    match event {
87                        Some(msg) => {
88                            tracing::trace!("Received WebSocket message: {msg:?}");
89
90                            match msg {
91                                NautilusWsMessage::Trades(trade_ticks) => {
92                                    Python::attach(|py| {
93                                        for tick in trade_ticks {
94                                            let py_obj = data_to_pycapsule(py, Data::Trade(tick));
95                                            if let Err(e) = callback.bind(py).call1((py_obj,)) {
96                                                tracing::error!(
97                                                    "Error calling Python callback: {}",
98                                                    e
99                                                );
100                                            }
101                                        }
102                                    });
103                                }
104                                NautilusWsMessage::Quote(quote_tick) => {
105                                    Python::attach(|py| {
106                                        let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
107                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
108                                            tracing::error!("Error calling Python callback: {}", e);
109                                        }
110                                    });
111                                }
112                                NautilusWsMessage::Deltas(deltas) => {
113                                    Python::attach(|py| {
114                                        let py_obj = data_to_pycapsule(
115                                            py,
116                                            Data::Deltas(OrderBookDeltas_API::new(deltas)),
117                                        );
118                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
119                                            tracing::error!("Error calling Python callback: {}", e);
120                                        }
121                                    });
122                                }
123                                NautilusWsMessage::Candle(bar) => {
124                                    Python::attach(|py| {
125                                        let py_obj = data_to_pycapsule(py, Data::Bar(bar));
126                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
127                                            tracing::error!("Error calling Python callback: {}", e);
128                                        }
129                                    });
130                                }
131                                NautilusWsMessage::MarkPrice(mark_price) => {
132                                    Python::attach(|py| {
133                                        let py_obj = data_to_pycapsule(
134                                            py,
135                                            Data::MarkPriceUpdate(mark_price),
136                                        );
137                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
138                                            tracing::error!("Error calling Python callback: {}", e);
139                                        }
140                                    });
141                                }
142                                NautilusWsMessage::IndexPrice(index_price) => {
143                                    Python::attach(|py| {
144                                        let py_obj = data_to_pycapsule(
145                                            py,
146                                            Data::IndexPriceUpdate(index_price),
147                                        );
148                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
149                                            tracing::error!("Error calling Python callback: {}", e);
150                                        }
151                                    });
152                                }
153                                NautilusWsMessage::FundingRate(funding_rate) => {
154                                    Python::attach(|py| {
155                                        if let Ok(py_obj) = funding_rate.into_py_any(py)
156                                            && let Err(e) = callback.bind(py).call1((py_obj,))
157                                        {
158                                            tracing::error!("Error calling Python callback: {}", e);
159                                        }
160                                    });
161                                }
162                                NautilusWsMessage::ExecutionReports(reports) => {
163                                    Python::attach(|py| {
164                                        for report in reports {
165                                            match report {
166                                                ExecutionReport::Order(order_report) => {
167                                                    tracing::debug!(
168                                                        "Forwarding order status report: order_id={}, status={:?}",
169                                                        order_report.venue_order_id,
170                                                        order_report.order_status
171                                                    );
172                                                    match Py::new(py, order_report) {
173                                                        Ok(py_obj) => {
174                                                            if let Err(e) =
175                                                                callback.bind(py).call1((py_obj,))
176                                                            {
177                                                                tracing::error!(
178                                                                    "Error calling Python callback: {}",
179                                                                    e
180                                                                );
181                                                            }
182                                                        }
183                                                        Err(e) => {
184                                                            tracing::error!(
185                                                                "Error converting OrderStatusReport to Python: {}",
186                                                                e
187                                                            );
188                                                        }
189                                                    }
190                                                }
191                                                ExecutionReport::Fill(fill_report) => {
192                                                    tracing::debug!(
193                                                        "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
194                                                        fill_report.trade_id,
195                                                        fill_report.order_side,
196                                                        fill_report.last_qty,
197                                                        fill_report.last_px
198                                                    );
199                                                    match Py::new(py, fill_report) {
200                                                        Ok(py_obj) => {
201                                                            if let Err(e) =
202                                                                callback.bind(py).call1((py_obj,))
203                                                            {
204                                                                tracing::error!(
205                                                                    "Error calling Python callback: {}",
206                                                                    e
207                                                                );
208                                                            }
209                                                        }
210                                                        Err(e) => {
211                                                            tracing::error!(
212                                                                "Error converting FillReport to Python: {}",
213                                                                e
214                                                            );
215                                                        }
216                                                    }
217                                                }
218                                            }
219                                        }
220                                    });
221                                }
222                                _ => {
223                                    tracing::debug!("Unhandled message type: {:?}", msg);
224                                }
225                            }
226                        }
227                        None => {
228                            tracing::info!("WebSocket connection closed");
229                            break;
230                        }
231                    }
232                }
233            });
234
235            Ok(())
236        })
237    }
238
239    #[pyo3(name = "wait_until_active")]
240    fn py_wait_until_active<'py>(
241        &self,
242        py: Python<'py>,
243        timeout_secs: f64,
244    ) -> PyResult<Bound<'py, PyAny>> {
245        let client = self.clone();
246
247        pyo3_async_runtimes::tokio::future_into_py(py, async move {
248            let start = std::time::Instant::now();
249            loop {
250                if client.is_active() {
251                    return Ok(());
252                }
253
254                if start.elapsed().as_secs_f64() >= timeout_secs {
255                    return Err(PyRuntimeError::new_err(format!(
256                        "WebSocket connection did not become active within {} seconds",
257                        timeout_secs
258                    )));
259                }
260
261                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
262            }
263        })
264    }
265
266    #[pyo3(name = "close")]
267    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
268        let mut client = self.clone();
269
270        pyo3_async_runtimes::tokio::future_into_py(py, async move {
271            if let Err(e) = client.disconnect().await {
272                tracing::error!("Error on close: {e}");
273            }
274            Ok(())
275        })
276    }
277
278    #[pyo3(name = "subscribe_trades")]
279    fn py_subscribe_trades<'py>(
280        &self,
281        py: Python<'py>,
282        instrument_id: InstrumentId,
283    ) -> PyResult<Bound<'py, PyAny>> {
284        let client = self.clone();
285
286        pyo3_async_runtimes::tokio::future_into_py(py, async move {
287            client
288                .subscribe_trades(instrument_id)
289                .await
290                .map_err(to_pyruntime_err)?;
291            Ok(())
292        })
293    }
294
295    #[pyo3(name = "unsubscribe_trades")]
296    fn py_unsubscribe_trades<'py>(
297        &self,
298        py: Python<'py>,
299        instrument_id: InstrumentId,
300    ) -> PyResult<Bound<'py, PyAny>> {
301        let client = self.clone();
302
303        pyo3_async_runtimes::tokio::future_into_py(py, async move {
304            client
305                .unsubscribe_trades(instrument_id)
306                .await
307                .map_err(to_pyruntime_err)?;
308            Ok(())
309        })
310    }
311
312    #[pyo3(name = "subscribe_book")]
313    fn py_subscribe_book<'py>(
314        &self,
315        py: Python<'py>,
316        instrument_id: InstrumentId,
317    ) -> PyResult<Bound<'py, PyAny>> {
318        let client = self.clone();
319
320        pyo3_async_runtimes::tokio::future_into_py(py, async move {
321            client
322                .subscribe_book(instrument_id)
323                .await
324                .map_err(to_pyruntime_err)?;
325            Ok(())
326        })
327    }
328
329    #[pyo3(name = "unsubscribe_book")]
330    fn py_unsubscribe_book<'py>(
331        &self,
332        py: Python<'py>,
333        instrument_id: InstrumentId,
334    ) -> PyResult<Bound<'py, PyAny>> {
335        let client = self.clone();
336
337        pyo3_async_runtimes::tokio::future_into_py(py, async move {
338            client
339                .unsubscribe_book(instrument_id)
340                .await
341                .map_err(to_pyruntime_err)?;
342            Ok(())
343        })
344    }
345
346    #[pyo3(name = "subscribe_book_deltas")]
347    fn py_subscribe_book_deltas<'py>(
348        &self,
349        py: Python<'py>,
350        instrument_id: InstrumentId,
351        _book_type: u8,
352        _depth: u64,
353    ) -> PyResult<Bound<'py, PyAny>> {
354        let client = self.clone();
355
356        pyo3_async_runtimes::tokio::future_into_py(py, async move {
357            client
358                .subscribe_book(instrument_id)
359                .await
360                .map_err(to_pyruntime_err)?;
361            Ok(())
362        })
363    }
364
365    #[pyo3(name = "unsubscribe_book_deltas")]
366    fn py_unsubscribe_book_deltas<'py>(
367        &self,
368        py: Python<'py>,
369        instrument_id: InstrumentId,
370    ) -> PyResult<Bound<'py, PyAny>> {
371        let client = self.clone();
372
373        pyo3_async_runtimes::tokio::future_into_py(py, async move {
374            client
375                .unsubscribe_book(instrument_id)
376                .await
377                .map_err(to_pyruntime_err)?;
378            Ok(())
379        })
380    }
381
382    #[pyo3(name = "subscribe_book_snapshots")]
383    fn py_subscribe_book_snapshots<'py>(
384        &self,
385        py: Python<'py>,
386        instrument_id: InstrumentId,
387        _book_type: u8,
388        _depth: u64,
389    ) -> PyResult<Bound<'py, PyAny>> {
390        let client = self.clone();
391
392        pyo3_async_runtimes::tokio::future_into_py(py, async move {
393            client
394                .subscribe_book(instrument_id)
395                .await
396                .map_err(to_pyruntime_err)?;
397            Ok(())
398        })
399    }
400
401    #[pyo3(name = "subscribe_quotes")]
402    fn py_subscribe_quotes<'py>(
403        &self,
404        py: Python<'py>,
405        instrument_id: InstrumentId,
406    ) -> PyResult<Bound<'py, PyAny>> {
407        let client = self.clone();
408
409        pyo3_async_runtimes::tokio::future_into_py(py, async move {
410            client
411                .subscribe_quotes(instrument_id)
412                .await
413                .map_err(to_pyruntime_err)?;
414            Ok(())
415        })
416    }
417
418    #[pyo3(name = "unsubscribe_quotes")]
419    fn py_unsubscribe_quotes<'py>(
420        &self,
421        py: Python<'py>,
422        instrument_id: InstrumentId,
423    ) -> PyResult<Bound<'py, PyAny>> {
424        let client = self.clone();
425
426        pyo3_async_runtimes::tokio::future_into_py(py, async move {
427            client
428                .unsubscribe_quotes(instrument_id)
429                .await
430                .map_err(to_pyruntime_err)?;
431            Ok(())
432        })
433    }
434
435    #[pyo3(name = "subscribe_bars")]
436    fn py_subscribe_bars<'py>(
437        &self,
438        py: Python<'py>,
439        bar_type: BarType,
440    ) -> PyResult<Bound<'py, PyAny>> {
441        let client = self.clone();
442
443        pyo3_async_runtimes::tokio::future_into_py(py, async move {
444            client
445                .subscribe_bars(bar_type)
446                .await
447                .map_err(to_pyruntime_err)?;
448            Ok(())
449        })
450    }
451
452    #[pyo3(name = "unsubscribe_bars")]
453    fn py_unsubscribe_bars<'py>(
454        &self,
455        py: Python<'py>,
456        bar_type: BarType,
457    ) -> PyResult<Bound<'py, PyAny>> {
458        let client = self.clone();
459
460        pyo3_async_runtimes::tokio::future_into_py(py, async move {
461            client
462                .unsubscribe_bars(bar_type)
463                .await
464                .map_err(to_pyruntime_err)?;
465            Ok(())
466        })
467    }
468
469    #[pyo3(name = "subscribe_order_updates")]
470    fn py_subscribe_order_updates<'py>(
471        &self,
472        py: Python<'py>,
473        user: String,
474    ) -> PyResult<Bound<'py, PyAny>> {
475        let client = self.clone();
476
477        pyo3_async_runtimes::tokio::future_into_py(py, async move {
478            client
479                .subscribe_order_updates(&user)
480                .await
481                .map_err(to_pyruntime_err)?;
482            Ok(())
483        })
484    }
485
486    #[pyo3(name = "subscribe_user_events")]
487    fn py_subscribe_user_events<'py>(
488        &self,
489        py: Python<'py>,
490        user: String,
491    ) -> PyResult<Bound<'py, PyAny>> {
492        let client = self.clone();
493
494        pyo3_async_runtimes::tokio::future_into_py(py, async move {
495            client
496                .subscribe_user_events(&user)
497                .await
498                .map_err(to_pyruntime_err)?;
499            Ok(())
500        })
501    }
502
503    #[pyo3(name = "subscribe_mark_prices")]
504    fn py_subscribe_mark_prices<'py>(
505        &self,
506        py: Python<'py>,
507        instrument_id: InstrumentId,
508    ) -> PyResult<Bound<'py, PyAny>> {
509        let client = self.clone();
510
511        pyo3_async_runtimes::tokio::future_into_py(py, async move {
512            client
513                .subscribe_mark_prices(instrument_id)
514                .await
515                .map_err(to_pyruntime_err)?;
516            Ok(())
517        })
518    }
519
520    #[pyo3(name = "unsubscribe_mark_prices")]
521    fn py_unsubscribe_mark_prices<'py>(
522        &self,
523        py: Python<'py>,
524        instrument_id: InstrumentId,
525    ) -> PyResult<Bound<'py, PyAny>> {
526        let client = self.clone();
527
528        pyo3_async_runtimes::tokio::future_into_py(py, async move {
529            client
530                .unsubscribe_mark_prices(instrument_id)
531                .await
532                .map_err(to_pyruntime_err)?;
533            Ok(())
534        })
535    }
536
537    #[pyo3(name = "subscribe_index_prices")]
538    fn py_subscribe_index_prices<'py>(
539        &self,
540        py: Python<'py>,
541        instrument_id: InstrumentId,
542    ) -> PyResult<Bound<'py, PyAny>> {
543        let client = self.clone();
544
545        pyo3_async_runtimes::tokio::future_into_py(py, async move {
546            client
547                .subscribe_index_prices(instrument_id)
548                .await
549                .map_err(to_pyruntime_err)?;
550            Ok(())
551        })
552    }
553
554    #[pyo3(name = "unsubscribe_index_prices")]
555    fn py_unsubscribe_index_prices<'py>(
556        &self,
557        py: Python<'py>,
558        instrument_id: InstrumentId,
559    ) -> PyResult<Bound<'py, PyAny>> {
560        let client = self.clone();
561
562        pyo3_async_runtimes::tokio::future_into_py(py, async move {
563            client
564                .unsubscribe_index_prices(instrument_id)
565                .await
566                .map_err(to_pyruntime_err)?;
567            Ok(())
568        })
569    }
570
571    #[pyo3(name = "subscribe_funding_rates")]
572    fn py_subscribe_funding_rates<'py>(
573        &self,
574        py: Python<'py>,
575        instrument_id: InstrumentId,
576    ) -> PyResult<Bound<'py, PyAny>> {
577        let client = self.clone();
578
579        pyo3_async_runtimes::tokio::future_into_py(py, async move {
580            client
581                .subscribe_funding_rates(instrument_id)
582                .await
583                .map_err(to_pyruntime_err)?;
584            Ok(())
585        })
586    }
587
588    #[pyo3(name = "unsubscribe_funding_rates")]
589    fn py_unsubscribe_funding_rates<'py>(
590        &self,
591        py: Python<'py>,
592        instrument_id: InstrumentId,
593    ) -> PyResult<Bound<'py, PyAny>> {
594        let client = self.clone();
595
596        pyo3_async_runtimes::tokio::future_into_py(py, async move {
597            client
598                .unsubscribe_funding_rates(instrument_id)
599                .await
600                .map_err(to_pyruntime_err)?;
601            Ok(())
602        })
603    }
604}