nautilus_bitmex/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the `BitmEX` WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - `RwLock` is preferred over Mutex (many reads, few writes).
43
44use futures_util::StreamExt;
45use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47    data::bar::BarType,
48    identifiers::{AccountId, InstrumentId},
49    python::{
50        data::data_to_pycapsule,
51        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
52    },
53};
54use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
55
56use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
57
58#[pymethods]
59impl BitmexWebSocketClient {
60    #[new]
61    #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
62    fn py_new(
63        url: Option<String>,
64        api_key: Option<String>,
65        api_secret: Option<String>,
66        account_id: Option<AccountId>,
67        heartbeat: Option<u64>,
68        testnet: bool,
69    ) -> PyResult<Self> {
70        Self::new_with_env(url, api_key, api_secret, account_id, heartbeat, testnet)
71            .map_err(to_pyvalue_err)
72    }
73
74    #[staticmethod]
75    #[pyo3(name = "from_env")]
76    fn py_from_env() -> PyResult<Self> {
77        Self::from_env().map_err(to_pyvalue_err)
78    }
79
80    #[getter]
81    #[pyo3(name = "url")]
82    #[must_use]
83    pub const fn py_url(&self) -> &str {
84        self.url()
85    }
86
87    #[getter]
88    #[pyo3(name = "api_key")]
89    #[must_use]
90    pub fn py_api_key(&self) -> Option<&str> {
91        self.api_key()
92    }
93
94    #[getter]
95    #[pyo3(name = "api_key_masked")]
96    #[must_use]
97    pub fn py_api_key_masked(&self) -> Option<String> {
98        self.api_key_masked()
99    }
100
101    #[pyo3(name = "is_active")]
102    fn py_is_active(&mut self) -> bool {
103        self.is_active()
104    }
105
106    #[pyo3(name = "is_closed")]
107    fn py_is_closed(&mut self) -> bool {
108        self.is_closed()
109    }
110
111    #[pyo3(name = "get_subscriptions")]
112    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
113        self.get_subscriptions(instrument_id)
114    }
115
116    #[pyo3(name = "set_account_id")]
117    pub fn py_set_account_id(&mut self, account_id: AccountId) {
118        self.set_account_id(account_id);
119    }
120
121    #[pyo3(name = "cache_instrument")]
122    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
123        let inst_any = pyobject_to_instrument_any(py, instrument)?;
124        self.cache_instrument(inst_any);
125        Ok(())
126    }
127
128    #[pyo3(name = "connect")]
129    fn py_connect<'py>(
130        &mut self,
131        py: Python<'py>,
132        instruments: Vec<Py<PyAny>>,
133        callback: Py<PyAny>,
134    ) -> PyResult<Bound<'py, PyAny>> {
135        let mut instruments_any = Vec::new();
136        for inst in instruments {
137            let inst_any = pyobject_to_instrument_any(py, inst)?;
138            instruments_any.push(inst_any);
139        }
140
141        self.cache_instruments(instruments_any);
142
143        // We need to clone self to move into the async block,
144        // the clone will be connected and kept alive to maintain the handler.
145        let mut client = self.clone();
146
147        pyo3_async_runtimes::tokio::future_into_py(py, async move {
148            client.connect().await.map_err(to_pyruntime_err)?;
149
150            let stream = client.stream();
151
152            tokio::spawn(async move {
153                let _client = client; // Keep client alive for the entire duration
154                tokio::pin!(stream);
155
156                while let Some(msg) = stream.next().await {
157                    Python::attach(|py| match msg {
158                        NautilusWsMessage::Data(data_vec) => {
159                            for data in data_vec {
160                                let py_obj = data_to_pycapsule(py, data);
161                                call_python(py, &callback, py_obj);
162                            }
163                        }
164                        NautilusWsMessage::Instruments(instruments) => {
165                            for instrument in instruments {
166                                if let Ok(py_obj) = instrument_any_to_pyobject(py, instrument) {
167                                    call_python(py, &callback, py_obj);
168                                }
169                            }
170                        }
171                        NautilusWsMessage::OrderStatusReports(reports) => {
172                            for report in reports {
173                                if let Ok(py_obj) = report.into_py_any(py) {
174                                    call_python(py, &callback, py_obj);
175                                }
176                            }
177                        }
178                        NautilusWsMessage::FillReports(reports) => {
179                            for report in reports {
180                                if let Ok(py_obj) = report.into_py_any(py) {
181                                    call_python(py, &callback, py_obj);
182                                }
183                            }
184                        }
185                        NautilusWsMessage::PositionStatusReport(report) => {
186                            if let Ok(py_obj) = report.into_py_any(py) {
187                                call_python(py, &callback, py_obj);
188                            }
189                        }
190                        NautilusWsMessage::FundingRateUpdates(updates) => {
191                            for update in updates {
192                                if let Ok(py_obj) = update.into_py_any(py) {
193                                    call_python(py, &callback, py_obj);
194                                }
195                            }
196                        }
197                        NautilusWsMessage::AccountState(account_state) => {
198                            if let Ok(py_obj) = account_state.into_py_any(py) {
199                                call_python(py, &callback, py_obj);
200                            }
201                        }
202                        NautilusWsMessage::OrderUpdated(event) => {
203                            if let Ok(py_obj) = event.into_py_any(py) {
204                                call_python(py, &callback, py_obj);
205                            }
206                        }
207                        NautilusWsMessage::Reconnected => {}
208                        NautilusWsMessage::Authenticated => {}
209                    });
210                }
211            });
212
213            Ok(())
214        })
215    }
216
217    #[pyo3(name = "wait_until_active")]
218    fn py_wait_until_active<'py>(
219        &self,
220        py: Python<'py>,
221        timeout_secs: f64,
222    ) -> PyResult<Bound<'py, PyAny>> {
223        let client = self.clone();
224
225        pyo3_async_runtimes::tokio::future_into_py(py, async move {
226            client
227                .wait_until_active(timeout_secs)
228                .await
229                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
230            Ok(())
231        })
232    }
233
234    #[pyo3(name = "close")]
235    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
236        let mut client = self.clone();
237
238        pyo3_async_runtimes::tokio::future_into_py(py, async move {
239            if let Err(e) = client.close().await {
240                log::error!("Error on close: {e}");
241            }
242            Ok(())
243        })
244    }
245
246    #[pyo3(name = "subscribe_instruments")]
247    fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
248        let client = self.clone();
249
250        pyo3_async_runtimes::tokio::future_into_py(py, async move {
251            if let Err(e) = client.subscribe_instruments().await {
252                log::error!("Failed to subscribe to instruments: {e}");
253            }
254            Ok(())
255        })
256    }
257
258    #[pyo3(name = "subscribe_instrument")]
259    fn py_subscribe_instrument<'py>(
260        &self,
261        py: Python<'py>,
262        instrument_id: InstrumentId,
263    ) -> PyResult<Bound<'py, PyAny>> {
264        let client = self.clone();
265
266        pyo3_async_runtimes::tokio::future_into_py(py, async move {
267            if let Err(e) = client.subscribe_instrument(instrument_id).await {
268                log::error!("Failed to subscribe to instrument: {e}");
269            }
270            Ok(())
271        })
272    }
273
274    #[pyo3(name = "subscribe_book")]
275    fn py_subscribe_book<'py>(
276        &self,
277        py: Python<'py>,
278        instrument_id: InstrumentId,
279    ) -> PyResult<Bound<'py, PyAny>> {
280        let client = self.clone();
281
282        pyo3_async_runtimes::tokio::future_into_py(py, async move {
283            if let Err(e) = client.subscribe_book(instrument_id).await {
284                log::error!("Failed to subscribe to order book: {e}");
285            }
286            Ok(())
287        })
288    }
289
290    #[pyo3(name = "subscribe_book_25")]
291    fn py_subscribe_book_25<'py>(
292        &self,
293        py: Python<'py>,
294        instrument_id: InstrumentId,
295    ) -> PyResult<Bound<'py, PyAny>> {
296        let client = self.clone();
297
298        pyo3_async_runtimes::tokio::future_into_py(py, async move {
299            if let Err(e) = client.subscribe_book_25(instrument_id).await {
300                log::error!("Failed to subscribe to order book 25: {e}");
301            }
302            Ok(())
303        })
304    }
305
306    #[pyo3(name = "subscribe_book_depth10")]
307    fn py_subscribe_book_depth10<'py>(
308        &self,
309        py: Python<'py>,
310        instrument_id: InstrumentId,
311    ) -> PyResult<Bound<'py, PyAny>> {
312        let client = self.clone();
313
314        pyo3_async_runtimes::tokio::future_into_py(py, async move {
315            if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
316                log::error!("Failed to subscribe to order book depth 10: {e}");
317            }
318            Ok(())
319        })
320    }
321
322    #[pyo3(name = "subscribe_quotes")]
323    fn py_subscribe_quotes<'py>(
324        &self,
325        py: Python<'py>,
326        instrument_id: InstrumentId,
327    ) -> PyResult<Bound<'py, PyAny>> {
328        let client = self.clone();
329
330        pyo3_async_runtimes::tokio::future_into_py(py, async move {
331            if let Err(e) = client.subscribe_quotes(instrument_id).await {
332                log::error!("Failed to subscribe to quotes: {e}");
333            }
334            Ok(())
335        })
336    }
337
338    #[pyo3(name = "subscribe_trades")]
339    fn py_subscribe_trades<'py>(
340        &self,
341        py: Python<'py>,
342        instrument_id: InstrumentId,
343    ) -> PyResult<Bound<'py, PyAny>> {
344        let client = self.clone();
345
346        pyo3_async_runtimes::tokio::future_into_py(py, async move {
347            if let Err(e) = client.subscribe_trades(instrument_id).await {
348                log::error!("Failed to subscribe to trades: {e}");
349            }
350            Ok(())
351        })
352    }
353
354    #[pyo3(name = "subscribe_mark_prices")]
355    fn py_subscribe_mark_prices<'py>(
356        &self,
357        py: Python<'py>,
358        instrument_id: InstrumentId,
359    ) -> PyResult<Bound<'py, PyAny>> {
360        let client = self.clone();
361
362        pyo3_async_runtimes::tokio::future_into_py(py, async move {
363            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
364                log::error!("Failed to subscribe to mark prices: {e}");
365            }
366            Ok(())
367        })
368    }
369
370    #[pyo3(name = "subscribe_index_prices")]
371    fn py_subscribe_index_prices<'py>(
372        &self,
373        py: Python<'py>,
374        instrument_id: InstrumentId,
375    ) -> PyResult<Bound<'py, PyAny>> {
376        let client = self.clone();
377
378        pyo3_async_runtimes::tokio::future_into_py(py, async move {
379            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
380                log::error!("Failed to subscribe to index prices: {e}");
381            }
382            Ok(())
383        })
384    }
385
386    #[pyo3(name = "subscribe_funding_rates")]
387    fn py_subscribe_funding_rates<'py>(
388        &self,
389        py: Python<'py>,
390        instrument_id: InstrumentId,
391    ) -> PyResult<Bound<'py, PyAny>> {
392        let client = self.clone();
393
394        pyo3_async_runtimes::tokio::future_into_py(py, async move {
395            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
396                log::error!("Failed to subscribe to funding: {e}");
397            }
398            Ok(())
399        })
400    }
401
402    #[pyo3(name = "subscribe_bars")]
403    fn py_subscribe_bars<'py>(
404        &self,
405        py: Python<'py>,
406        bar_type: BarType,
407    ) -> PyResult<Bound<'py, PyAny>> {
408        let client = self.clone();
409
410        pyo3_async_runtimes::tokio::future_into_py(py, async move {
411            if let Err(e) = client.subscribe_bars(bar_type).await {
412                log::error!("Failed to subscribe to bars: {e}");
413            }
414            Ok(())
415        })
416    }
417
418    #[pyo3(name = "unsubscribe_instruments")]
419    fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
420        let client = self.clone();
421
422        pyo3_async_runtimes::tokio::future_into_py(py, async move {
423            if let Err(e) = client.unsubscribe_instruments().await {
424                log::error!("Failed to unsubscribe from instruments: {e}");
425            }
426            Ok(())
427        })
428    }
429
430    #[pyo3(name = "unsubscribe_instrument")]
431    fn py_unsubscribe_instrument<'py>(
432        &self,
433        py: Python<'py>,
434        instrument_id: InstrumentId,
435    ) -> PyResult<Bound<'py, PyAny>> {
436        let client = self.clone();
437
438        pyo3_async_runtimes::tokio::future_into_py(py, async move {
439            if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
440                log::error!("Failed to unsubscribe from instrument: {e}");
441            }
442            Ok(())
443        })
444    }
445
446    #[pyo3(name = "unsubscribe_book")]
447    fn py_unsubscribe_book<'py>(
448        &self,
449        py: Python<'py>,
450        instrument_id: InstrumentId,
451    ) -> PyResult<Bound<'py, PyAny>> {
452        let client = self.clone();
453
454        pyo3_async_runtimes::tokio::future_into_py(py, async move {
455            if let Err(e) = client.unsubscribe_book(instrument_id).await {
456                log::error!("Failed to unsubscribe from order book: {e}");
457            }
458            Ok(())
459        })
460    }
461
462    #[pyo3(name = "unsubscribe_book_25")]
463    fn py_unsubscribe_book_25<'py>(
464        &self,
465        py: Python<'py>,
466        instrument_id: InstrumentId,
467    ) -> PyResult<Bound<'py, PyAny>> {
468        let client = self.clone();
469
470        pyo3_async_runtimes::tokio::future_into_py(py, async move {
471            if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
472                log::error!("Failed to unsubscribe from order book 25: {e}");
473            }
474            Ok(())
475        })
476    }
477
478    #[pyo3(name = "unsubscribe_book_depth10")]
479    fn py_unsubscribe_book_depth10<'py>(
480        &self,
481        py: Python<'py>,
482        instrument_id: InstrumentId,
483    ) -> PyResult<Bound<'py, PyAny>> {
484        let client = self.clone();
485
486        pyo3_async_runtimes::tokio::future_into_py(py, async move {
487            if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
488                log::error!("Failed to unsubscribe from order book depth 10: {e}");
489            }
490            Ok(())
491        })
492    }
493
494    #[pyo3(name = "unsubscribe_quotes")]
495    fn py_unsubscribe_quotes<'py>(
496        &self,
497        py: Python<'py>,
498        instrument_id: InstrumentId,
499    ) -> PyResult<Bound<'py, PyAny>> {
500        let client = self.clone();
501
502        pyo3_async_runtimes::tokio::future_into_py(py, async move {
503            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
504                log::error!("Failed to unsubscribe from quotes: {e}");
505            }
506            Ok(())
507        })
508    }
509
510    #[pyo3(name = "unsubscribe_trades")]
511    fn py_unsubscribe_trades<'py>(
512        &self,
513        py: Python<'py>,
514        instrument_id: InstrumentId,
515    ) -> PyResult<Bound<'py, PyAny>> {
516        let client = self.clone();
517
518        pyo3_async_runtimes::tokio::future_into_py(py, async move {
519            if let Err(e) = client.unsubscribe_trades(instrument_id).await {
520                log::error!("Failed to unsubscribe from trades: {e}");
521            }
522            Ok(())
523        })
524    }
525
526    #[pyo3(name = "unsubscribe_mark_prices")]
527    fn py_unsubscribe_mark_prices<'py>(
528        &self,
529        py: Python<'py>,
530        instrument_id: InstrumentId,
531    ) -> PyResult<Bound<'py, PyAny>> {
532        let client = self.clone();
533
534        pyo3_async_runtimes::tokio::future_into_py(py, async move {
535            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
536                log::error!("Failed to unsubscribe from mark prices: {e}");
537            }
538            Ok(())
539        })
540    }
541
542    #[pyo3(name = "unsubscribe_index_prices")]
543    fn py_unsubscribe_index_prices<'py>(
544        &self,
545        py: Python<'py>,
546        instrument_id: InstrumentId,
547    ) -> PyResult<Bound<'py, PyAny>> {
548        let client = self.clone();
549
550        pyo3_async_runtimes::tokio::future_into_py(py, async move {
551            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
552                log::error!("Failed to unsubscribe from index prices: {e}");
553            }
554            Ok(())
555        })
556    }
557
558    #[pyo3(name = "unsubscribe_funding_rates")]
559    fn py_unsubscribe_funding_rates<'py>(
560        &self,
561        py: Python<'py>,
562        instrument_id: InstrumentId,
563    ) -> PyResult<Bound<'py, PyAny>> {
564        let client = self.clone();
565        pyo3_async_runtimes::tokio::future_into_py(py, async move {
566            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
567                log::error!("Failed to unsubscribe from funding rates: {e}");
568            }
569            Ok(())
570        })
571    }
572
573    #[pyo3(name = "unsubscribe_bars")]
574    fn py_unsubscribe_bars<'py>(
575        &self,
576        py: Python<'py>,
577        bar_type: BarType,
578    ) -> PyResult<Bound<'py, PyAny>> {
579        let client = self.clone();
580
581        pyo3_async_runtimes::tokio::future_into_py(py, async move {
582            if let Err(e) = client.unsubscribe_bars(bar_type).await {
583                log::error!("Failed to unsubscribe from bars: {e}");
584            }
585            Ok(())
586        })
587    }
588
589    #[pyo3(name = "subscribe_orders")]
590    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
591        let client = self.clone();
592
593        pyo3_async_runtimes::tokio::future_into_py(py, async move {
594            if let Err(e) = client.subscribe_orders().await {
595                log::error!("Failed to subscribe to orders: {e}");
596            }
597            Ok(())
598        })
599    }
600
601    #[pyo3(name = "subscribe_executions")]
602    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
603        let client = self.clone();
604
605        pyo3_async_runtimes::tokio::future_into_py(py, async move {
606            if let Err(e) = client.subscribe_executions().await {
607                log::error!("Failed to subscribe to executions: {e}");
608            }
609            Ok(())
610        })
611    }
612
613    #[pyo3(name = "subscribe_positions")]
614    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
615        let client = self.clone();
616
617        pyo3_async_runtimes::tokio::future_into_py(py, async move {
618            if let Err(e) = client.subscribe_positions().await {
619                log::error!("Failed to subscribe to positions: {e}");
620            }
621            Ok(())
622        })
623    }
624
625    #[pyo3(name = "subscribe_margin")]
626    fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
627        let client = self.clone();
628
629        pyo3_async_runtimes::tokio::future_into_py(py, async move {
630            if let Err(e) = client.subscribe_margin().await {
631                log::error!("Failed to subscribe to margin: {e}");
632            }
633            Ok(())
634        })
635    }
636
637    #[pyo3(name = "subscribe_wallet")]
638    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
639        let client = self.clone();
640
641        pyo3_async_runtimes::tokio::future_into_py(py, async move {
642            if let Err(e) = client.subscribe_wallet().await {
643                log::error!("Failed to subscribe to wallet: {e}");
644            }
645            Ok(())
646        })
647    }
648
649    #[pyo3(name = "unsubscribe_orders")]
650    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
651        let client = self.clone();
652
653        pyo3_async_runtimes::tokio::future_into_py(py, async move {
654            if let Err(e) = client.unsubscribe_orders().await {
655                log::error!("Failed to unsubscribe from orders: {e}");
656            }
657            Ok(())
658        })
659    }
660
661    #[pyo3(name = "unsubscribe_executions")]
662    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
663        let client = self.clone();
664
665        pyo3_async_runtimes::tokio::future_into_py(py, async move {
666            if let Err(e) = client.unsubscribe_executions().await {
667                log::error!("Failed to unsubscribe from executions: {e}");
668            }
669            Ok(())
670        })
671    }
672
673    #[pyo3(name = "unsubscribe_positions")]
674    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
675        let client = self.clone();
676
677        pyo3_async_runtimes::tokio::future_into_py(py, async move {
678            if let Err(e) = client.unsubscribe_positions().await {
679                log::error!("Failed to unsubscribe from positions: {e}");
680            }
681            Ok(())
682        })
683    }
684
685    #[pyo3(name = "unsubscribe_margin")]
686    fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
687        let client = self.clone();
688
689        pyo3_async_runtimes::tokio::future_into_py(py, async move {
690            if let Err(e) = client.unsubscribe_margin().await {
691                log::error!("Failed to unsubscribe from margin: {e}");
692            }
693            Ok(())
694        })
695    }
696
697    #[pyo3(name = "unsubscribe_wallet")]
698    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
699        let client = self.clone();
700
701        pyo3_async_runtimes::tokio::future_into_py(py, async move {
702            if let Err(e) = client.unsubscribe_wallet().await {
703                log::error!("Failed to unsubscribe from wallet: {e}");
704            }
705            Ok(())
706        })
707    }
708}
709
710pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
711    if let Err(e) = callback.call1(py, (py_obj,)) {
712        tracing::error!("Error calling Python: {e}");
713    }
714}