nautilus_bitmex/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Python bindings for the `BitMEX` WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - `RwLock` is preferred over Mutex (many reads, few writes).
43
44use futures_util::StreamExt;
45use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47    data::bar::BarType,
48    identifiers::{AccountId, InstrumentId},
49    python::{
50        data::data_to_pycapsule,
51        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
52    },
53};
54use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
55
56use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
57
58#[pymethods]
59impl BitmexWebSocketClient {
60    #[new]
61    #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
62    fn py_new(
63        url: Option<String>,
64        api_key: Option<String>,
65        api_secret: Option<String>,
66        account_id: Option<AccountId>,
67        heartbeat: Option<u64>,
68        testnet: bool,
69    ) -> PyResult<Self> {
70        // If both api_key and api_secret are None, try to load from environment
71        let (final_api_key, final_api_secret) = if api_key.is_none() && api_secret.is_none() {
72            // Choose environment variables based on testnet flag
73            let (key_var, secret_var) = if testnet {
74                ("BITMEX_TESTNET_API_KEY", "BITMEX_TESTNET_API_SECRET")
75            } else {
76                ("BITMEX_API_KEY", "BITMEX_API_SECRET")
77            };
78
79            let env_key = std::env::var(key_var).ok();
80            let env_secret = std::env::var(secret_var).ok();
81            (env_key, env_secret)
82        } else {
83            (api_key, api_secret)
84        };
85
86        Self::new(url, final_api_key, final_api_secret, account_id, heartbeat)
87            .map_err(to_pyvalue_err)
88    }
89
90    #[staticmethod]
91    #[pyo3(name = "from_env")]
92    fn py_from_env() -> PyResult<Self> {
93        Self::from_env().map_err(to_pyvalue_err)
94    }
95
96    #[getter]
97    #[pyo3(name = "url")]
98    #[must_use]
99    pub const fn py_url(&self) -> &str {
100        self.url()
101    }
102
103    #[getter]
104    #[pyo3(name = "api_key")]
105    #[must_use]
106    pub fn py_api_key(&self) -> Option<&str> {
107        self.api_key()
108    }
109
110    #[getter]
111    #[pyo3(name = "api_key_masked")]
112    #[must_use]
113    pub fn py_api_key_masked(&self) -> Option<String> {
114        self.api_key_masked()
115    }
116
117    #[pyo3(name = "is_active")]
118    fn py_is_active(&mut self) -> bool {
119        self.is_active()
120    }
121
122    #[pyo3(name = "is_closed")]
123    fn py_is_closed(&mut self) -> bool {
124        self.is_closed()
125    }
126
127    #[pyo3(name = "get_subscriptions")]
128    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
129        self.get_subscriptions(instrument_id)
130    }
131
132    #[pyo3(name = "set_account_id")]
133    pub fn py_set_account_id(&mut self, account_id: AccountId) {
134        self.set_account_id(account_id);
135    }
136
137    #[pyo3(name = "cache_instrument")]
138    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
139        let inst_any = pyobject_to_instrument_any(py, instrument)?;
140        self.cache_instrument(inst_any);
141        Ok(())
142    }
143
144    #[pyo3(name = "connect")]
145    fn py_connect<'py>(
146        &mut self,
147        py: Python<'py>,
148        instruments: Vec<Py<PyAny>>,
149        callback: Py<PyAny>,
150    ) -> PyResult<Bound<'py, PyAny>> {
151        let mut instruments_any = Vec::new();
152        for inst in instruments {
153            let inst_any = pyobject_to_instrument_any(py, inst)?;
154            instruments_any.push(inst_any);
155        }
156
157        self.cache_instruments(instruments_any);
158
159        // We need to clone self to move into the async block,
160        // the clone will be connected and kept alive to maintain the handler.
161        let mut client = self.clone();
162
163        pyo3_async_runtimes::tokio::future_into_py(py, async move {
164            client.connect().await.map_err(to_pyruntime_err)?;
165
166            let stream = client.stream();
167
168            tokio::spawn(async move {
169                let _client = client; // Keep client alive for the entire duration
170                tokio::pin!(stream);
171
172                while let Some(msg) = stream.next().await {
173                    Python::attach(|py| match msg {
174                        NautilusWsMessage::Data(data_vec) => {
175                            for data in data_vec {
176                                let py_obj = data_to_pycapsule(py, data);
177                                call_python(py, &callback, py_obj);
178                            }
179                        }
180                        NautilusWsMessage::Instruments(instruments) => {
181                            for instrument in instruments {
182                                if let Ok(py_obj) = instrument_any_to_pyobject(py, instrument) {
183                                    call_python(py, &callback, py_obj);
184                                }
185                            }
186                        }
187                        NautilusWsMessage::OrderStatusReports(reports) => {
188                            for report in reports {
189                                if let Ok(py_obj) = report.into_py_any(py) {
190                                    call_python(py, &callback, py_obj);
191                                }
192                            }
193                        }
194                        NautilusWsMessage::FillReports(reports) => {
195                            for report in reports {
196                                if let Ok(py_obj) = report.into_py_any(py) {
197                                    call_python(py, &callback, py_obj);
198                                }
199                            }
200                        }
201                        NautilusWsMessage::PositionStatusReport(report) => {
202                            if let Ok(py_obj) = report.into_py_any(py) {
203                                call_python(py, &callback, py_obj);
204                            }
205                        }
206                        NautilusWsMessage::FundingRateUpdates(updates) => {
207                            for update in updates {
208                                if let Ok(py_obj) = update.into_py_any(py) {
209                                    call_python(py, &callback, py_obj);
210                                }
211                            }
212                        }
213                        NautilusWsMessage::AccountState(account_state) => {
214                            if let Ok(py_obj) = account_state.into_py_any(py) {
215                                call_python(py, &callback, py_obj);
216                            }
217                        }
218                        NautilusWsMessage::OrderUpdated(event) => {
219                            if let Ok(py_obj) = event.into_py_any(py) {
220                                call_python(py, &callback, py_obj);
221                            }
222                        }
223                        NautilusWsMessage::Reconnected => {}
224                        NautilusWsMessage::Authenticated => {}
225                    });
226                }
227            });
228
229            Ok(())
230        })
231    }
232
233    #[pyo3(name = "wait_until_active")]
234    fn py_wait_until_active<'py>(
235        &self,
236        py: Python<'py>,
237        timeout_secs: f64,
238    ) -> PyResult<Bound<'py, PyAny>> {
239        let client = self.clone();
240
241        pyo3_async_runtimes::tokio::future_into_py(py, async move {
242            client
243                .wait_until_active(timeout_secs)
244                .await
245                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
246            Ok(())
247        })
248    }
249
250    #[pyo3(name = "close")]
251    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
252        let mut client = self.clone();
253
254        pyo3_async_runtimes::tokio::future_into_py(py, async move {
255            if let Err(e) = client.close().await {
256                log::error!("Error on close: {e}");
257            }
258            Ok(())
259        })
260    }
261
262    #[pyo3(name = "subscribe_instruments")]
263    fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
264        let client = self.clone();
265
266        pyo3_async_runtimes::tokio::future_into_py(py, async move {
267            if let Err(e) = client.subscribe_instruments().await {
268                log::error!("Failed to subscribe to instruments: {e}");
269            }
270            Ok(())
271        })
272    }
273
274    #[pyo3(name = "subscribe_instrument")]
275    fn py_subscribe_instrument<'py>(
276        &self,
277        py: Python<'py>,
278        instrument_id: InstrumentId,
279    ) -> PyResult<Bound<'py, PyAny>> {
280        let client = self.clone();
281
282        pyo3_async_runtimes::tokio::future_into_py(py, async move {
283            if let Err(e) = client.subscribe_instrument(instrument_id).await {
284                log::error!("Failed to subscribe to instrument: {e}");
285            }
286            Ok(())
287        })
288    }
289
290    #[pyo3(name = "subscribe_book")]
291    fn py_subscribe_book<'py>(
292        &self,
293        py: Python<'py>,
294        instrument_id: InstrumentId,
295    ) -> PyResult<Bound<'py, PyAny>> {
296        let client = self.clone();
297
298        pyo3_async_runtimes::tokio::future_into_py(py, async move {
299            if let Err(e) = client.subscribe_book(instrument_id).await {
300                log::error!("Failed to subscribe to order book: {e}");
301            }
302            Ok(())
303        })
304    }
305
306    #[pyo3(name = "subscribe_book_25")]
307    fn py_subscribe_book_25<'py>(
308        &self,
309        py: Python<'py>,
310        instrument_id: InstrumentId,
311    ) -> PyResult<Bound<'py, PyAny>> {
312        let client = self.clone();
313
314        pyo3_async_runtimes::tokio::future_into_py(py, async move {
315            if let Err(e) = client.subscribe_book_25(instrument_id).await {
316                log::error!("Failed to subscribe to order book 25: {e}");
317            }
318            Ok(())
319        })
320    }
321
322    #[pyo3(name = "subscribe_book_depth10")]
323    fn py_subscribe_book_depth10<'py>(
324        &self,
325        py: Python<'py>,
326        instrument_id: InstrumentId,
327    ) -> PyResult<Bound<'py, PyAny>> {
328        let client = self.clone();
329
330        pyo3_async_runtimes::tokio::future_into_py(py, async move {
331            if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
332                log::error!("Failed to subscribe to order book depth 10: {e}");
333            }
334            Ok(())
335        })
336    }
337
338    #[pyo3(name = "subscribe_quotes")]
339    fn py_subscribe_quotes<'py>(
340        &self,
341        py: Python<'py>,
342        instrument_id: InstrumentId,
343    ) -> PyResult<Bound<'py, PyAny>> {
344        let client = self.clone();
345
346        pyo3_async_runtimes::tokio::future_into_py(py, async move {
347            if let Err(e) = client.subscribe_quotes(instrument_id).await {
348                log::error!("Failed to subscribe to quotes: {e}");
349            }
350            Ok(())
351        })
352    }
353
354    #[pyo3(name = "subscribe_trades")]
355    fn py_subscribe_trades<'py>(
356        &self,
357        py: Python<'py>,
358        instrument_id: InstrumentId,
359    ) -> PyResult<Bound<'py, PyAny>> {
360        let client = self.clone();
361
362        pyo3_async_runtimes::tokio::future_into_py(py, async move {
363            if let Err(e) = client.subscribe_trades(instrument_id).await {
364                log::error!("Failed to subscribe to trades: {e}");
365            }
366            Ok(())
367        })
368    }
369
370    #[pyo3(name = "subscribe_mark_prices")]
371    fn py_subscribe_mark_prices<'py>(
372        &self,
373        py: Python<'py>,
374        instrument_id: InstrumentId,
375    ) -> PyResult<Bound<'py, PyAny>> {
376        let client = self.clone();
377
378        pyo3_async_runtimes::tokio::future_into_py(py, async move {
379            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
380                log::error!("Failed to subscribe to mark prices: {e}");
381            }
382            Ok(())
383        })
384    }
385
386    #[pyo3(name = "subscribe_index_prices")]
387    fn py_subscribe_index_prices<'py>(
388        &self,
389        py: Python<'py>,
390        instrument_id: InstrumentId,
391    ) -> PyResult<Bound<'py, PyAny>> {
392        let client = self.clone();
393
394        pyo3_async_runtimes::tokio::future_into_py(py, async move {
395            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
396                log::error!("Failed to subscribe to index prices: {e}");
397            }
398            Ok(())
399        })
400    }
401
402    #[pyo3(name = "subscribe_funding_rates")]
403    fn py_subscribe_funding_rates<'py>(
404        &self,
405        py: Python<'py>,
406        instrument_id: InstrumentId,
407    ) -> PyResult<Bound<'py, PyAny>> {
408        let client = self.clone();
409
410        pyo3_async_runtimes::tokio::future_into_py(py, async move {
411            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
412                log::error!("Failed to subscribe to funding: {e}");
413            }
414            Ok(())
415        })
416    }
417
418    #[pyo3(name = "subscribe_bars")]
419    fn py_subscribe_bars<'py>(
420        &self,
421        py: Python<'py>,
422        bar_type: BarType,
423    ) -> PyResult<Bound<'py, PyAny>> {
424        let client = self.clone();
425
426        pyo3_async_runtimes::tokio::future_into_py(py, async move {
427            if let Err(e) = client.subscribe_bars(bar_type).await {
428                log::error!("Failed to subscribe to bars: {e}");
429            }
430            Ok(())
431        })
432    }
433
434    #[pyo3(name = "unsubscribe_instruments")]
435    fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
436        let client = self.clone();
437
438        pyo3_async_runtimes::tokio::future_into_py(py, async move {
439            if let Err(e) = client.unsubscribe_instruments().await {
440                log::error!("Failed to unsubscribe from instruments: {e}");
441            }
442            Ok(())
443        })
444    }
445
446    #[pyo3(name = "unsubscribe_instrument")]
447    fn py_unsubscribe_instrument<'py>(
448        &self,
449        py: Python<'py>,
450        instrument_id: InstrumentId,
451    ) -> PyResult<Bound<'py, PyAny>> {
452        let client = self.clone();
453
454        pyo3_async_runtimes::tokio::future_into_py(py, async move {
455            if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
456                log::error!("Failed to unsubscribe from instrument: {e}");
457            }
458            Ok(())
459        })
460    }
461
462    #[pyo3(name = "unsubscribe_book")]
463    fn py_unsubscribe_book<'py>(
464        &self,
465        py: Python<'py>,
466        instrument_id: InstrumentId,
467    ) -> PyResult<Bound<'py, PyAny>> {
468        let client = self.clone();
469
470        pyo3_async_runtimes::tokio::future_into_py(py, async move {
471            if let Err(e) = client.unsubscribe_book(instrument_id).await {
472                log::error!("Failed to unsubscribe from order book: {e}");
473            }
474            Ok(())
475        })
476    }
477
478    #[pyo3(name = "unsubscribe_book_25")]
479    fn py_unsubscribe_book_25<'py>(
480        &self,
481        py: Python<'py>,
482        instrument_id: InstrumentId,
483    ) -> PyResult<Bound<'py, PyAny>> {
484        let client = self.clone();
485
486        pyo3_async_runtimes::tokio::future_into_py(py, async move {
487            if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
488                log::error!("Failed to unsubscribe from order book 25: {e}");
489            }
490            Ok(())
491        })
492    }
493
494    #[pyo3(name = "unsubscribe_book_depth10")]
495    fn py_unsubscribe_book_depth10<'py>(
496        &self,
497        py: Python<'py>,
498        instrument_id: InstrumentId,
499    ) -> PyResult<Bound<'py, PyAny>> {
500        let client = self.clone();
501
502        pyo3_async_runtimes::tokio::future_into_py(py, async move {
503            if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
504                log::error!("Failed to unsubscribe from order book depth 10: {e}");
505            }
506            Ok(())
507        })
508    }
509
510    #[pyo3(name = "unsubscribe_quotes")]
511    fn py_unsubscribe_quotes<'py>(
512        &self,
513        py: Python<'py>,
514        instrument_id: InstrumentId,
515    ) -> PyResult<Bound<'py, PyAny>> {
516        let client = self.clone();
517
518        pyo3_async_runtimes::tokio::future_into_py(py, async move {
519            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
520                log::error!("Failed to unsubscribe from quotes: {e}");
521            }
522            Ok(())
523        })
524    }
525
526    #[pyo3(name = "unsubscribe_trades")]
527    fn py_unsubscribe_trades<'py>(
528        &self,
529        py: Python<'py>,
530        instrument_id: InstrumentId,
531    ) -> PyResult<Bound<'py, PyAny>> {
532        let client = self.clone();
533
534        pyo3_async_runtimes::tokio::future_into_py(py, async move {
535            if let Err(e) = client.unsubscribe_trades(instrument_id).await {
536                log::error!("Failed to unsubscribe from trades: {e}");
537            }
538            Ok(())
539        })
540    }
541
542    #[pyo3(name = "unsubscribe_mark_prices")]
543    fn py_unsubscribe_mark_prices<'py>(
544        &self,
545        py: Python<'py>,
546        instrument_id: InstrumentId,
547    ) -> PyResult<Bound<'py, PyAny>> {
548        let client = self.clone();
549
550        pyo3_async_runtimes::tokio::future_into_py(py, async move {
551            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
552                log::error!("Failed to unsubscribe from mark prices: {e}");
553            }
554            Ok(())
555        })
556    }
557
558    #[pyo3(name = "unsubscribe_index_prices")]
559    fn py_unsubscribe_index_prices<'py>(
560        &self,
561        py: Python<'py>,
562        instrument_id: InstrumentId,
563    ) -> PyResult<Bound<'py, PyAny>> {
564        let client = self.clone();
565
566        pyo3_async_runtimes::tokio::future_into_py(py, async move {
567            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
568                log::error!("Failed to unsubscribe from index prices: {e}");
569            }
570            Ok(())
571        })
572    }
573
574    #[pyo3(name = "unsubscribe_funding_rates")]
575    fn py_unsubscribe_funding_rates<'py>(
576        &self,
577        py: Python<'py>,
578        instrument_id: InstrumentId,
579    ) -> PyResult<Bound<'py, PyAny>> {
580        let client = self.clone();
581        pyo3_async_runtimes::tokio::future_into_py(py, async move {
582            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
583                log::error!("Failed to unsubscribe from funding rates: {e}");
584            }
585            Ok(())
586        })
587    }
588
589    #[pyo3(name = "unsubscribe_bars")]
590    fn py_unsubscribe_bars<'py>(
591        &self,
592        py: Python<'py>,
593        bar_type: BarType,
594    ) -> PyResult<Bound<'py, PyAny>> {
595        let client = self.clone();
596
597        pyo3_async_runtimes::tokio::future_into_py(py, async move {
598            if let Err(e) = client.unsubscribe_bars(bar_type).await {
599                log::error!("Failed to unsubscribe from bars: {e}");
600            }
601            Ok(())
602        })
603    }
604
605    #[pyo3(name = "subscribe_orders")]
606    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
607        let client = self.clone();
608
609        pyo3_async_runtimes::tokio::future_into_py(py, async move {
610            if let Err(e) = client.subscribe_orders().await {
611                log::error!("Failed to subscribe to orders: {e}");
612            }
613            Ok(())
614        })
615    }
616
617    #[pyo3(name = "subscribe_executions")]
618    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
619        let client = self.clone();
620
621        pyo3_async_runtimes::tokio::future_into_py(py, async move {
622            if let Err(e) = client.subscribe_executions().await {
623                log::error!("Failed to subscribe to executions: {e}");
624            }
625            Ok(())
626        })
627    }
628
629    #[pyo3(name = "subscribe_positions")]
630    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
631        let client = self.clone();
632
633        pyo3_async_runtimes::tokio::future_into_py(py, async move {
634            if let Err(e) = client.subscribe_positions().await {
635                log::error!("Failed to subscribe to positions: {e}");
636            }
637            Ok(())
638        })
639    }
640
641    #[pyo3(name = "subscribe_margin")]
642    fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
643        let client = self.clone();
644
645        pyo3_async_runtimes::tokio::future_into_py(py, async move {
646            if let Err(e) = client.subscribe_margin().await {
647                log::error!("Failed to subscribe to margin: {e}");
648            }
649            Ok(())
650        })
651    }
652
653    #[pyo3(name = "subscribe_wallet")]
654    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
655        let client = self.clone();
656
657        pyo3_async_runtimes::tokio::future_into_py(py, async move {
658            if let Err(e) = client.subscribe_wallet().await {
659                log::error!("Failed to subscribe to wallet: {e}");
660            }
661            Ok(())
662        })
663    }
664
665    #[pyo3(name = "unsubscribe_orders")]
666    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
667        let client = self.clone();
668
669        pyo3_async_runtimes::tokio::future_into_py(py, async move {
670            if let Err(e) = client.unsubscribe_orders().await {
671                log::error!("Failed to unsubscribe from orders: {e}");
672            }
673            Ok(())
674        })
675    }
676
677    #[pyo3(name = "unsubscribe_executions")]
678    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
679        let client = self.clone();
680
681        pyo3_async_runtimes::tokio::future_into_py(py, async move {
682            if let Err(e) = client.unsubscribe_executions().await {
683                log::error!("Failed to unsubscribe from executions: {e}");
684            }
685            Ok(())
686        })
687    }
688
689    #[pyo3(name = "unsubscribe_positions")]
690    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
691        let client = self.clone();
692
693        pyo3_async_runtimes::tokio::future_into_py(py, async move {
694            if let Err(e) = client.unsubscribe_positions().await {
695                log::error!("Failed to unsubscribe from positions: {e}");
696            }
697            Ok(())
698        })
699    }
700
701    #[pyo3(name = "unsubscribe_margin")]
702    fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
703        let client = self.clone();
704
705        pyo3_async_runtimes::tokio::future_into_py(py, async move {
706            if let Err(e) = client.unsubscribe_margin().await {
707                log::error!("Failed to unsubscribe from margin: {e}");
708            }
709            Ok(())
710        })
711    }
712
713    #[pyo3(name = "unsubscribe_wallet")]
714    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
715        let client = self.clone();
716
717        pyo3_async_runtimes::tokio::future_into_py(py, async move {
718            if let Err(e) = client.unsubscribe_wallet().await {
719                log::error!("Failed to unsubscribe from wallet: {e}");
720            }
721            Ok(())
722        })
723    }
724}
725
726pub fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
727    if let Err(e) = callback.call1(py, (py_obj,)) {
728        tracing::error!("Error calling Python: {e}");
729    }
730}