Skip to main content

nautilus_bitmex/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the `BitmEX` WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - `RwLock` is preferred over Mutex (many reads, few writes).
43
44use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{call_python, to_pyruntime_err, to_pyvalue_err};
47use nautilus_model::{
48    data::bar::BarType,
49    identifiers::{AccountId, InstrumentId},
50    python::{
51        data::data_to_pycapsule,
52        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
53    },
54};
55use pyo3::{conversion::IntoPyObjectExt, prelude::*};
56
57use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
58
59#[pymethods]
60impl BitmexWebSocketClient {
61    #[new]
62    #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
63    fn py_new(
64        url: Option<String>,
65        api_key: Option<String>,
66        api_secret: Option<String>,
67        account_id: Option<AccountId>,
68        heartbeat: Option<u64>,
69        testnet: bool,
70    ) -> PyResult<Self> {
71        Self::new_with_env(url, api_key, api_secret, account_id, heartbeat, testnet)
72            .map_err(to_pyvalue_err)
73    }
74
75    #[staticmethod]
76    #[pyo3(name = "from_env")]
77    fn py_from_env() -> PyResult<Self> {
78        Self::from_env().map_err(to_pyvalue_err)
79    }
80
81    #[getter]
82    #[pyo3(name = "url")]
83    #[must_use]
84    pub const fn py_url(&self) -> &str {
85        self.url()
86    }
87
88    #[getter]
89    #[pyo3(name = "api_key")]
90    #[must_use]
91    pub fn py_api_key(&self) -> Option<&str> {
92        self.api_key()
93    }
94
95    #[getter]
96    #[pyo3(name = "api_key_masked")]
97    #[must_use]
98    pub fn py_api_key_masked(&self) -> Option<String> {
99        self.api_key_masked()
100    }
101
102    #[pyo3(name = "is_active")]
103    fn py_is_active(&mut self) -> bool {
104        self.is_active()
105    }
106
107    #[pyo3(name = "is_closed")]
108    fn py_is_closed(&mut self) -> bool {
109        self.is_closed()
110    }
111
112    #[pyo3(name = "get_subscriptions")]
113    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
114        self.get_subscriptions(instrument_id)
115    }
116
117    #[pyo3(name = "set_account_id")]
118    pub fn py_set_account_id(&mut self, account_id: AccountId) {
119        self.set_account_id(account_id);
120    }
121
122    #[pyo3(name = "cache_instrument")]
123    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
124        let inst_any = pyobject_to_instrument_any(py, instrument)?;
125        self.cache_instrument(inst_any);
126        Ok(())
127    }
128
129    #[pyo3(name = "connect")]
130    fn py_connect<'py>(
131        &mut self,
132        py: Python<'py>,
133        instruments: Vec<Py<PyAny>>,
134        callback: Py<PyAny>,
135    ) -> PyResult<Bound<'py, PyAny>> {
136        let mut instruments_any = Vec::new();
137        for inst in instruments {
138            let inst_any = pyobject_to_instrument_any(py, inst)?;
139            instruments_any.push(inst_any);
140        }
141
142        self.cache_instruments(instruments_any);
143
144        // We need to clone self to move into the async block,
145        // the clone will be connected and kept alive to maintain the handler.
146        let mut client = self.clone();
147
148        pyo3_async_runtimes::tokio::future_into_py(py, async move {
149            client.connect().await.map_err(to_pyruntime_err)?;
150
151            let stream = client.stream();
152
153            get_runtime().spawn(async move {
154                let _client = client; // Keep client alive for the entire duration
155                tokio::pin!(stream);
156
157                while let Some(msg) = stream.next().await {
158                    Python::attach(|py| match msg {
159                        NautilusWsMessage::Data(data_vec) => {
160                            for data in data_vec {
161                                let py_obj = data_to_pycapsule(py, data);
162                                call_python(py, &callback, py_obj);
163                            }
164                        }
165                        NautilusWsMessage::Instruments(instruments) => {
166                            for instrument in instruments {
167                                if let Ok(py_obj) = instrument_any_to_pyobject(py, instrument) {
168                                    call_python(py, &callback, py_obj);
169                                }
170                            }
171                        }
172                        NautilusWsMessage::OrderStatusReports(reports) => {
173                            for report in reports {
174                                if let Ok(py_obj) = report.into_py_any(py) {
175                                    call_python(py, &callback, py_obj);
176                                }
177                            }
178                        }
179                        NautilusWsMessage::FillReports(reports) => {
180                            for report in reports {
181                                if let Ok(py_obj) = report.into_py_any(py) {
182                                    call_python(py, &callback, py_obj);
183                                }
184                            }
185                        }
186                        NautilusWsMessage::PositionStatusReports(reports) => {
187                            for report in reports {
188                                if let Ok(py_obj) = report.into_py_any(py) {
189                                    call_python(py, &callback, py_obj);
190                                }
191                            }
192                        }
193                        NautilusWsMessage::FundingRateUpdates(updates) => {
194                            for update in updates {
195                                if let Ok(py_obj) = update.into_py_any(py) {
196                                    call_python(py, &callback, py_obj);
197                                }
198                            }
199                        }
200                        NautilusWsMessage::AccountStates(states) => {
201                            for state in states {
202                                if let Ok(py_obj) = state.into_py_any(py) {
203                                    call_python(py, &callback, py_obj);
204                                }
205                            }
206                        }
207                        NautilusWsMessage::OrderUpdated(event) => {
208                            if let Ok(py_obj) = (*event).into_py_any(py) {
209                                call_python(py, &callback, py_obj);
210                            }
211                        }
212                        NautilusWsMessage::OrderUpdates(events) => {
213                            for event in events {
214                                if let Ok(py_obj) = event.into_py_any(py) {
215                                    call_python(py, &callback, py_obj);
216                                }
217                            }
218                        }
219                        NautilusWsMessage::Reconnected => {}
220                        NautilusWsMessage::Authenticated => {}
221                    });
222                }
223            });
224
225            Ok(())
226        })
227    }
228
229    #[pyo3(name = "wait_until_active")]
230    fn py_wait_until_active<'py>(
231        &self,
232        py: Python<'py>,
233        timeout_secs: f64,
234    ) -> PyResult<Bound<'py, PyAny>> {
235        let client = self.clone();
236
237        pyo3_async_runtimes::tokio::future_into_py(py, async move {
238            client
239                .wait_until_active(timeout_secs)
240                .await
241                .map_err(to_pyruntime_err)?;
242            Ok(())
243        })
244    }
245
246    #[pyo3(name = "close")]
247    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
248        let mut client = self.clone();
249
250        pyo3_async_runtimes::tokio::future_into_py(py, async move {
251            if let Err(e) = client.close().await {
252                log::error!("Error on close: {e}");
253            }
254            Ok(())
255        })
256    }
257
258    #[pyo3(name = "subscribe_instruments")]
259    fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
260        let client = self.clone();
261
262        pyo3_async_runtimes::tokio::future_into_py(py, async move {
263            if let Err(e) = client.subscribe_instruments().await {
264                log::error!("Failed to subscribe to instruments: {e}");
265            }
266            Ok(())
267        })
268    }
269
270    #[pyo3(name = "subscribe_instrument")]
271    fn py_subscribe_instrument<'py>(
272        &self,
273        py: Python<'py>,
274        instrument_id: InstrumentId,
275    ) -> PyResult<Bound<'py, PyAny>> {
276        let client = self.clone();
277
278        pyo3_async_runtimes::tokio::future_into_py(py, async move {
279            if let Err(e) = client.subscribe_instrument(instrument_id).await {
280                log::error!("Failed to subscribe to instrument: {e}");
281            }
282            Ok(())
283        })
284    }
285
286    #[pyo3(name = "subscribe_book")]
287    fn py_subscribe_book<'py>(
288        &self,
289        py: Python<'py>,
290        instrument_id: InstrumentId,
291    ) -> PyResult<Bound<'py, PyAny>> {
292        let client = self.clone();
293
294        pyo3_async_runtimes::tokio::future_into_py(py, async move {
295            if let Err(e) = client.subscribe_book(instrument_id).await {
296                log::error!("Failed to subscribe to order book: {e}");
297            }
298            Ok(())
299        })
300    }
301
302    #[pyo3(name = "subscribe_book_25")]
303    fn py_subscribe_book_25<'py>(
304        &self,
305        py: Python<'py>,
306        instrument_id: InstrumentId,
307    ) -> PyResult<Bound<'py, PyAny>> {
308        let client = self.clone();
309
310        pyo3_async_runtimes::tokio::future_into_py(py, async move {
311            if let Err(e) = client.subscribe_book_25(instrument_id).await {
312                log::error!("Failed to subscribe to order book 25: {e}");
313            }
314            Ok(())
315        })
316    }
317
318    #[pyo3(name = "subscribe_book_depth10")]
319    fn py_subscribe_book_depth10<'py>(
320        &self,
321        py: Python<'py>,
322        instrument_id: InstrumentId,
323    ) -> PyResult<Bound<'py, PyAny>> {
324        let client = self.clone();
325
326        pyo3_async_runtimes::tokio::future_into_py(py, async move {
327            if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
328                log::error!("Failed to subscribe to order book depth 10: {e}");
329            }
330            Ok(())
331        })
332    }
333
334    #[pyo3(name = "subscribe_quotes")]
335    fn py_subscribe_quotes<'py>(
336        &self,
337        py: Python<'py>,
338        instrument_id: InstrumentId,
339    ) -> PyResult<Bound<'py, PyAny>> {
340        let client = self.clone();
341
342        pyo3_async_runtimes::tokio::future_into_py(py, async move {
343            if let Err(e) = client.subscribe_quotes(instrument_id).await {
344                log::error!("Failed to subscribe to quotes: {e}");
345            }
346            Ok(())
347        })
348    }
349
350    #[pyo3(name = "subscribe_trades")]
351    fn py_subscribe_trades<'py>(
352        &self,
353        py: Python<'py>,
354        instrument_id: InstrumentId,
355    ) -> PyResult<Bound<'py, PyAny>> {
356        let client = self.clone();
357
358        pyo3_async_runtimes::tokio::future_into_py(py, async move {
359            if let Err(e) = client.subscribe_trades(instrument_id).await {
360                log::error!("Failed to subscribe to trades: {e}");
361            }
362            Ok(())
363        })
364    }
365
366    #[pyo3(name = "subscribe_mark_prices")]
367    fn py_subscribe_mark_prices<'py>(
368        &self,
369        py: Python<'py>,
370        instrument_id: InstrumentId,
371    ) -> PyResult<Bound<'py, PyAny>> {
372        let client = self.clone();
373
374        pyo3_async_runtimes::tokio::future_into_py(py, async move {
375            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
376                log::error!("Failed to subscribe to mark prices: {e}");
377            }
378            Ok(())
379        })
380    }
381
382    #[pyo3(name = "subscribe_index_prices")]
383    fn py_subscribe_index_prices<'py>(
384        &self,
385        py: Python<'py>,
386        instrument_id: InstrumentId,
387    ) -> PyResult<Bound<'py, PyAny>> {
388        let client = self.clone();
389
390        pyo3_async_runtimes::tokio::future_into_py(py, async move {
391            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
392                log::error!("Failed to subscribe to index prices: {e}");
393            }
394            Ok(())
395        })
396    }
397
398    #[pyo3(name = "subscribe_funding_rates")]
399    fn py_subscribe_funding_rates<'py>(
400        &self,
401        py: Python<'py>,
402        instrument_id: InstrumentId,
403    ) -> PyResult<Bound<'py, PyAny>> {
404        let client = self.clone();
405
406        pyo3_async_runtimes::tokio::future_into_py(py, async move {
407            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
408                log::error!("Failed to subscribe to funding: {e}");
409            }
410            Ok(())
411        })
412    }
413
414    #[pyo3(name = "subscribe_bars")]
415    fn py_subscribe_bars<'py>(
416        &self,
417        py: Python<'py>,
418        bar_type: BarType,
419    ) -> PyResult<Bound<'py, PyAny>> {
420        let client = self.clone();
421
422        pyo3_async_runtimes::tokio::future_into_py(py, async move {
423            if let Err(e) = client.subscribe_bars(bar_type).await {
424                log::error!("Failed to subscribe to bars: {e}");
425            }
426            Ok(())
427        })
428    }
429
430    #[pyo3(name = "unsubscribe_instruments")]
431    fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
432        let client = self.clone();
433
434        pyo3_async_runtimes::tokio::future_into_py(py, async move {
435            if let Err(e) = client.unsubscribe_instruments().await {
436                log::error!("Failed to unsubscribe from instruments: {e}");
437            }
438            Ok(())
439        })
440    }
441
442    #[pyo3(name = "unsubscribe_instrument")]
443    fn py_unsubscribe_instrument<'py>(
444        &self,
445        py: Python<'py>,
446        instrument_id: InstrumentId,
447    ) -> PyResult<Bound<'py, PyAny>> {
448        let client = self.clone();
449
450        pyo3_async_runtimes::tokio::future_into_py(py, async move {
451            if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
452                log::error!("Failed to unsubscribe from instrument: {e}");
453            }
454            Ok(())
455        })
456    }
457
458    #[pyo3(name = "unsubscribe_book")]
459    fn py_unsubscribe_book<'py>(
460        &self,
461        py: Python<'py>,
462        instrument_id: InstrumentId,
463    ) -> PyResult<Bound<'py, PyAny>> {
464        let client = self.clone();
465
466        pyo3_async_runtimes::tokio::future_into_py(py, async move {
467            if let Err(e) = client.unsubscribe_book(instrument_id).await {
468                log::error!("Failed to unsubscribe from order book: {e}");
469            }
470            Ok(())
471        })
472    }
473
474    #[pyo3(name = "unsubscribe_book_25")]
475    fn py_unsubscribe_book_25<'py>(
476        &self,
477        py: Python<'py>,
478        instrument_id: InstrumentId,
479    ) -> PyResult<Bound<'py, PyAny>> {
480        let client = self.clone();
481
482        pyo3_async_runtimes::tokio::future_into_py(py, async move {
483            if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
484                log::error!("Failed to unsubscribe from order book 25: {e}");
485            }
486            Ok(())
487        })
488    }
489
490    #[pyo3(name = "unsubscribe_book_depth10")]
491    fn py_unsubscribe_book_depth10<'py>(
492        &self,
493        py: Python<'py>,
494        instrument_id: InstrumentId,
495    ) -> PyResult<Bound<'py, PyAny>> {
496        let client = self.clone();
497
498        pyo3_async_runtimes::tokio::future_into_py(py, async move {
499            if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
500                log::error!("Failed to unsubscribe from order book depth 10: {e}");
501            }
502            Ok(())
503        })
504    }
505
506    #[pyo3(name = "unsubscribe_quotes")]
507    fn py_unsubscribe_quotes<'py>(
508        &self,
509        py: Python<'py>,
510        instrument_id: InstrumentId,
511    ) -> PyResult<Bound<'py, PyAny>> {
512        let client = self.clone();
513
514        pyo3_async_runtimes::tokio::future_into_py(py, async move {
515            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
516                log::error!("Failed to unsubscribe from quotes: {e}");
517            }
518            Ok(())
519        })
520    }
521
522    #[pyo3(name = "unsubscribe_trades")]
523    fn py_unsubscribe_trades<'py>(
524        &self,
525        py: Python<'py>,
526        instrument_id: InstrumentId,
527    ) -> PyResult<Bound<'py, PyAny>> {
528        let client = self.clone();
529
530        pyo3_async_runtimes::tokio::future_into_py(py, async move {
531            if let Err(e) = client.unsubscribe_trades(instrument_id).await {
532                log::error!("Failed to unsubscribe from trades: {e}");
533            }
534            Ok(())
535        })
536    }
537
538    #[pyo3(name = "unsubscribe_mark_prices")]
539    fn py_unsubscribe_mark_prices<'py>(
540        &self,
541        py: Python<'py>,
542        instrument_id: InstrumentId,
543    ) -> PyResult<Bound<'py, PyAny>> {
544        let client = self.clone();
545
546        pyo3_async_runtimes::tokio::future_into_py(py, async move {
547            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
548                log::error!("Failed to unsubscribe from mark prices: {e}");
549            }
550            Ok(())
551        })
552    }
553
554    #[pyo3(name = "unsubscribe_index_prices")]
555    fn py_unsubscribe_index_prices<'py>(
556        &self,
557        py: Python<'py>,
558        instrument_id: InstrumentId,
559    ) -> PyResult<Bound<'py, PyAny>> {
560        let client = self.clone();
561
562        pyo3_async_runtimes::tokio::future_into_py(py, async move {
563            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
564                log::error!("Failed to unsubscribe from index prices: {e}");
565            }
566            Ok(())
567        })
568    }
569
570    #[pyo3(name = "unsubscribe_funding_rates")]
571    fn py_unsubscribe_funding_rates<'py>(
572        &self,
573        py: Python<'py>,
574        instrument_id: InstrumentId,
575    ) -> PyResult<Bound<'py, PyAny>> {
576        let client = self.clone();
577        pyo3_async_runtimes::tokio::future_into_py(py, async move {
578            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
579                log::error!("Failed to unsubscribe from funding rates: {e}");
580            }
581            Ok(())
582        })
583    }
584
585    #[pyo3(name = "unsubscribe_bars")]
586    fn py_unsubscribe_bars<'py>(
587        &self,
588        py: Python<'py>,
589        bar_type: BarType,
590    ) -> PyResult<Bound<'py, PyAny>> {
591        let client = self.clone();
592
593        pyo3_async_runtimes::tokio::future_into_py(py, async move {
594            if let Err(e) = client.unsubscribe_bars(bar_type).await {
595                log::error!("Failed to unsubscribe from bars: {e}");
596            }
597            Ok(())
598        })
599    }
600
601    #[pyo3(name = "subscribe_orders")]
602    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
603        let client = self.clone();
604
605        pyo3_async_runtimes::tokio::future_into_py(py, async move {
606            if let Err(e) = client.subscribe_orders().await {
607                log::error!("Failed to subscribe to orders: {e}");
608            }
609            Ok(())
610        })
611    }
612
613    #[pyo3(name = "subscribe_executions")]
614    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
615        let client = self.clone();
616
617        pyo3_async_runtimes::tokio::future_into_py(py, async move {
618            if let Err(e) = client.subscribe_executions().await {
619                log::error!("Failed to subscribe to executions: {e}");
620            }
621            Ok(())
622        })
623    }
624
625    #[pyo3(name = "subscribe_positions")]
626    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
627        let client = self.clone();
628
629        pyo3_async_runtimes::tokio::future_into_py(py, async move {
630            if let Err(e) = client.subscribe_positions().await {
631                log::error!("Failed to subscribe to positions: {e}");
632            }
633            Ok(())
634        })
635    }
636
637    #[pyo3(name = "subscribe_margin")]
638    fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
639        let client = self.clone();
640
641        pyo3_async_runtimes::tokio::future_into_py(py, async move {
642            if let Err(e) = client.subscribe_margin().await {
643                log::error!("Failed to subscribe to margin: {e}");
644            }
645            Ok(())
646        })
647    }
648
649    #[pyo3(name = "subscribe_wallet")]
650    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
651        let client = self.clone();
652
653        pyo3_async_runtimes::tokio::future_into_py(py, async move {
654            if let Err(e) = client.subscribe_wallet().await {
655                log::error!("Failed to subscribe to wallet: {e}");
656            }
657            Ok(())
658        })
659    }
660
661    #[pyo3(name = "unsubscribe_orders")]
662    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
663        let client = self.clone();
664
665        pyo3_async_runtimes::tokio::future_into_py(py, async move {
666            if let Err(e) = client.unsubscribe_orders().await {
667                log::error!("Failed to unsubscribe from orders: {e}");
668            }
669            Ok(())
670        })
671    }
672
673    #[pyo3(name = "unsubscribe_executions")]
674    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
675        let client = self.clone();
676
677        pyo3_async_runtimes::tokio::future_into_py(py, async move {
678            if let Err(e) = client.unsubscribe_executions().await {
679                log::error!("Failed to unsubscribe from executions: {e}");
680            }
681            Ok(())
682        })
683    }
684
685    #[pyo3(name = "unsubscribe_positions")]
686    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
687        let client = self.clone();
688
689        pyo3_async_runtimes::tokio::future_into_py(py, async move {
690            if let Err(e) = client.unsubscribe_positions().await {
691                log::error!("Failed to unsubscribe from positions: {e}");
692            }
693            Ok(())
694        })
695    }
696
697    #[pyo3(name = "unsubscribe_margin")]
698    fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
699        let client = self.clone();
700
701        pyo3_async_runtimes::tokio::future_into_py(py, async move {
702            if let Err(e) = client.unsubscribe_margin().await {
703                log::error!("Failed to unsubscribe from margin: {e}");
704            }
705            Ok(())
706        })
707    }
708
709    #[pyo3(name = "unsubscribe_wallet")]
710    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
711        let client = self.clone();
712
713        pyo3_async_runtimes::tokio::future_into_py(py, async move {
714            if let Err(e) = client.unsubscribe_wallet().await {
715                log::error!("Failed to unsubscribe from wallet: {e}");
716            }
717            Ok(())
718        })
719    }
720}