nautilus_bitmex/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Python bindings for the `BitMEX` WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - `RwLock` is preferred over Mutex (many reads, few writes).
43
44use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{call_python, to_pyruntime_err, to_pyvalue_err};
47use nautilus_model::{
48    data::bar::BarType,
49    identifiers::{AccountId, InstrumentId},
50    python::{
51        data::data_to_pycapsule,
52        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
53    },
54};
55use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
56
57use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
58
59#[pymethods]
60impl BitmexWebSocketClient {
61    #[new]
62    #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
63    fn py_new(
64        url: Option<String>,
65        api_key: Option<String>,
66        api_secret: Option<String>,
67        account_id: Option<AccountId>,
68        heartbeat: Option<u64>,
69        testnet: bool,
70    ) -> PyResult<Self> {
71        Self::new_with_env(url, api_key, api_secret, account_id, heartbeat, testnet)
72            .map_err(to_pyvalue_err)
73    }
74
75    #[staticmethod]
76    #[pyo3(name = "from_env")]
77    fn py_from_env() -> PyResult<Self> {
78        Self::from_env().map_err(to_pyvalue_err)
79    }
80
81    #[getter]
82    #[pyo3(name = "url")]
83    #[must_use]
84    pub const fn py_url(&self) -> &str {
85        self.url()
86    }
87
88    #[getter]
89    #[pyo3(name = "api_key")]
90    #[must_use]
91    pub fn py_api_key(&self) -> Option<&str> {
92        self.api_key()
93    }
94
95    #[getter]
96    #[pyo3(name = "api_key_masked")]
97    #[must_use]
98    pub fn py_api_key_masked(&self) -> Option<String> {
99        self.api_key_masked()
100    }
101
102    #[pyo3(name = "is_active")]
103    fn py_is_active(&mut self) -> bool {
104        self.is_active()
105    }
106
107    #[pyo3(name = "is_closed")]
108    fn py_is_closed(&mut self) -> bool {
109        self.is_closed()
110    }
111
112    #[pyo3(name = "get_subscriptions")]
113    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
114        self.get_subscriptions(instrument_id)
115    }
116
117    #[pyo3(name = "set_account_id")]
118    pub fn py_set_account_id(&mut self, account_id: AccountId) {
119        self.set_account_id(account_id);
120    }
121
122    #[pyo3(name = "cache_instrument")]
123    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
124        let inst_any = pyobject_to_instrument_any(py, instrument)?;
125        self.cache_instrument(inst_any);
126        Ok(())
127    }
128
129    #[pyo3(name = "connect")]
130    fn py_connect<'py>(
131        &mut self,
132        py: Python<'py>,
133        instruments: Vec<Py<PyAny>>,
134        callback: Py<PyAny>,
135    ) -> PyResult<Bound<'py, PyAny>> {
136        let mut instruments_any = Vec::new();
137        for inst in instruments {
138            let inst_any = pyobject_to_instrument_any(py, inst)?;
139            instruments_any.push(inst_any);
140        }
141
142        self.cache_instruments(instruments_any);
143
144        // We need to clone self to move into the async block,
145        // the clone will be connected and kept alive to maintain the handler.
146        let mut client = self.clone();
147
148        pyo3_async_runtimes::tokio::future_into_py(py, async move {
149            client.connect().await.map_err(to_pyruntime_err)?;
150
151            let stream = client.stream();
152
153            get_runtime().spawn(async move {
154                let _client = client; // Keep client alive for the entire duration
155                tokio::pin!(stream);
156
157                while let Some(msg) = stream.next().await {
158                    Python::attach(|py| match msg {
159                        NautilusWsMessage::Data(data_vec) => {
160                            for data in data_vec {
161                                let py_obj = data_to_pycapsule(py, data);
162                                call_python(py, &callback, py_obj);
163                            }
164                        }
165                        NautilusWsMessage::Instruments(instruments) => {
166                            for instrument in instruments {
167                                if let Ok(py_obj) = instrument_any_to_pyobject(py, instrument) {
168                                    call_python(py, &callback, py_obj);
169                                }
170                            }
171                        }
172                        NautilusWsMessage::OrderStatusReports(reports) => {
173                            for report in reports {
174                                if let Ok(py_obj) = report.into_py_any(py) {
175                                    call_python(py, &callback, py_obj);
176                                }
177                            }
178                        }
179                        NautilusWsMessage::FillReports(reports) => {
180                            for report in reports {
181                                if let Ok(py_obj) = report.into_py_any(py) {
182                                    call_python(py, &callback, py_obj);
183                                }
184                            }
185                        }
186                        NautilusWsMessage::PositionStatusReport(report) => {
187                            if let Ok(py_obj) = report.into_py_any(py) {
188                                call_python(py, &callback, py_obj);
189                            }
190                        }
191                        NautilusWsMessage::FundingRateUpdates(updates) => {
192                            for update in updates {
193                                if let Ok(py_obj) = update.into_py_any(py) {
194                                    call_python(py, &callback, py_obj);
195                                }
196                            }
197                        }
198                        NautilusWsMessage::AccountState(account_state) => {
199                            if let Ok(py_obj) = account_state.into_py_any(py) {
200                                call_python(py, &callback, py_obj);
201                            }
202                        }
203                        NautilusWsMessage::OrderUpdated(event) => {
204                            if let Ok(py_obj) = event.into_py_any(py) {
205                                call_python(py, &callback, py_obj);
206                            }
207                        }
208                        NautilusWsMessage::Reconnected => {}
209                        NautilusWsMessage::Authenticated => {}
210                    });
211                }
212            });
213
214            Ok(())
215        })
216    }
217
218    #[pyo3(name = "wait_until_active")]
219    fn py_wait_until_active<'py>(
220        &self,
221        py: Python<'py>,
222        timeout_secs: f64,
223    ) -> PyResult<Bound<'py, PyAny>> {
224        let client = self.clone();
225
226        pyo3_async_runtimes::tokio::future_into_py(py, async move {
227            client
228                .wait_until_active(timeout_secs)
229                .await
230                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
231            Ok(())
232        })
233    }
234
235    #[pyo3(name = "close")]
236    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
237        let mut client = self.clone();
238
239        pyo3_async_runtimes::tokio::future_into_py(py, async move {
240            if let Err(e) = client.close().await {
241                log::error!("Error on close: {e}");
242            }
243            Ok(())
244        })
245    }
246
247    #[pyo3(name = "subscribe_instruments")]
248    fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
249        let client = self.clone();
250
251        pyo3_async_runtimes::tokio::future_into_py(py, async move {
252            if let Err(e) = client.subscribe_instruments().await {
253                log::error!("Failed to subscribe to instruments: {e}");
254            }
255            Ok(())
256        })
257    }
258
259    #[pyo3(name = "subscribe_instrument")]
260    fn py_subscribe_instrument<'py>(
261        &self,
262        py: Python<'py>,
263        instrument_id: InstrumentId,
264    ) -> PyResult<Bound<'py, PyAny>> {
265        let client = self.clone();
266
267        pyo3_async_runtimes::tokio::future_into_py(py, async move {
268            if let Err(e) = client.subscribe_instrument(instrument_id).await {
269                log::error!("Failed to subscribe to instrument: {e}");
270            }
271            Ok(())
272        })
273    }
274
275    #[pyo3(name = "subscribe_book")]
276    fn py_subscribe_book<'py>(
277        &self,
278        py: Python<'py>,
279        instrument_id: InstrumentId,
280    ) -> PyResult<Bound<'py, PyAny>> {
281        let client = self.clone();
282
283        pyo3_async_runtimes::tokio::future_into_py(py, async move {
284            if let Err(e) = client.subscribe_book(instrument_id).await {
285                log::error!("Failed to subscribe to order book: {e}");
286            }
287            Ok(())
288        })
289    }
290
291    #[pyo3(name = "subscribe_book_25")]
292    fn py_subscribe_book_25<'py>(
293        &self,
294        py: Python<'py>,
295        instrument_id: InstrumentId,
296    ) -> PyResult<Bound<'py, PyAny>> {
297        let client = self.clone();
298
299        pyo3_async_runtimes::tokio::future_into_py(py, async move {
300            if let Err(e) = client.subscribe_book_25(instrument_id).await {
301                log::error!("Failed to subscribe to order book 25: {e}");
302            }
303            Ok(())
304        })
305    }
306
307    #[pyo3(name = "subscribe_book_depth10")]
308    fn py_subscribe_book_depth10<'py>(
309        &self,
310        py: Python<'py>,
311        instrument_id: InstrumentId,
312    ) -> PyResult<Bound<'py, PyAny>> {
313        let client = self.clone();
314
315        pyo3_async_runtimes::tokio::future_into_py(py, async move {
316            if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
317                log::error!("Failed to subscribe to order book depth 10: {e}");
318            }
319            Ok(())
320        })
321    }
322
323    #[pyo3(name = "subscribe_quotes")]
324    fn py_subscribe_quotes<'py>(
325        &self,
326        py: Python<'py>,
327        instrument_id: InstrumentId,
328    ) -> PyResult<Bound<'py, PyAny>> {
329        let client = self.clone();
330
331        pyo3_async_runtimes::tokio::future_into_py(py, async move {
332            if let Err(e) = client.subscribe_quotes(instrument_id).await {
333                log::error!("Failed to subscribe to quotes: {e}");
334            }
335            Ok(())
336        })
337    }
338
339    #[pyo3(name = "subscribe_trades")]
340    fn py_subscribe_trades<'py>(
341        &self,
342        py: Python<'py>,
343        instrument_id: InstrumentId,
344    ) -> PyResult<Bound<'py, PyAny>> {
345        let client = self.clone();
346
347        pyo3_async_runtimes::tokio::future_into_py(py, async move {
348            if let Err(e) = client.subscribe_trades(instrument_id).await {
349                log::error!("Failed to subscribe to trades: {e}");
350            }
351            Ok(())
352        })
353    }
354
355    #[pyo3(name = "subscribe_mark_prices")]
356    fn py_subscribe_mark_prices<'py>(
357        &self,
358        py: Python<'py>,
359        instrument_id: InstrumentId,
360    ) -> PyResult<Bound<'py, PyAny>> {
361        let client = self.clone();
362
363        pyo3_async_runtimes::tokio::future_into_py(py, async move {
364            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
365                log::error!("Failed to subscribe to mark prices: {e}");
366            }
367            Ok(())
368        })
369    }
370
371    #[pyo3(name = "subscribe_index_prices")]
372    fn py_subscribe_index_prices<'py>(
373        &self,
374        py: Python<'py>,
375        instrument_id: InstrumentId,
376    ) -> PyResult<Bound<'py, PyAny>> {
377        let client = self.clone();
378
379        pyo3_async_runtimes::tokio::future_into_py(py, async move {
380            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
381                log::error!("Failed to subscribe to index prices: {e}");
382            }
383            Ok(())
384        })
385    }
386
387    #[pyo3(name = "subscribe_funding_rates")]
388    fn py_subscribe_funding_rates<'py>(
389        &self,
390        py: Python<'py>,
391        instrument_id: InstrumentId,
392    ) -> PyResult<Bound<'py, PyAny>> {
393        let client = self.clone();
394
395        pyo3_async_runtimes::tokio::future_into_py(py, async move {
396            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
397                log::error!("Failed to subscribe to funding: {e}");
398            }
399            Ok(())
400        })
401    }
402
403    #[pyo3(name = "subscribe_bars")]
404    fn py_subscribe_bars<'py>(
405        &self,
406        py: Python<'py>,
407        bar_type: BarType,
408    ) -> PyResult<Bound<'py, PyAny>> {
409        let client = self.clone();
410
411        pyo3_async_runtimes::tokio::future_into_py(py, async move {
412            if let Err(e) = client.subscribe_bars(bar_type).await {
413                log::error!("Failed to subscribe to bars: {e}");
414            }
415            Ok(())
416        })
417    }
418
419    #[pyo3(name = "unsubscribe_instruments")]
420    fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
421        let client = self.clone();
422
423        pyo3_async_runtimes::tokio::future_into_py(py, async move {
424            if let Err(e) = client.unsubscribe_instruments().await {
425                log::error!("Failed to unsubscribe from instruments: {e}");
426            }
427            Ok(())
428        })
429    }
430
431    #[pyo3(name = "unsubscribe_instrument")]
432    fn py_unsubscribe_instrument<'py>(
433        &self,
434        py: Python<'py>,
435        instrument_id: InstrumentId,
436    ) -> PyResult<Bound<'py, PyAny>> {
437        let client = self.clone();
438
439        pyo3_async_runtimes::tokio::future_into_py(py, async move {
440            if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
441                log::error!("Failed to unsubscribe from instrument: {e}");
442            }
443            Ok(())
444        })
445    }
446
447    #[pyo3(name = "unsubscribe_book")]
448    fn py_unsubscribe_book<'py>(
449        &self,
450        py: Python<'py>,
451        instrument_id: InstrumentId,
452    ) -> PyResult<Bound<'py, PyAny>> {
453        let client = self.clone();
454
455        pyo3_async_runtimes::tokio::future_into_py(py, async move {
456            if let Err(e) = client.unsubscribe_book(instrument_id).await {
457                log::error!("Failed to unsubscribe from order book: {e}");
458            }
459            Ok(())
460        })
461    }
462
463    #[pyo3(name = "unsubscribe_book_25")]
464    fn py_unsubscribe_book_25<'py>(
465        &self,
466        py: Python<'py>,
467        instrument_id: InstrumentId,
468    ) -> PyResult<Bound<'py, PyAny>> {
469        let client = self.clone();
470
471        pyo3_async_runtimes::tokio::future_into_py(py, async move {
472            if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
473                log::error!("Failed to unsubscribe from order book 25: {e}");
474            }
475            Ok(())
476        })
477    }
478
479    #[pyo3(name = "unsubscribe_book_depth10")]
480    fn py_unsubscribe_book_depth10<'py>(
481        &self,
482        py: Python<'py>,
483        instrument_id: InstrumentId,
484    ) -> PyResult<Bound<'py, PyAny>> {
485        let client = self.clone();
486
487        pyo3_async_runtimes::tokio::future_into_py(py, async move {
488            if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
489                log::error!("Failed to unsubscribe from order book depth 10: {e}");
490            }
491            Ok(())
492        })
493    }
494
495    #[pyo3(name = "unsubscribe_quotes")]
496    fn py_unsubscribe_quotes<'py>(
497        &self,
498        py: Python<'py>,
499        instrument_id: InstrumentId,
500    ) -> PyResult<Bound<'py, PyAny>> {
501        let client = self.clone();
502
503        pyo3_async_runtimes::tokio::future_into_py(py, async move {
504            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
505                log::error!("Failed to unsubscribe from quotes: {e}");
506            }
507            Ok(())
508        })
509    }
510
511    #[pyo3(name = "unsubscribe_trades")]
512    fn py_unsubscribe_trades<'py>(
513        &self,
514        py: Python<'py>,
515        instrument_id: InstrumentId,
516    ) -> PyResult<Bound<'py, PyAny>> {
517        let client = self.clone();
518
519        pyo3_async_runtimes::tokio::future_into_py(py, async move {
520            if let Err(e) = client.unsubscribe_trades(instrument_id).await {
521                log::error!("Failed to unsubscribe from trades: {e}");
522            }
523            Ok(())
524        })
525    }
526
527    #[pyo3(name = "unsubscribe_mark_prices")]
528    fn py_unsubscribe_mark_prices<'py>(
529        &self,
530        py: Python<'py>,
531        instrument_id: InstrumentId,
532    ) -> PyResult<Bound<'py, PyAny>> {
533        let client = self.clone();
534
535        pyo3_async_runtimes::tokio::future_into_py(py, async move {
536            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
537                log::error!("Failed to unsubscribe from mark prices: {e}");
538            }
539            Ok(())
540        })
541    }
542
543    #[pyo3(name = "unsubscribe_index_prices")]
544    fn py_unsubscribe_index_prices<'py>(
545        &self,
546        py: Python<'py>,
547        instrument_id: InstrumentId,
548    ) -> PyResult<Bound<'py, PyAny>> {
549        let client = self.clone();
550
551        pyo3_async_runtimes::tokio::future_into_py(py, async move {
552            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
553                log::error!("Failed to unsubscribe from index prices: {e}");
554            }
555            Ok(())
556        })
557    }
558
559    #[pyo3(name = "unsubscribe_funding_rates")]
560    fn py_unsubscribe_funding_rates<'py>(
561        &self,
562        py: Python<'py>,
563        instrument_id: InstrumentId,
564    ) -> PyResult<Bound<'py, PyAny>> {
565        let client = self.clone();
566        pyo3_async_runtimes::tokio::future_into_py(py, async move {
567            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
568                log::error!("Failed to unsubscribe from funding rates: {e}");
569            }
570            Ok(())
571        })
572    }
573
574    #[pyo3(name = "unsubscribe_bars")]
575    fn py_unsubscribe_bars<'py>(
576        &self,
577        py: Python<'py>,
578        bar_type: BarType,
579    ) -> PyResult<Bound<'py, PyAny>> {
580        let client = self.clone();
581
582        pyo3_async_runtimes::tokio::future_into_py(py, async move {
583            if let Err(e) = client.unsubscribe_bars(bar_type).await {
584                log::error!("Failed to unsubscribe from bars: {e}");
585            }
586            Ok(())
587        })
588    }
589
590    #[pyo3(name = "subscribe_orders")]
591    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
592        let client = self.clone();
593
594        pyo3_async_runtimes::tokio::future_into_py(py, async move {
595            if let Err(e) = client.subscribe_orders().await {
596                log::error!("Failed to subscribe to orders: {e}");
597            }
598            Ok(())
599        })
600    }
601
602    #[pyo3(name = "subscribe_executions")]
603    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
604        let client = self.clone();
605
606        pyo3_async_runtimes::tokio::future_into_py(py, async move {
607            if let Err(e) = client.subscribe_executions().await {
608                log::error!("Failed to subscribe to executions: {e}");
609            }
610            Ok(())
611        })
612    }
613
614    #[pyo3(name = "subscribe_positions")]
615    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
616        let client = self.clone();
617
618        pyo3_async_runtimes::tokio::future_into_py(py, async move {
619            if let Err(e) = client.subscribe_positions().await {
620                log::error!("Failed to subscribe to positions: {e}");
621            }
622            Ok(())
623        })
624    }
625
626    #[pyo3(name = "subscribe_margin")]
627    fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
628        let client = self.clone();
629
630        pyo3_async_runtimes::tokio::future_into_py(py, async move {
631            if let Err(e) = client.subscribe_margin().await {
632                log::error!("Failed to subscribe to margin: {e}");
633            }
634            Ok(())
635        })
636    }
637
638    #[pyo3(name = "subscribe_wallet")]
639    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
640        let client = self.clone();
641
642        pyo3_async_runtimes::tokio::future_into_py(py, async move {
643            if let Err(e) = client.subscribe_wallet().await {
644                log::error!("Failed to subscribe to wallet: {e}");
645            }
646            Ok(())
647        })
648    }
649
650    #[pyo3(name = "unsubscribe_orders")]
651    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
652        let client = self.clone();
653
654        pyo3_async_runtimes::tokio::future_into_py(py, async move {
655            if let Err(e) = client.unsubscribe_orders().await {
656                log::error!("Failed to unsubscribe from orders: {e}");
657            }
658            Ok(())
659        })
660    }
661
662    #[pyo3(name = "unsubscribe_executions")]
663    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
664        let client = self.clone();
665
666        pyo3_async_runtimes::tokio::future_into_py(py, async move {
667            if let Err(e) = client.unsubscribe_executions().await {
668                log::error!("Failed to unsubscribe from executions: {e}");
669            }
670            Ok(())
671        })
672    }
673
674    #[pyo3(name = "unsubscribe_positions")]
675    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
676        let client = self.clone();
677
678        pyo3_async_runtimes::tokio::future_into_py(py, async move {
679            if let Err(e) = client.unsubscribe_positions().await {
680                log::error!("Failed to unsubscribe from positions: {e}");
681            }
682            Ok(())
683        })
684    }
685
686    #[pyo3(name = "unsubscribe_margin")]
687    fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
688        let client = self.clone();
689
690        pyo3_async_runtimes::tokio::future_into_py(py, async move {
691            if let Err(e) = client.unsubscribe_margin().await {
692                log::error!("Failed to unsubscribe from margin: {e}");
693            }
694            Ok(())
695        })
696    }
697
698    #[pyo3(name = "unsubscribe_wallet")]
699    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
700        let client = self.clone();
701
702        pyo3_async_runtimes::tokio::future_into_py(py, async move {
703            if let Err(e) = client.unsubscribe_wallet().await {
704                log::error!("Failed to unsubscribe from wallet: {e}");
705            }
706            Ok(())
707        })
708    }
709}