nautilus_bitmex/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Python bindings for the `BitMEX` WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - `RwLock` is preferred over Mutex (many reads, few writes).
43
44use futures_util::StreamExt;
45use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47    data::bar::BarType,
48    identifiers::{AccountId, InstrumentId},
49    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
50};
51use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
52
53use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
54
55#[pymethods]
56impl BitmexWebSocketClient {
57    #[new]
58    #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
59    fn py_new(
60        url: Option<String>,
61        api_key: Option<String>,
62        api_secret: Option<String>,
63        account_id: Option<AccountId>,
64        heartbeat: Option<u64>,
65        testnet: bool,
66    ) -> PyResult<Self> {
67        // If both api_key and api_secret are None, try to load from environment
68        let (final_api_key, final_api_secret) = if api_key.is_none() && api_secret.is_none() {
69            // Choose environment variables based on testnet flag
70            let (key_var, secret_var) = if testnet {
71                ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
72            } else {
73                ("BITMEX_API_KEY", "BITMEX_API_SECRET")
74            };
75
76            let env_key = std::env::var(key_var).ok();
77            let env_secret = std::env::var(secret_var).ok();
78            (env_key, env_secret)
79        } else {
80            (api_key, api_secret)
81        };
82
83        Self::new(url, final_api_key, final_api_secret, account_id, heartbeat)
84            .map_err(to_pyvalue_err)
85    }
86
    /// Builds a client by delegating to `Self::from_env`; exposed to Python as the static
    /// method `from_env`.
    ///
    /// # Errors
    ///
    /// Any error from `Self::from_env` is converted with `to_pyvalue_err`.
    #[staticmethod]
    #[pyo3(name = "from_env")]
    fn py_from_env() -> PyResult<Self> {
        Self::from_env().map_err(to_pyvalue_err)
    }
92
    /// Returns the result of the client's `url()` accessor; exposed to Python as the `url`
    /// property.
    #[getter]
    #[pyo3(name = "url")]
    #[must_use]
    pub const fn py_url(&self) -> &str {
        self.url()
    }
99
    /// Returns the result of the client's `api_key()` accessor, which may be `None`; exposed
    /// to Python as the `api_key` property.
    #[getter]
    #[pyo3(name = "api_key")]
    #[must_use]
    pub fn py_api_key(&self) -> Option<&str> {
        self.api_key()
    }
106
    /// Returns the result of the client's `is_active()`; exposed to Python as `is_active`.
    // NOTE(review): the receiver is `&mut self`, so PyO3 takes an exclusive borrow for this
    // call. If `is_active` only needs `&self`, a shared receiver would suffice - confirm.
    #[pyo3(name = "is_active")]
    fn py_is_active(&mut self) -> bool {
        self.is_active()
    }
111
    /// Returns the result of the client's `is_closed()`; exposed to Python as `is_closed`.
    // NOTE(review): the receiver is `&mut self`, so PyO3 takes an exclusive borrow for this
    // call. If `is_closed` only needs `&self`, a shared receiver would suffice - confirm.
    #[pyo3(name = "is_closed")]
    fn py_is_closed(&mut self) -> bool {
        self.is_closed()
    }
116
    /// Returns the strings produced by the client's `get_subscriptions` for `instrument_id`;
    /// exposed to Python as `get_subscriptions`.
    #[pyo3(name = "get_subscriptions")]
    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
        self.get_subscriptions(instrument_id)
    }
121
    /// Forwards `account_id` to the client's `set_account_id`; exposed to Python as
    /// `set_account_id`.
    #[pyo3(name = "set_account_id")]
    pub fn py_set_account_id(&mut self, account_id: AccountId) {
        self.set_account_id(account_id);
    }
126
127    #[pyo3(name = "connect")]
128    fn py_connect<'py>(
129        &mut self,
130        py: Python<'py>,
131        instruments: Vec<Py<PyAny>>,
132        callback: Py<PyAny>,
133    ) -> PyResult<Bound<'py, PyAny>> {
134        let mut instruments_any = Vec::new();
135        for inst in instruments {
136            let inst_any = pyobject_to_instrument_any(py, inst)?;
137            instruments_any.push(inst_any);
138        }
139
140        self.initialize_instruments_cache(instruments_any);
141
142        let mut client = self.clone();
143
144        pyo3_async_runtimes::tokio::future_into_py(py, async move {
145            client.connect().await.map_err(to_pyruntime_err)?;
146
147            let stream = client.stream();
148
149            tokio::spawn(async move {
150                tokio::pin!(stream);
151
152                while let Some(msg) = stream.next().await {
153                    Python::attach(|py| match msg {
154                        NautilusWsMessage::Data(data_vec) => {
155                            for data in data_vec {
156                                let py_obj = data_to_pycapsule(py, data);
157                                call_python(py, &callback, py_obj);
158                            }
159                        }
160                        NautilusWsMessage::OrderStatusReports(reports) => {
161                            for report in reports {
162                                if let Ok(py_obj) = report.into_py_any(py) {
163                                    call_python(py, &callback, py_obj);
164                                }
165                            }
166                        }
167                        NautilusWsMessage::FillReports(reports) => {
168                            for report in reports {
169                                if let Ok(py_obj) = report.into_py_any(py) {
170                                    call_python(py, &callback, py_obj);
171                                }
172                            }
173                        }
174                        NautilusWsMessage::PositionStatusReport(report) => {
175                            if let Ok(py_obj) = report.into_py_any(py) {
176                                call_python(py, &callback, py_obj);
177                            }
178                        }
179                        NautilusWsMessage::FundingRateUpdates(updates) => {
180                            for update in updates {
181                                if let Ok(py_obj) = update.into_py_any(py) {
182                                    call_python(py, &callback, py_obj);
183                                }
184                            }
185                        }
186                        NautilusWsMessage::AccountState(account_state) => {
187                            if let Ok(py_obj) = account_state.into_py_any(py) {
188                                call_python(py, &callback, py_obj);
189                            }
190                        }
191                        NautilusWsMessage::OrderUpdated(event) => {
192                            if let Ok(py_obj) = event.into_py_any(py) {
193                                call_python(py, &callback, py_obj);
194                            }
195                        }
196                        NautilusWsMessage::Reconnected => {} // Nothing to handle
197                    });
198                }
199            });
200
201            Ok(())
202        })
203    }
204
205    #[pyo3(name = "wait_until_active")]
206    fn py_wait_until_active<'py>(
207        &self,
208        py: Python<'py>,
209        timeout_secs: f64,
210    ) -> PyResult<Bound<'py, PyAny>> {
211        let client = self.clone();
212
213        pyo3_async_runtimes::tokio::future_into_py(py, async move {
214            client
215                .wait_until_active(timeout_secs)
216                .await
217                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
218            Ok(())
219        })
220    }
221
222    #[pyo3(name = "close")]
223    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
224        let mut client = self.clone();
225
226        pyo3_async_runtimes::tokio::future_into_py(py, async move {
227            if let Err(e) = client.close().await {
228                log::error!("Error on close: {e}");
229            }
230            Ok(())
231        })
232    }
233
234    #[pyo3(name = "subscribe_instruments")]
235    fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
236        let client = self.clone();
237
238        pyo3_async_runtimes::tokio::future_into_py(py, async move {
239            if let Err(e) = client.subscribe_instruments().await {
240                log::error!("Failed to subscribe to instruments: {e}");
241            }
242            Ok(())
243        })
244    }
245
246    #[pyo3(name = "subscribe_instrument")]
247    fn py_subscribe_instrument<'py>(
248        &self,
249        py: Python<'py>,
250        instrument_id: InstrumentId,
251    ) -> PyResult<Bound<'py, PyAny>> {
252        let client = self.clone();
253
254        pyo3_async_runtimes::tokio::future_into_py(py, async move {
255            if let Err(e) = client.subscribe_instrument(instrument_id).await {
256                log::error!("Failed to subscribe to instrument: {e}");
257            }
258            Ok(())
259        })
260    }
261
262    #[pyo3(name = "subscribe_book")]
263    fn py_subscribe_book<'py>(
264        &self,
265        py: Python<'py>,
266        instrument_id: InstrumentId,
267    ) -> PyResult<Bound<'py, PyAny>> {
268        let client = self.clone();
269
270        pyo3_async_runtimes::tokio::future_into_py(py, async move {
271            if let Err(e) = client.subscribe_book(instrument_id).await {
272                log::error!("Failed to subscribe to order book: {e}");
273            }
274            Ok(())
275        })
276    }
277
278    #[pyo3(name = "subscribe_book_25")]
279    fn py_subscribe_book_25<'py>(
280        &self,
281        py: Python<'py>,
282        instrument_id: InstrumentId,
283    ) -> PyResult<Bound<'py, PyAny>> {
284        let client = self.clone();
285
286        pyo3_async_runtimes::tokio::future_into_py(py, async move {
287            if let Err(e) = client.subscribe_book_25(instrument_id).await {
288                log::error!("Failed to subscribe to order book 25: {e}");
289            }
290            Ok(())
291        })
292    }
293
294    #[pyo3(name = "subscribe_book_depth10")]
295    fn py_subscribe_book_depth10<'py>(
296        &self,
297        py: Python<'py>,
298        instrument_id: InstrumentId,
299    ) -> PyResult<Bound<'py, PyAny>> {
300        let client = self.clone();
301
302        pyo3_async_runtimes::tokio::future_into_py(py, async move {
303            if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
304                log::error!("Failed to subscribe to order book depth 10: {e}");
305            }
306            Ok(())
307        })
308    }
309
310    #[pyo3(name = "subscribe_quotes")]
311    fn py_subscribe_quotes<'py>(
312        &self,
313        py: Python<'py>,
314        instrument_id: InstrumentId,
315    ) -> PyResult<Bound<'py, PyAny>> {
316        let client = self.clone();
317
318        pyo3_async_runtimes::tokio::future_into_py(py, async move {
319            if let Err(e) = client.subscribe_quotes(instrument_id).await {
320                log::error!("Failed to subscribe to quotes: {e}");
321            }
322            Ok(())
323        })
324    }
325
326    #[pyo3(name = "subscribe_trades")]
327    fn py_subscribe_trades<'py>(
328        &self,
329        py: Python<'py>,
330        instrument_id: InstrumentId,
331    ) -> PyResult<Bound<'py, PyAny>> {
332        let client = self.clone();
333
334        pyo3_async_runtimes::tokio::future_into_py(py, async move {
335            if let Err(e) = client.subscribe_trades(instrument_id).await {
336                log::error!("Failed to subscribe to trades: {e}");
337            }
338            Ok(())
339        })
340    }
341
342    #[pyo3(name = "subscribe_mark_prices")]
343    fn py_subscribe_mark_prices<'py>(
344        &self,
345        py: Python<'py>,
346        instrument_id: InstrumentId,
347    ) -> PyResult<Bound<'py, PyAny>> {
348        let client = self.clone();
349
350        pyo3_async_runtimes::tokio::future_into_py(py, async move {
351            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
352                log::error!("Failed to subscribe to mark prices: {e}");
353            }
354            Ok(())
355        })
356    }
357
358    #[pyo3(name = "subscribe_index_prices")]
359    fn py_subscribe_index_prices<'py>(
360        &self,
361        py: Python<'py>,
362        instrument_id: InstrumentId,
363    ) -> PyResult<Bound<'py, PyAny>> {
364        let client = self.clone();
365
366        pyo3_async_runtimes::tokio::future_into_py(py, async move {
367            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
368                log::error!("Failed to subscribe to index prices: {e}");
369            }
370            Ok(())
371        })
372    }
373
374    #[pyo3(name = "subscribe_funding_rates")]
375    fn py_subscribe_funding_rates<'py>(
376        &self,
377        py: Python<'py>,
378        instrument_id: InstrumentId,
379    ) -> PyResult<Bound<'py, PyAny>> {
380        let client = self.clone();
381
382        pyo3_async_runtimes::tokio::future_into_py(py, async move {
383            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
384                log::error!("Failed to subscribe to funding: {e}");
385            }
386            Ok(())
387        })
388    }
389
390    #[pyo3(name = "subscribe_bars")]
391    fn py_subscribe_bars<'py>(
392        &self,
393        py: Python<'py>,
394        bar_type: BarType,
395    ) -> PyResult<Bound<'py, PyAny>> {
396        let client = self.clone();
397
398        pyo3_async_runtimes::tokio::future_into_py(py, async move {
399            if let Err(e) = client.subscribe_bars(bar_type).await {
400                log::error!("Failed to subscribe to bars: {e}");
401            }
402            Ok(())
403        })
404    }
405
406    #[pyo3(name = "unsubscribe_instruments")]
407    fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
408        let client = self.clone();
409
410        pyo3_async_runtimes::tokio::future_into_py(py, async move {
411            if let Err(e) = client.unsubscribe_instruments().await {
412                log::error!("Failed to unsubscribe from instruments: {e}");
413            }
414            Ok(())
415        })
416    }
417
418    #[pyo3(name = "unsubscribe_instrument")]
419    fn py_unsubscribe_instrument<'py>(
420        &self,
421        py: Python<'py>,
422        instrument_id: InstrumentId,
423    ) -> PyResult<Bound<'py, PyAny>> {
424        let client = self.clone();
425
426        pyo3_async_runtimes::tokio::future_into_py(py, async move {
427            if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
428                log::error!("Failed to unsubscribe from instrument: {e}");
429            }
430            Ok(())
431        })
432    }
433
434    #[pyo3(name = "unsubscribe_book")]
435    fn py_unsubscribe_book<'py>(
436        &self,
437        py: Python<'py>,
438        instrument_id: InstrumentId,
439    ) -> PyResult<Bound<'py, PyAny>> {
440        let client = self.clone();
441
442        pyo3_async_runtimes::tokio::future_into_py(py, async move {
443            if let Err(e) = client.unsubscribe_book(instrument_id).await {
444                log::error!("Failed to unsubscribe from order book: {e}");
445            }
446            Ok(())
447        })
448    }
449
450    #[pyo3(name = "unsubscribe_book_25")]
451    fn py_unsubscribe_book_25<'py>(
452        &self,
453        py: Python<'py>,
454        instrument_id: InstrumentId,
455    ) -> PyResult<Bound<'py, PyAny>> {
456        let client = self.clone();
457
458        pyo3_async_runtimes::tokio::future_into_py(py, async move {
459            if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
460                log::error!("Failed to unsubscribe from order book 25: {e}");
461            }
462            Ok(())
463        })
464    }
465
466    #[pyo3(name = "unsubscribe_book_depth10")]
467    fn py_unsubscribe_book_depth10<'py>(
468        &self,
469        py: Python<'py>,
470        instrument_id: InstrumentId,
471    ) -> PyResult<Bound<'py, PyAny>> {
472        let client = self.clone();
473
474        pyo3_async_runtimes::tokio::future_into_py(py, async move {
475            if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
476                log::error!("Failed to unsubscribe from order book depth 10: {e}");
477            }
478            Ok(())
479        })
480    }
481
482    #[pyo3(name = "unsubscribe_quotes")]
483    fn py_unsubscribe_quotes<'py>(
484        &self,
485        py: Python<'py>,
486        instrument_id: InstrumentId,
487    ) -> PyResult<Bound<'py, PyAny>> {
488        let client = self.clone();
489
490        pyo3_async_runtimes::tokio::future_into_py(py, async move {
491            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
492                log::error!("Failed to unsubscribe from quotes: {e}");
493            }
494            Ok(())
495        })
496    }
497
498    #[pyo3(name = "unsubscribe_trades")]
499    fn py_unsubscribe_trades<'py>(
500        &self,
501        py: Python<'py>,
502        instrument_id: InstrumentId,
503    ) -> PyResult<Bound<'py, PyAny>> {
504        let client = self.clone();
505
506        pyo3_async_runtimes::tokio::future_into_py(py, async move {
507            if let Err(e) = client.unsubscribe_trades(instrument_id).await {
508                log::error!("Failed to unsubscribe from trades: {e}");
509            }
510            Ok(())
511        })
512    }
513
514    #[pyo3(name = "unsubscribe_mark_prices")]
515    fn py_unsubscribe_mark_prices<'py>(
516        &self,
517        py: Python<'py>,
518        instrument_id: InstrumentId,
519    ) -> PyResult<Bound<'py, PyAny>> {
520        let client = self.clone();
521
522        pyo3_async_runtimes::tokio::future_into_py(py, async move {
523            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
524                log::error!("Failed to unsubscribe from mark prices: {e}");
525            }
526            Ok(())
527        })
528    }
529
530    #[pyo3(name = "unsubscribe_index_prices")]
531    fn py_unsubscribe_index_prices<'py>(
532        &self,
533        py: Python<'py>,
534        instrument_id: InstrumentId,
535    ) -> PyResult<Bound<'py, PyAny>> {
536        let client = self.clone();
537
538        pyo3_async_runtimes::tokio::future_into_py(py, async move {
539            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
540                log::error!("Failed to unsubscribe from index prices: {e}");
541            }
542            Ok(())
543        })
544    }
545
546    #[pyo3(name = "unsubscribe_funding_rates")]
547    fn py_unsubscribe_funding_rates<'py>(
548        &self,
549        py: Python<'py>,
550        instrument_id: InstrumentId,
551    ) -> PyResult<Bound<'py, PyAny>> {
552        let client = self.clone();
553        pyo3_async_runtimes::tokio::future_into_py(py, async move {
554            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
555                log::error!("Failed to unsubscribe from funding rates: {e}");
556            }
557            Ok(())
558        })
559    }
560
561    #[pyo3(name = "unsubscribe_bars")]
562    fn py_unsubscribe_bars<'py>(
563        &self,
564        py: Python<'py>,
565        bar_type: BarType,
566    ) -> PyResult<Bound<'py, PyAny>> {
567        let client = self.clone();
568
569        pyo3_async_runtimes::tokio::future_into_py(py, async move {
570            if let Err(e) = client.unsubscribe_bars(bar_type).await {
571                log::error!("Failed to unsubscribe from bars: {e}");
572            }
573            Ok(())
574        })
575    }
576
577    #[pyo3(name = "subscribe_orders")]
578    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
579        let client = self.clone();
580
581        pyo3_async_runtimes::tokio::future_into_py(py, async move {
582            if let Err(e) = client.subscribe_orders().await {
583                log::error!("Failed to subscribe to orders: {e}");
584            }
585            Ok(())
586        })
587    }
588
589    #[pyo3(name = "subscribe_executions")]
590    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
591        let client = self.clone();
592
593        pyo3_async_runtimes::tokio::future_into_py(py, async move {
594            if let Err(e) = client.subscribe_executions().await {
595                log::error!("Failed to subscribe to executions: {e}");
596            }
597            Ok(())
598        })
599    }
600
601    #[pyo3(name = "subscribe_positions")]
602    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
603        let client = self.clone();
604
605        pyo3_async_runtimes::tokio::future_into_py(py, async move {
606            if let Err(e) = client.subscribe_positions().await {
607                log::error!("Failed to subscribe to positions: {e}");
608            }
609            Ok(())
610        })
611    }
612
613    #[pyo3(name = "subscribe_margin")]
614    fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
615        let client = self.clone();
616
617        pyo3_async_runtimes::tokio::future_into_py(py, async move {
618            if let Err(e) = client.subscribe_margin().await {
619                log::error!("Failed to subscribe to margin: {e}");
620            }
621            Ok(())
622        })
623    }
624
625    #[pyo3(name = "subscribe_wallet")]
626    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
627        let client = self.clone();
628
629        pyo3_async_runtimes::tokio::future_into_py(py, async move {
630            if let Err(e) = client.subscribe_wallet().await {
631                log::error!("Failed to subscribe to wallet: {e}");
632            }
633            Ok(())
634        })
635    }
636
637    #[pyo3(name = "unsubscribe_orders")]
638    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
639        let client = self.clone();
640
641        pyo3_async_runtimes::tokio::future_into_py(py, async move {
642            if let Err(e) = client.unsubscribe_orders().await {
643                log::error!("Failed to unsubscribe from orders: {e}");
644            }
645            Ok(())
646        })
647    }
648
649    #[pyo3(name = "unsubscribe_executions")]
650    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
651        let client = self.clone();
652
653        pyo3_async_runtimes::tokio::future_into_py(py, async move {
654            if let Err(e) = client.unsubscribe_executions().await {
655                log::error!("Failed to unsubscribe from executions: {e}");
656            }
657            Ok(())
658        })
659    }
660
661    #[pyo3(name = "unsubscribe_positions")]
662    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
663        let client = self.clone();
664
665        pyo3_async_runtimes::tokio::future_into_py(py, async move {
666            if let Err(e) = client.unsubscribe_positions().await {
667                log::error!("Failed to unsubscribe from positions: {e}");
668            }
669            Ok(())
670        })
671    }
672
673    #[pyo3(name = "unsubscribe_margin")]
674    fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
675        let client = self.clone();
676
677        pyo3_async_runtimes::tokio::future_into_py(py, async move {
678            if let Err(e) = client.unsubscribe_margin().await {
679                log::error!("Failed to unsubscribe from margin: {e}");
680            }
681            Ok(())
682        })
683    }
684
685    #[pyo3(name = "unsubscribe_wallet")]
686    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
687        let client = self.clone();
688
689        pyo3_async_runtimes::tokio::future_into_py(py, async move {
690            if let Err(e) = client.unsubscribe_wallet().await {
691                log::error!("Failed to unsubscribe from wallet: {e}");
692            }
693            Ok(())
694        })
695    }
696}
697
698pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
699    if let Err(e) = callback.call1(py, (py_obj,)) {
700        tracing::error!("Error calling Python: {e}");
701    }
702}