nautilus_hyperliquid/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Hyperliquid WebSocket client.
17
18use nautilus_core::python::to_pyruntime_err;
19use nautilus_model::{
20    data::{BarType, Data, OrderBookDeltas_API},
21    identifiers::{AccountId, InstrumentId},
22    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
23};
24use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
25
26use crate::{
27    common::HyperliquidProductType,
28    websocket::{
29        HyperliquidWebSocketClient,
30        messages::{ExecutionReport, NautilusWsMessage},
31    },
32};
33
34#[pymethods]
35impl HyperliquidWebSocketClient {
36    #[new]
37    #[pyo3(signature = (url=None, testnet=false, product_type=HyperliquidProductType::Perp, account_id=None))]
38    fn py_new(
39        url: Option<String>,
40        testnet: bool,
41        product_type: HyperliquidProductType,
42        account_id: Option<String>,
43    ) -> PyResult<Self> {
44        let account_id = account_id.map(|s| AccountId::from(s.as_str()));
45        Ok(Self::new(url, testnet, product_type, account_id))
46    }
47
48    #[getter]
49    #[pyo3(name = "url")]
50    #[must_use]
51    pub fn py_url(&self) -> String {
52        self.url().to_string()
53    }
54
55    #[pyo3(name = "is_active")]
56    fn py_is_active(&self) -> bool {
57        self.is_active()
58    }
59
60    #[pyo3(name = "is_closed")]
61    fn py_is_closed(&self) -> bool {
62        !self.is_active()
63    }
64
65    #[pyo3(name = "connect")]
66    fn py_connect<'py>(
67        &self,
68        py: Python<'py>,
69        instruments: Vec<Py<PyAny>>,
70        callback: Py<PyAny>,
71    ) -> PyResult<Bound<'py, PyAny>> {
72        for inst in instruments {
73            let inst_any = pyobject_to_instrument_any(py, inst)?;
74            self.cache_instrument(inst_any);
75        }
76
77        let mut client = self.clone();
78
79        pyo3_async_runtimes::tokio::future_into_py(py, async move {
80            client.connect().await.map_err(to_pyruntime_err)?;
81
82            tokio::spawn(async move {
83                loop {
84                    let event = client.next_event().await;
85
86                    match event {
87                        Some(msg) => {
88                            tracing::trace!("Received WebSocket message: {msg:?}");
89
90                            match msg {
91                                NautilusWsMessage::Trades(trade_ticks) => {
92                                    Python::attach(|py| {
93                                        for tick in trade_ticks {
94                                            let py_obj = data_to_pycapsule(py, Data::Trade(tick));
95                                            if let Err(e) = callback.bind(py).call1((py_obj,)) {
96                                                tracing::error!(
97                                                    "Error calling Python callback: {}",
98                                                    e
99                                                );
100                                            }
101                                        }
102                                    });
103                                }
104                                NautilusWsMessage::Quote(quote_tick) => {
105                                    Python::attach(|py| {
106                                        let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
107                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
108                                            tracing::error!("Error calling Python callback: {}", e);
109                                        }
110                                    });
111                                }
112                                NautilusWsMessage::Deltas(deltas) => {
113                                    Python::attach(|py| {
114                                        let py_obj = data_to_pycapsule(
115                                            py,
116                                            Data::Deltas(OrderBookDeltas_API::new(deltas)),
117                                        );
118                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
119                                            tracing::error!("Error calling Python callback: {}", e);
120                                        }
121                                    });
122                                }
123                                NautilusWsMessage::Candle(bar) => {
124                                    Python::attach(|py| {
125                                        let py_obj = data_to_pycapsule(py, Data::Bar(bar));
126                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
127                                            tracing::error!("Error calling Python callback: {}", e);
128                                        }
129                                    });
130                                }
131                                NautilusWsMessage::MarkPrice(mark_price) => {
132                                    Python::attach(|py| {
133                                        let py_obj = data_to_pycapsule(
134                                            py,
135                                            Data::MarkPriceUpdate(mark_price),
136                                        );
137                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
138                                            tracing::error!("Error calling Python callback: {}", e);
139                                        }
140                                    });
141                                }
142                                NautilusWsMessage::IndexPrice(index_price) => {
143                                    Python::attach(|py| {
144                                        let py_obj = data_to_pycapsule(
145                                            py,
146                                            Data::IndexPriceUpdate(index_price),
147                                        );
148                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
149                                            tracing::error!("Error calling Python callback: {}", e);
150                                        }
151                                    });
152                                }
153                                NautilusWsMessage::FundingRate(funding_rate) => {
154                                    Python::attach(|py| {
155                                        if let Ok(py_obj) = funding_rate.into_py_any(py)
156                                            && let Err(e) = callback.bind(py).call1((py_obj,))
157                                        {
158                                            tracing::error!("Error calling Python callback: {}", e);
159                                        }
160                                    });
161                                }
162                                NautilusWsMessage::ExecutionReports(reports) => {
163                                    Python::attach(|py| {
164                                        for report in reports {
165                                            match report {
166                                                ExecutionReport::Order(order_report) => {
167                                                    tracing::debug!(
168                                                        "Forwarding order status report: order_id={}, status={:?}",
169                                                        order_report.venue_order_id,
170                                                        order_report.order_status
171                                                    );
172                                                    match Py::new(py, order_report) {
173                                                        Ok(py_obj) => {
174                                                            if let Err(e) =
175                                                                callback.bind(py).call1((py_obj,))
176                                                            {
177                                                                tracing::error!(
178                                                                    "Error calling Python callback: {}",
179                                                                    e
180                                                                );
181                                                            }
182                                                        }
183                                                        Err(e) => {
184                                                            tracing::error!(
185                                                                "Error converting OrderStatusReport to Python: {}",
186                                                                e
187                                                            );
188                                                        }
189                                                    }
190                                                }
191                                                ExecutionReport::Fill(fill_report) => {
192                                                    tracing::debug!(
193                                                        "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
194                                                        fill_report.trade_id,
195                                                        fill_report.order_side,
196                                                        fill_report.last_qty,
197                                                        fill_report.last_px
198                                                    );
199                                                    match Py::new(py, fill_report) {
200                                                        Ok(py_obj) => {
201                                                            if let Err(e) =
202                                                                callback.bind(py).call1((py_obj,))
203                                                            {
204                                                                tracing::error!(
205                                                                    "Error calling Python callback: {}",
206                                                                    e
207                                                                );
208                                                            }
209                                                        }
210                                                        Err(e) => {
211                                                            tracing::error!(
212                                                                "Error converting FillReport to Python: {}",
213                                                                e
214                                                            );
215                                                        }
216                                                    }
217                                                }
218                                            }
219                                        }
220                                    });
221                                }
222                                _ => {
223                                    tracing::debug!("Unhandled message type: {:?}", msg);
224                                }
225                            }
226                        }
227                        None => {
228                            tracing::info!("WebSocket connection closed");
229                            break;
230                        }
231                    }
232                }
233            });
234
235            Ok(())
236        })
237    }
238
239    #[pyo3(name = "wait_until_active")]
240    fn py_wait_until_active<'py>(
241        &self,
242        py: Python<'py>,
243        timeout_secs: f64,
244    ) -> PyResult<Bound<'py, PyAny>> {
245        let client = self.clone();
246
247        pyo3_async_runtimes::tokio::future_into_py(py, async move {
248            let start = std::time::Instant::now();
249            loop {
250                if client.is_active() {
251                    return Ok(());
252                }
253
254                if start.elapsed().as_secs_f64() >= timeout_secs {
255                    return Err(PyRuntimeError::new_err(format!(
256                        "WebSocket connection did not become active within {timeout_secs} seconds"
257                    )));
258                }
259
260                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
261            }
262        })
263    }
264
265    #[pyo3(name = "close")]
266    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
267        let mut client = self.clone();
268
269        pyo3_async_runtimes::tokio::future_into_py(py, async move {
270            if let Err(e) = client.disconnect().await {
271                tracing::error!("Error on close: {e}");
272            }
273            Ok(())
274        })
275    }
276
277    #[pyo3(name = "subscribe_trades")]
278    fn py_subscribe_trades<'py>(
279        &self,
280        py: Python<'py>,
281        instrument_id: InstrumentId,
282    ) -> PyResult<Bound<'py, PyAny>> {
283        let client = self.clone();
284
285        pyo3_async_runtimes::tokio::future_into_py(py, async move {
286            client
287                .subscribe_trades(instrument_id)
288                .await
289                .map_err(to_pyruntime_err)?;
290            Ok(())
291        })
292    }
293
294    #[pyo3(name = "unsubscribe_trades")]
295    fn py_unsubscribe_trades<'py>(
296        &self,
297        py: Python<'py>,
298        instrument_id: InstrumentId,
299    ) -> PyResult<Bound<'py, PyAny>> {
300        let client = self.clone();
301
302        pyo3_async_runtimes::tokio::future_into_py(py, async move {
303            client
304                .unsubscribe_trades(instrument_id)
305                .await
306                .map_err(to_pyruntime_err)?;
307            Ok(())
308        })
309    }
310
311    #[pyo3(name = "subscribe_book")]
312    fn py_subscribe_book<'py>(
313        &self,
314        py: Python<'py>,
315        instrument_id: InstrumentId,
316    ) -> PyResult<Bound<'py, PyAny>> {
317        let client = self.clone();
318
319        pyo3_async_runtimes::tokio::future_into_py(py, async move {
320            client
321                .subscribe_book(instrument_id)
322                .await
323                .map_err(to_pyruntime_err)?;
324            Ok(())
325        })
326    }
327
328    #[pyo3(name = "unsubscribe_book")]
329    fn py_unsubscribe_book<'py>(
330        &self,
331        py: Python<'py>,
332        instrument_id: InstrumentId,
333    ) -> PyResult<Bound<'py, PyAny>> {
334        let client = self.clone();
335
336        pyo3_async_runtimes::tokio::future_into_py(py, async move {
337            client
338                .unsubscribe_book(instrument_id)
339                .await
340                .map_err(to_pyruntime_err)?;
341            Ok(())
342        })
343    }
344
345    #[pyo3(name = "subscribe_book_deltas")]
346    fn py_subscribe_book_deltas<'py>(
347        &self,
348        py: Python<'py>,
349        instrument_id: InstrumentId,
350        _book_type: u8,
351        _depth: u64,
352    ) -> PyResult<Bound<'py, PyAny>> {
353        let client = self.clone();
354
355        pyo3_async_runtimes::tokio::future_into_py(py, async move {
356            client
357                .subscribe_book(instrument_id)
358                .await
359                .map_err(to_pyruntime_err)?;
360            Ok(())
361        })
362    }
363
364    #[pyo3(name = "unsubscribe_book_deltas")]
365    fn py_unsubscribe_book_deltas<'py>(
366        &self,
367        py: Python<'py>,
368        instrument_id: InstrumentId,
369    ) -> PyResult<Bound<'py, PyAny>> {
370        let client = self.clone();
371
372        pyo3_async_runtimes::tokio::future_into_py(py, async move {
373            client
374                .unsubscribe_book(instrument_id)
375                .await
376                .map_err(to_pyruntime_err)?;
377            Ok(())
378        })
379    }
380
381    #[pyo3(name = "subscribe_book_snapshots")]
382    fn py_subscribe_book_snapshots<'py>(
383        &self,
384        py: Python<'py>,
385        instrument_id: InstrumentId,
386        _book_type: u8,
387        _depth: u64,
388    ) -> PyResult<Bound<'py, PyAny>> {
389        let client = self.clone();
390
391        pyo3_async_runtimes::tokio::future_into_py(py, async move {
392            client
393                .subscribe_book(instrument_id)
394                .await
395                .map_err(to_pyruntime_err)?;
396            Ok(())
397        })
398    }
399
400    #[pyo3(name = "subscribe_quotes")]
401    fn py_subscribe_quotes<'py>(
402        &self,
403        py: Python<'py>,
404        instrument_id: InstrumentId,
405    ) -> PyResult<Bound<'py, PyAny>> {
406        let client = self.clone();
407
408        pyo3_async_runtimes::tokio::future_into_py(py, async move {
409            client
410                .subscribe_quotes(instrument_id)
411                .await
412                .map_err(to_pyruntime_err)?;
413            Ok(())
414        })
415    }
416
417    #[pyo3(name = "unsubscribe_quotes")]
418    fn py_unsubscribe_quotes<'py>(
419        &self,
420        py: Python<'py>,
421        instrument_id: InstrumentId,
422    ) -> PyResult<Bound<'py, PyAny>> {
423        let client = self.clone();
424
425        pyo3_async_runtimes::tokio::future_into_py(py, async move {
426            client
427                .unsubscribe_quotes(instrument_id)
428                .await
429                .map_err(to_pyruntime_err)?;
430            Ok(())
431        })
432    }
433
434    #[pyo3(name = "subscribe_bars")]
435    fn py_subscribe_bars<'py>(
436        &self,
437        py: Python<'py>,
438        bar_type: BarType,
439    ) -> PyResult<Bound<'py, PyAny>> {
440        let client = self.clone();
441
442        pyo3_async_runtimes::tokio::future_into_py(py, async move {
443            client
444                .subscribe_bars(bar_type)
445                .await
446                .map_err(to_pyruntime_err)?;
447            Ok(())
448        })
449    }
450
451    #[pyo3(name = "unsubscribe_bars")]
452    fn py_unsubscribe_bars<'py>(
453        &self,
454        py: Python<'py>,
455        bar_type: BarType,
456    ) -> PyResult<Bound<'py, PyAny>> {
457        let client = self.clone();
458
459        pyo3_async_runtimes::tokio::future_into_py(py, async move {
460            client
461                .unsubscribe_bars(bar_type)
462                .await
463                .map_err(to_pyruntime_err)?;
464            Ok(())
465        })
466    }
467
468    #[pyo3(name = "subscribe_order_updates")]
469    fn py_subscribe_order_updates<'py>(
470        &self,
471        py: Python<'py>,
472        user: String,
473    ) -> PyResult<Bound<'py, PyAny>> {
474        let client = self.clone();
475
476        pyo3_async_runtimes::tokio::future_into_py(py, async move {
477            client
478                .subscribe_order_updates(&user)
479                .await
480                .map_err(to_pyruntime_err)?;
481            Ok(())
482        })
483    }
484
485    #[pyo3(name = "subscribe_user_events")]
486    fn py_subscribe_user_events<'py>(
487        &self,
488        py: Python<'py>,
489        user: String,
490    ) -> PyResult<Bound<'py, PyAny>> {
491        let client = self.clone();
492
493        pyo3_async_runtimes::tokio::future_into_py(py, async move {
494            client
495                .subscribe_user_events(&user)
496                .await
497                .map_err(to_pyruntime_err)?;
498            Ok(())
499        })
500    }
501
502    #[pyo3(name = "subscribe_mark_prices")]
503    fn py_subscribe_mark_prices<'py>(
504        &self,
505        py: Python<'py>,
506        instrument_id: InstrumentId,
507    ) -> PyResult<Bound<'py, PyAny>> {
508        let client = self.clone();
509
510        pyo3_async_runtimes::tokio::future_into_py(py, async move {
511            client
512                .subscribe_mark_prices(instrument_id)
513                .await
514                .map_err(to_pyruntime_err)?;
515            Ok(())
516        })
517    }
518
519    #[pyo3(name = "unsubscribe_mark_prices")]
520    fn py_unsubscribe_mark_prices<'py>(
521        &self,
522        py: Python<'py>,
523        instrument_id: InstrumentId,
524    ) -> PyResult<Bound<'py, PyAny>> {
525        let client = self.clone();
526
527        pyo3_async_runtimes::tokio::future_into_py(py, async move {
528            client
529                .unsubscribe_mark_prices(instrument_id)
530                .await
531                .map_err(to_pyruntime_err)?;
532            Ok(())
533        })
534    }
535
536    #[pyo3(name = "subscribe_index_prices")]
537    fn py_subscribe_index_prices<'py>(
538        &self,
539        py: Python<'py>,
540        instrument_id: InstrumentId,
541    ) -> PyResult<Bound<'py, PyAny>> {
542        let client = self.clone();
543
544        pyo3_async_runtimes::tokio::future_into_py(py, async move {
545            client
546                .subscribe_index_prices(instrument_id)
547                .await
548                .map_err(to_pyruntime_err)?;
549            Ok(())
550        })
551    }
552
553    #[pyo3(name = "unsubscribe_index_prices")]
554    fn py_unsubscribe_index_prices<'py>(
555        &self,
556        py: Python<'py>,
557        instrument_id: InstrumentId,
558    ) -> PyResult<Bound<'py, PyAny>> {
559        let client = self.clone();
560
561        pyo3_async_runtimes::tokio::future_into_py(py, async move {
562            client
563                .unsubscribe_index_prices(instrument_id)
564                .await
565                .map_err(to_pyruntime_err)?;
566            Ok(())
567        })
568    }
569
570    #[pyo3(name = "subscribe_funding_rates")]
571    fn py_subscribe_funding_rates<'py>(
572        &self,
573        py: Python<'py>,
574        instrument_id: InstrumentId,
575    ) -> PyResult<Bound<'py, PyAny>> {
576        let client = self.clone();
577
578        pyo3_async_runtimes::tokio::future_into_py(py, async move {
579            client
580                .subscribe_funding_rates(instrument_id)
581                .await
582                .map_err(to_pyruntime_err)?;
583            Ok(())
584        })
585    }
586
587    #[pyo3(name = "unsubscribe_funding_rates")]
588    fn py_unsubscribe_funding_rates<'py>(
589        &self,
590        py: Python<'py>,
591        instrument_id: InstrumentId,
592    ) -> PyResult<Bound<'py, PyAny>> {
593        let client = self.clone();
594
595        pyo3_async_runtimes::tokio::future_into_py(py, async move {
596            client
597                .unsubscribe_funding_rates(instrument_id)
598                .await
599                .map_err(to_pyruntime_err)?;
600            Ok(())
601        })
602    }
603}