Skip to main content

nautilus_hyperliquid/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Hyperliquid WebSocket client.
17
18use nautilus_common::live::get_runtime;
19use nautilus_core::python::to_pyruntime_err;
20use nautilus_model::{
21    data::{BarType, Data, OrderBookDeltas_API},
22    identifiers::{AccountId, ClientOrderId, InstrumentId},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{conversion::IntoPyObjectExt, prelude::*};
26
27use crate::websocket::{
28    HyperliquidWebSocketClient,
29    messages::{ExecutionReport, NautilusWsMessage},
30};
31
32#[pymethods]
33impl HyperliquidWebSocketClient {
34    #[new]
35    #[pyo3(signature = (url=None, testnet=false, account_id=None))]
36    fn py_new(url: Option<String>, testnet: bool, account_id: Option<String>) -> PyResult<Self> {
37        let account_id = account_id.map(|s| AccountId::from(s.as_str()));
38        Ok(Self::new(url, testnet, account_id))
39    }
40
41    #[getter]
42    #[pyo3(name = "url")]
43    #[must_use]
44    pub fn py_url(&self) -> String {
45        self.url().to_string()
46    }
47
48    #[pyo3(name = "is_active")]
49    fn py_is_active(&self) -> bool {
50        self.is_active()
51    }
52
53    #[pyo3(name = "is_closed")]
54    fn py_is_closed(&self) -> bool {
55        !self.is_active()
56    }
57
58    #[pyo3(name = "cache_spot_fill_coins")]
59    fn py_cache_spot_fill_coins(&self, mapping: std::collections::HashMap<String, String>) {
60        let ahash_mapping: ahash::AHashMap<ustr::Ustr, ustr::Ustr> = mapping
61            .into_iter()
62            .map(|(k, v)| (ustr::Ustr::from(&k), ustr::Ustr::from(&v)))
63            .collect();
64        self.cache_spot_fill_coins(ahash_mapping);
65    }
66
67    #[pyo3(name = "cache_cloid_mapping")]
68    fn py_cache_cloid_mapping(&self, cloid: String, client_order_id: ClientOrderId) {
69        self.cache_cloid_mapping(ustr::Ustr::from(&cloid), client_order_id);
70    }
71
72    #[pyo3(name = "remove_cloid_mapping")]
73    fn py_remove_cloid_mapping(&self, cloid: String) {
74        self.remove_cloid_mapping(&ustr::Ustr::from(&cloid));
75    }
76
77    #[pyo3(name = "clear_cloid_cache")]
78    fn py_clear_cloid_cache(&self) {
79        self.clear_cloid_cache();
80    }
81
82    #[pyo3(name = "cloid_cache_len")]
83    fn py_cloid_cache_len(&self) -> usize {
84        self.cloid_cache_len()
85    }
86
87    #[pyo3(name = "get_cloid_mapping")]
88    fn py_get_cloid_mapping(&self, cloid: String) -> Option<ClientOrderId> {
89        self.get_cloid_mapping(&ustr::Ustr::from(&cloid))
90    }
91
92    #[pyo3(name = "connect")]
93    fn py_connect<'py>(
94        &self,
95        py: Python<'py>,
96        instruments: Vec<Py<PyAny>>,
97        callback: Py<PyAny>,
98    ) -> PyResult<Bound<'py, PyAny>> {
99        for inst in instruments {
100            let inst_any = pyobject_to_instrument_any(py, inst)?;
101            self.cache_instrument(inst_any);
102        }
103
104        let mut client = self.clone();
105
106        pyo3_async_runtimes::tokio::future_into_py(py, async move {
107            client.connect().await.map_err(to_pyruntime_err)?;
108
109            get_runtime().spawn(async move {
110                loop {
111                    let event = client.next_event().await;
112
113                    match event {
114                        Some(msg) => {
115                            log::trace!("Received WebSocket message: {msg:?}");
116
117                            match msg {
118                                NautilusWsMessage::Trades(trade_ticks) => {
119                                    Python::attach(|py| {
120                                        for tick in trade_ticks {
121                                            let py_obj = data_to_pycapsule(py, Data::Trade(tick));
122                                            if let Err(e) = callback.bind(py).call1((py_obj,)) {
123                                                log::error!(
124                                                    "Error calling Python callback: {e}"
125                                                );
126                                            }
127                                        }
128                                    });
129                                }
130                                NautilusWsMessage::Quote(quote_tick) => {
131                                    Python::attach(|py| {
132                                        let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
133                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
134                                            log::error!("Error calling Python callback: {e}");
135                                        }
136                                    });
137                                }
138                                NautilusWsMessage::Deltas(deltas) => {
139                                    Python::attach(|py| {
140                                        let py_obj = data_to_pycapsule(
141                                            py,
142                                            Data::Deltas(OrderBookDeltas_API::new(deltas)),
143                                        );
144                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
145                                            log::error!("Error calling Python callback: {e}");
146                                        }
147                                    });
148                                }
149                                NautilusWsMessage::Candle(bar) => {
150                                    Python::attach(|py| {
151                                        let py_obj = data_to_pycapsule(py, Data::Bar(bar));
152                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
153                                            log::error!("Error calling Python callback: {e}");
154                                        }
155                                    });
156                                }
157                                NautilusWsMessage::MarkPrice(mark_price) => {
158                                    Python::attach(|py| {
159                                        let py_obj = data_to_pycapsule(
160                                            py,
161                                            Data::MarkPriceUpdate(mark_price),
162                                        );
163                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
164                                            log::error!("Error calling Python callback: {e}");
165                                        }
166                                    });
167                                }
168                                NautilusWsMessage::IndexPrice(index_price) => {
169                                    Python::attach(|py| {
170                                        let py_obj = data_to_pycapsule(
171                                            py,
172                                            Data::IndexPriceUpdate(index_price),
173                                        );
174                                        if let Err(e) = callback.bind(py).call1((py_obj,)) {
175                                            log::error!("Error calling Python callback: {e}");
176                                        }
177                                    });
178                                }
179                                NautilusWsMessage::FundingRate(funding_rate) => {
180                                    Python::attach(|py| {
181                                        if let Ok(py_obj) = funding_rate.into_py_any(py)
182                                            && let Err(e) = callback.bind(py).call1((py_obj,))
183                                        {
184                                            log::error!("Error calling Python callback: {e}");
185                                        }
186                                    });
187                                }
188                                NautilusWsMessage::ExecutionReports(reports) => {
189                                    Python::attach(|py| {
190                                        for report in reports {
191                                            match report {
192                                                ExecutionReport::Order(order_report) => {
193                                                    log::debug!(
194                                                        "Forwarding order status report: order_id={}, status={:?}",
195                                                        order_report.venue_order_id,
196                                                        order_report.order_status
197                                                    );
198                                                    match Py::new(py, order_report) {
199                                                        Ok(py_obj) => {
200                                                            if let Err(e) =
201                                                                callback.bind(py).call1((py_obj,))
202                                                            {
203                                                                log::error!("Error calling Python callback: {e}");
204                                                            }
205                                                        }
206                                                        Err(e) => {
207                                                            log::error!("Error converting OrderStatusReport to Python: {e}");
208                                                        }
209                                                    }
210                                                }
211                                                ExecutionReport::Fill(fill_report) => {
212                                                    log::debug!(
213                                                        "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
214                                                        fill_report.trade_id,
215                                                        fill_report.order_side,
216                                                        fill_report.last_qty,
217                                                        fill_report.last_px
218                                                    );
219                                                    match Py::new(py, fill_report) {
220                                                        Ok(py_obj) => {
221                                                            if let Err(e) =
222                                                                callback.bind(py).call1((py_obj,))
223                                                            {
224                                                                log::error!("Error calling Python callback: {e}");
225                                                            }
226                                                        }
227                                                        Err(e) => {
228                                                            log::error!("Error converting FillReport to Python: {e}");
229                                                        }
230                                                    }
231                                                }
232                                            }
233                                        }
234                                    });
235                                }
236                                _ => {
237                                    log::debug!("Unhandled message type: {msg:?}");
238                                }
239                            }
240                        }
241                        None => {
242                            log::debug!("WebSocket connection closed");
243                            break;
244                        }
245                    }
246                }
247            });
248
249            Ok(())
250        })
251    }
252
253    #[pyo3(name = "wait_until_active")]
254    fn py_wait_until_active<'py>(
255        &self,
256        py: Python<'py>,
257        timeout_secs: f64,
258    ) -> PyResult<Bound<'py, PyAny>> {
259        let client = self.clone();
260
261        pyo3_async_runtimes::tokio::future_into_py(py, async move {
262            let start = std::time::Instant::now();
263            loop {
264                if client.is_active() {
265                    return Ok(());
266                }
267
268                if start.elapsed().as_secs_f64() >= timeout_secs {
269                    return Err(to_pyruntime_err(format!(
270                        "WebSocket connection did not become active within {timeout_secs} seconds"
271                    )));
272                }
273
274                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
275            }
276        })
277    }
278
279    #[pyo3(name = "close")]
280    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
281        let mut client = self.clone();
282
283        pyo3_async_runtimes::tokio::future_into_py(py, async move {
284            if let Err(e) = client.disconnect().await {
285                log::error!("Error on close: {e}");
286            }
287            Ok(())
288        })
289    }
290
291    #[pyo3(name = "subscribe_trades")]
292    fn py_subscribe_trades<'py>(
293        &self,
294        py: Python<'py>,
295        instrument_id: InstrumentId,
296    ) -> PyResult<Bound<'py, PyAny>> {
297        let client = self.clone();
298
299        pyo3_async_runtimes::tokio::future_into_py(py, async move {
300            client
301                .subscribe_trades(instrument_id)
302                .await
303                .map_err(to_pyruntime_err)?;
304            Ok(())
305        })
306    }
307
308    #[pyo3(name = "unsubscribe_trades")]
309    fn py_unsubscribe_trades<'py>(
310        &self,
311        py: Python<'py>,
312        instrument_id: InstrumentId,
313    ) -> PyResult<Bound<'py, PyAny>> {
314        let client = self.clone();
315
316        pyo3_async_runtimes::tokio::future_into_py(py, async move {
317            client
318                .unsubscribe_trades(instrument_id)
319                .await
320                .map_err(to_pyruntime_err)?;
321            Ok(())
322        })
323    }
324
325    #[pyo3(name = "subscribe_book")]
326    fn py_subscribe_book<'py>(
327        &self,
328        py: Python<'py>,
329        instrument_id: InstrumentId,
330    ) -> PyResult<Bound<'py, PyAny>> {
331        let client = self.clone();
332
333        pyo3_async_runtimes::tokio::future_into_py(py, async move {
334            client
335                .subscribe_book(instrument_id)
336                .await
337                .map_err(to_pyruntime_err)?;
338            Ok(())
339        })
340    }
341
342    #[pyo3(name = "unsubscribe_book")]
343    fn py_unsubscribe_book<'py>(
344        &self,
345        py: Python<'py>,
346        instrument_id: InstrumentId,
347    ) -> PyResult<Bound<'py, PyAny>> {
348        let client = self.clone();
349
350        pyo3_async_runtimes::tokio::future_into_py(py, async move {
351            client
352                .unsubscribe_book(instrument_id)
353                .await
354                .map_err(to_pyruntime_err)?;
355            Ok(())
356        })
357    }
358
359    #[pyo3(name = "subscribe_book_deltas")]
360    fn py_subscribe_book_deltas<'py>(
361        &self,
362        py: Python<'py>,
363        instrument_id: InstrumentId,
364        _book_type: u8,
365        _depth: u64,
366    ) -> PyResult<Bound<'py, PyAny>> {
367        let client = self.clone();
368
369        pyo3_async_runtimes::tokio::future_into_py(py, async move {
370            client
371                .subscribe_book(instrument_id)
372                .await
373                .map_err(to_pyruntime_err)?;
374            Ok(())
375        })
376    }
377
378    #[pyo3(name = "unsubscribe_book_deltas")]
379    fn py_unsubscribe_book_deltas<'py>(
380        &self,
381        py: Python<'py>,
382        instrument_id: InstrumentId,
383    ) -> PyResult<Bound<'py, PyAny>> {
384        let client = self.clone();
385
386        pyo3_async_runtimes::tokio::future_into_py(py, async move {
387            client
388                .unsubscribe_book(instrument_id)
389                .await
390                .map_err(to_pyruntime_err)?;
391            Ok(())
392        })
393    }
394
395    #[pyo3(name = "subscribe_book_snapshots")]
396    fn py_subscribe_book_snapshots<'py>(
397        &self,
398        py: Python<'py>,
399        instrument_id: InstrumentId,
400        _book_type: u8,
401        _depth: u64,
402    ) -> PyResult<Bound<'py, PyAny>> {
403        let client = self.clone();
404
405        pyo3_async_runtimes::tokio::future_into_py(py, async move {
406            client
407                .subscribe_book(instrument_id)
408                .await
409                .map_err(to_pyruntime_err)?;
410            Ok(())
411        })
412    }
413
414    #[pyo3(name = "subscribe_quotes")]
415    fn py_subscribe_quotes<'py>(
416        &self,
417        py: Python<'py>,
418        instrument_id: InstrumentId,
419    ) -> PyResult<Bound<'py, PyAny>> {
420        let client = self.clone();
421
422        pyo3_async_runtimes::tokio::future_into_py(py, async move {
423            client
424                .subscribe_quotes(instrument_id)
425                .await
426                .map_err(to_pyruntime_err)?;
427            Ok(())
428        })
429    }
430
431    #[pyo3(name = "unsubscribe_quotes")]
432    fn py_unsubscribe_quotes<'py>(
433        &self,
434        py: Python<'py>,
435        instrument_id: InstrumentId,
436    ) -> PyResult<Bound<'py, PyAny>> {
437        let client = self.clone();
438
439        pyo3_async_runtimes::tokio::future_into_py(py, async move {
440            client
441                .unsubscribe_quotes(instrument_id)
442                .await
443                .map_err(to_pyruntime_err)?;
444            Ok(())
445        })
446    }
447
448    #[pyo3(name = "subscribe_bars")]
449    fn py_subscribe_bars<'py>(
450        &self,
451        py: Python<'py>,
452        bar_type: BarType,
453    ) -> PyResult<Bound<'py, PyAny>> {
454        let client = self.clone();
455
456        pyo3_async_runtimes::tokio::future_into_py(py, async move {
457            client
458                .subscribe_bars(bar_type)
459                .await
460                .map_err(to_pyruntime_err)?;
461            Ok(())
462        })
463    }
464
465    #[pyo3(name = "unsubscribe_bars")]
466    fn py_unsubscribe_bars<'py>(
467        &self,
468        py: Python<'py>,
469        bar_type: BarType,
470    ) -> PyResult<Bound<'py, PyAny>> {
471        let client = self.clone();
472
473        pyo3_async_runtimes::tokio::future_into_py(py, async move {
474            client
475                .unsubscribe_bars(bar_type)
476                .await
477                .map_err(to_pyruntime_err)?;
478            Ok(())
479        })
480    }
481
482    #[pyo3(name = "subscribe_order_updates")]
483    fn py_subscribe_order_updates<'py>(
484        &self,
485        py: Python<'py>,
486        user: String,
487    ) -> PyResult<Bound<'py, PyAny>> {
488        let client = self.clone();
489
490        pyo3_async_runtimes::tokio::future_into_py(py, async move {
491            client
492                .subscribe_order_updates(&user)
493                .await
494                .map_err(to_pyruntime_err)?;
495            Ok(())
496        })
497    }
498
499    #[pyo3(name = "subscribe_user_events")]
500    fn py_subscribe_user_events<'py>(
501        &self,
502        py: Python<'py>,
503        user: String,
504    ) -> PyResult<Bound<'py, PyAny>> {
505        let client = self.clone();
506
507        pyo3_async_runtimes::tokio::future_into_py(py, async move {
508            client
509                .subscribe_user_events(&user)
510                .await
511                .map_err(to_pyruntime_err)?;
512            Ok(())
513        })
514    }
515
516    #[pyo3(name = "subscribe_user_fills")]
517    fn py_subscribe_user_fills<'py>(
518        &self,
519        py: Python<'py>,
520        user: String,
521    ) -> PyResult<Bound<'py, PyAny>> {
522        let client = self.clone();
523
524        pyo3_async_runtimes::tokio::future_into_py(py, async move {
525            client
526                .subscribe_user_fills(&user)
527                .await
528                .map_err(to_pyruntime_err)?;
529            Ok(())
530        })
531    }
532
533    #[pyo3(name = "subscribe_mark_prices")]
534    fn py_subscribe_mark_prices<'py>(
535        &self,
536        py: Python<'py>,
537        instrument_id: InstrumentId,
538    ) -> PyResult<Bound<'py, PyAny>> {
539        let client = self.clone();
540
541        pyo3_async_runtimes::tokio::future_into_py(py, async move {
542            client
543                .subscribe_mark_prices(instrument_id)
544                .await
545                .map_err(to_pyruntime_err)?;
546            Ok(())
547        })
548    }
549
550    #[pyo3(name = "unsubscribe_mark_prices")]
551    fn py_unsubscribe_mark_prices<'py>(
552        &self,
553        py: Python<'py>,
554        instrument_id: InstrumentId,
555    ) -> PyResult<Bound<'py, PyAny>> {
556        let client = self.clone();
557
558        pyo3_async_runtimes::tokio::future_into_py(py, async move {
559            client
560                .unsubscribe_mark_prices(instrument_id)
561                .await
562                .map_err(to_pyruntime_err)?;
563            Ok(())
564        })
565    }
566
567    #[pyo3(name = "subscribe_index_prices")]
568    fn py_subscribe_index_prices<'py>(
569        &self,
570        py: Python<'py>,
571        instrument_id: InstrumentId,
572    ) -> PyResult<Bound<'py, PyAny>> {
573        let client = self.clone();
574
575        pyo3_async_runtimes::tokio::future_into_py(py, async move {
576            client
577                .subscribe_index_prices(instrument_id)
578                .await
579                .map_err(to_pyruntime_err)?;
580            Ok(())
581        })
582    }
583
584    #[pyo3(name = "unsubscribe_index_prices")]
585    fn py_unsubscribe_index_prices<'py>(
586        &self,
587        py: Python<'py>,
588        instrument_id: InstrumentId,
589    ) -> PyResult<Bound<'py, PyAny>> {
590        let client = self.clone();
591
592        pyo3_async_runtimes::tokio::future_into_py(py, async move {
593            client
594                .unsubscribe_index_prices(instrument_id)
595                .await
596                .map_err(to_pyruntime_err)?;
597            Ok(())
598        })
599    }
600
601    #[pyo3(name = "subscribe_funding_rates")]
602    fn py_subscribe_funding_rates<'py>(
603        &self,
604        py: Python<'py>,
605        instrument_id: InstrumentId,
606    ) -> PyResult<Bound<'py, PyAny>> {
607        let client = self.clone();
608
609        pyo3_async_runtimes::tokio::future_into_py(py, async move {
610            client
611                .subscribe_funding_rates(instrument_id)
612                .await
613                .map_err(to_pyruntime_err)?;
614            Ok(())
615        })
616    }
617
618    #[pyo3(name = "unsubscribe_funding_rates")]
619    fn py_unsubscribe_funding_rates<'py>(
620        &self,
621        py: Python<'py>,
622        instrument_id: InstrumentId,
623    ) -> PyResult<Bound<'py, PyAny>> {
624        let client = self.clone();
625
626        pyo3_async_runtimes::tokio::future_into_py(py, async move {
627            client
628                .unsubscribe_funding_rates(instrument_id)
629                .await
630                .map_err(to_pyruntime_err)?;
631            Ok(())
632        })
633    }
634}