nautilus_bitmex/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
//! Python bindings for the `BitMEX` WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - `RwLock` is preferred over Mutex (many reads, few writes).
43
44use futures_util::StreamExt;
45use nautilus_core::python::{to_pyruntime_err, to_pyvalue_err};
46use nautilus_model::{
47    data::bar::BarType,
48    identifiers::{AccountId, InstrumentId},
49    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
50};
51use pyo3::{conversion::IntoPyObjectExt, exceptions::PyRuntimeError, prelude::*};
52
53use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
54
55#[pymethods]
56impl BitmexWebSocketClient {
57    #[new]
58    #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None))]
59    fn py_new(
60        url: Option<String>,
61        api_key: Option<String>,
62        api_secret: Option<String>,
63        account_id: Option<AccountId>,
64        heartbeat: Option<u64>,
65    ) -> PyResult<Self> {
66        // If both api_key and api_secret are None, try to load from environment
67        let (final_api_key, final_api_secret) = if api_key.is_none() && api_secret.is_none() {
68            // Try to load from environment variables
69            let env_key = std::env::var("BITMEX_API_KEY").ok();
70            let env_secret = std::env::var("BITMEX_API_SECRET").ok();
71            (env_key, env_secret)
72        } else {
73            (api_key, api_secret)
74        };
75
76        Self::new(url, final_api_key, final_api_secret, account_id, heartbeat)
77            .map_err(to_pyvalue_err)
78    }
79
80    #[staticmethod]
81    #[pyo3(name = "from_env")]
82    fn py_from_env() -> PyResult<Self> {
83        Self::from_env().map_err(to_pyvalue_err)
84    }
85
86    #[getter]
87    #[pyo3(name = "url")]
88    #[must_use]
89    pub const fn py_url(&self) -> &str {
90        self.url()
91    }
92
93    #[getter]
94    #[pyo3(name = "api_key")]
95    #[must_use]
96    pub fn py_api_key(&self) -> Option<&str> {
97        self.api_key()
98    }
99
100    #[pyo3(name = "is_active")]
101    fn py_is_active(&mut self) -> bool {
102        self.is_active()
103    }
104
105    #[pyo3(name = "is_closed")]
106    fn py_is_closed(&mut self) -> bool {
107        self.is_closed()
108    }
109
110    #[pyo3(name = "get_subscriptions")]
111    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
112        self.get_subscriptions(instrument_id)
113    }
114
115    #[pyo3(name = "connect")]
116    fn py_connect<'py>(
117        &mut self,
118        py: Python<'py>,
119        instruments: Vec<PyObject>,
120        callback: PyObject,
121    ) -> PyResult<Bound<'py, PyAny>> {
122        let mut instruments_any = Vec::new();
123        for inst in instruments {
124            let inst_any = pyobject_to_instrument_any(py, inst)?;
125            instruments_any.push(inst_any);
126        }
127
128        self.initialize_instruments_cache(instruments_any);
129
130        let mut client = self.clone();
131
132        pyo3_async_runtimes::tokio::future_into_py(py, async move {
133            client.connect().await.map_err(to_pyruntime_err)?;
134
135            let stream = client.stream();
136
137            tokio::spawn(async move {
138                tokio::pin!(stream);
139
140                while let Some(msg) = stream.next().await {
141                    Python::with_gil(|py| match msg {
142                        NautilusWsMessage::Data(data_vec) => {
143                            for data in data_vec {
144                                let py_obj = data_to_pycapsule(py, data);
145                                call_python(py, &callback, py_obj);
146                            }
147                        }
148                        NautilusWsMessage::OrderStatusReports(reports) => {
149                            for report in reports {
150                                if let Ok(py_obj) = report.into_py_any(py) {
151                                    call_python(py, &callback, py_obj);
152                                }
153                            }
154                        }
155                        NautilusWsMessage::FillReports(reports) => {
156                            for report in reports {
157                                if let Ok(py_obj) = report.into_py_any(py) {
158                                    call_python(py, &callback, py_obj);
159                                }
160                            }
161                        }
162                        NautilusWsMessage::PositionStatusReport(report) => {
163                            if let Ok(py_obj) = (*report).into_py_any(py) {
164                                call_python(py, &callback, py_obj);
165                            }
166                        }
167                        NautilusWsMessage::FundingRateUpdates(updates) => {
168                            for update in updates {
169                                if let Ok(py_obj) = update.into_py_any(py) {
170                                    call_python(py, &callback, py_obj);
171                                }
172                            }
173                        }
174                        NautilusWsMessage::AccountState(account_state) => {
175                            if let Ok(py_obj) = (*account_state).into_py_any(py) {
176                                call_python(py, &callback, py_obj);
177                            }
178                        }
179                        NautilusWsMessage::Reconnected => {} // Nothing to handle
180                    });
181                }
182            });
183
184            Ok(())
185        })
186    }
187
188    #[pyo3(name = "wait_until_active")]
189    fn py_wait_until_active<'py>(
190        &self,
191        py: Python<'py>,
192        timeout_secs: f64,
193    ) -> PyResult<Bound<'py, PyAny>> {
194        let client = self.clone();
195
196        pyo3_async_runtimes::tokio::future_into_py(py, async move {
197            client
198                .wait_until_active(timeout_secs)
199                .await
200                .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
201            Ok(())
202        })
203    }
204
205    #[pyo3(name = "close")]
206    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
207        let mut client = self.clone();
208
209        pyo3_async_runtimes::tokio::future_into_py(py, async move {
210            if let Err(e) = client.close().await {
211                log::error!("Error on close: {e}");
212            }
213            Ok(())
214        })
215    }
216
217    #[pyo3(name = "subscribe_instruments")]
218    fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
219        let client = self.clone();
220
221        pyo3_async_runtimes::tokio::future_into_py(py, async move {
222            if let Err(e) = client.subscribe_instruments().await {
223                log::error!("Failed to subscribe to instruments: {e}");
224            }
225            Ok(())
226        })
227    }
228
229    #[pyo3(name = "subscribe_instrument")]
230    fn py_subscribe_instrument<'py>(
231        &self,
232        py: Python<'py>,
233        instrument_id: InstrumentId,
234    ) -> PyResult<Bound<'py, PyAny>> {
235        let client = self.clone();
236
237        pyo3_async_runtimes::tokio::future_into_py(py, async move {
238            if let Err(e) = client.subscribe_instrument(instrument_id).await {
239                log::error!("Failed to subscribe to instrument: {e}");
240            }
241            Ok(())
242        })
243    }
244
245    #[pyo3(name = "subscribe_book")]
246    fn py_subscribe_book<'py>(
247        &self,
248        py: Python<'py>,
249        instrument_id: InstrumentId,
250    ) -> PyResult<Bound<'py, PyAny>> {
251        let client = self.clone();
252
253        pyo3_async_runtimes::tokio::future_into_py(py, async move {
254            if let Err(e) = client.subscribe_book(instrument_id).await {
255                log::error!("Failed to subscribe to order book: {e}");
256            }
257            Ok(())
258        })
259    }
260
261    #[pyo3(name = "subscribe_book_25")]
262    fn py_subscribe_book_25<'py>(
263        &self,
264        py: Python<'py>,
265        instrument_id: InstrumentId,
266    ) -> PyResult<Bound<'py, PyAny>> {
267        let client = self.clone();
268
269        pyo3_async_runtimes::tokio::future_into_py(py, async move {
270            if let Err(e) = client.subscribe_book_25(instrument_id).await {
271                log::error!("Failed to subscribe to order book 25: {e}");
272            }
273            Ok(())
274        })
275    }
276
277    #[pyo3(name = "subscribe_book_depth10")]
278    fn py_subscribe_book_depth10<'py>(
279        &self,
280        py: Python<'py>,
281        instrument_id: InstrumentId,
282    ) -> PyResult<Bound<'py, PyAny>> {
283        let client = self.clone();
284
285        pyo3_async_runtimes::tokio::future_into_py(py, async move {
286            if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
287                log::error!("Failed to subscribe to order book depth 10: {e}");
288            }
289            Ok(())
290        })
291    }
292
293    #[pyo3(name = "subscribe_quotes")]
294    fn py_subscribe_quotes<'py>(
295        &self,
296        py: Python<'py>,
297        instrument_id: InstrumentId,
298    ) -> PyResult<Bound<'py, PyAny>> {
299        let client = self.clone();
300
301        pyo3_async_runtimes::tokio::future_into_py(py, async move {
302            if let Err(e) = client.subscribe_quotes(instrument_id).await {
303                log::error!("Failed to subscribe to quotes: {e}");
304            }
305            Ok(())
306        })
307    }
308
309    #[pyo3(name = "subscribe_trades")]
310    fn py_subscribe_trades<'py>(
311        &self,
312        py: Python<'py>,
313        instrument_id: InstrumentId,
314    ) -> PyResult<Bound<'py, PyAny>> {
315        let client = self.clone();
316
317        pyo3_async_runtimes::tokio::future_into_py(py, async move {
318            if let Err(e) = client.subscribe_trades(instrument_id).await {
319                log::error!("Failed to subscribe to trades: {e}");
320            }
321            Ok(())
322        })
323    }
324
325    #[pyo3(name = "subscribe_mark_prices")]
326    fn py_subscribe_mark_prices<'py>(
327        &self,
328        py: Python<'py>,
329        instrument_id: InstrumentId,
330    ) -> PyResult<Bound<'py, PyAny>> {
331        let client = self.clone();
332
333        pyo3_async_runtimes::tokio::future_into_py(py, async move {
334            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
335                log::error!("Failed to subscribe to mark prices: {e}");
336            }
337            Ok(())
338        })
339    }
340
341    #[pyo3(name = "subscribe_index_prices")]
342    fn py_subscribe_index_prices<'py>(
343        &self,
344        py: Python<'py>,
345        instrument_id: InstrumentId,
346    ) -> PyResult<Bound<'py, PyAny>> {
347        let client = self.clone();
348
349        pyo3_async_runtimes::tokio::future_into_py(py, async move {
350            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
351                log::error!("Failed to subscribe to index prices: {e}");
352            }
353            Ok(())
354        })
355    }
356
357    #[pyo3(name = "subscribe_funding_rates")]
358    fn py_subscribe_funding_rates<'py>(
359        &self,
360        py: Python<'py>,
361        instrument_id: InstrumentId,
362    ) -> PyResult<Bound<'py, PyAny>> {
363        let client = self.clone();
364
365        pyo3_async_runtimes::tokio::future_into_py(py, async move {
366            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
367                log::error!("Failed to subscribe to funding: {e}");
368            }
369            Ok(())
370        })
371    }
372
373    #[pyo3(name = "subscribe_bars")]
374    fn py_subscribe_bars<'py>(
375        &self,
376        py: Python<'py>,
377        bar_type: BarType,
378    ) -> PyResult<Bound<'py, PyAny>> {
379        let client = self.clone();
380
381        pyo3_async_runtimes::tokio::future_into_py(py, async move {
382            if let Err(e) = client.subscribe_bars(bar_type).await {
383                log::error!("Failed to subscribe to bars: {e}");
384            }
385            Ok(())
386        })
387    }
388
389    #[pyo3(name = "unsubscribe_instruments")]
390    fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
391        let client = self.clone();
392
393        pyo3_async_runtimes::tokio::future_into_py(py, async move {
394            if let Err(e) = client.unsubscribe_instruments().await {
395                log::error!("Failed to unsubscribe from instruments: {e}");
396            }
397            Ok(())
398        })
399    }
400
401    #[pyo3(name = "unsubscribe_instrument")]
402    fn py_unsubscribe_instrument<'py>(
403        &self,
404        py: Python<'py>,
405        instrument_id: InstrumentId,
406    ) -> PyResult<Bound<'py, PyAny>> {
407        let client = self.clone();
408
409        pyo3_async_runtimes::tokio::future_into_py(py, async move {
410            if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
411                log::error!("Failed to unsubscribe from instrument: {e}");
412            }
413            Ok(())
414        })
415    }
416
417    #[pyo3(name = "unsubscribe_book")]
418    fn py_unsubscribe_book<'py>(
419        &self,
420        py: Python<'py>,
421        instrument_id: InstrumentId,
422    ) -> PyResult<Bound<'py, PyAny>> {
423        let client = self.clone();
424
425        pyo3_async_runtimes::tokio::future_into_py(py, async move {
426            if let Err(e) = client.unsubscribe_book(instrument_id).await {
427                log::error!("Failed to unsubscribe from order book: {e}");
428            }
429            Ok(())
430        })
431    }
432
433    #[pyo3(name = "unsubscribe_book_25")]
434    fn py_unsubscribe_book_25<'py>(
435        &self,
436        py: Python<'py>,
437        instrument_id: InstrumentId,
438    ) -> PyResult<Bound<'py, PyAny>> {
439        let client = self.clone();
440
441        pyo3_async_runtimes::tokio::future_into_py(py, async move {
442            if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
443                log::error!("Failed to unsubscribe from order book 25: {e}");
444            }
445            Ok(())
446        })
447    }
448
449    #[pyo3(name = "unsubscribe_book_depth10")]
450    fn py_unsubscribe_book_depth10<'py>(
451        &self,
452        py: Python<'py>,
453        instrument_id: InstrumentId,
454    ) -> PyResult<Bound<'py, PyAny>> {
455        let client = self.clone();
456
457        pyo3_async_runtimes::tokio::future_into_py(py, async move {
458            if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
459                log::error!("Failed to unsubscribe from order book depth 10: {e}");
460            }
461            Ok(())
462        })
463    }
464
465    #[pyo3(name = "unsubscribe_quotes")]
466    fn py_unsubscribe_quotes<'py>(
467        &self,
468        py: Python<'py>,
469        instrument_id: InstrumentId,
470    ) -> PyResult<Bound<'py, PyAny>> {
471        let client = self.clone();
472
473        pyo3_async_runtimes::tokio::future_into_py(py, async move {
474            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
475                log::error!("Failed to unsubscribe from quotes: {e}");
476            }
477            Ok(())
478        })
479    }
480
481    #[pyo3(name = "unsubscribe_trades")]
482    fn py_unsubscribe_trades<'py>(
483        &self,
484        py: Python<'py>,
485        instrument_id: InstrumentId,
486    ) -> PyResult<Bound<'py, PyAny>> {
487        let client = self.clone();
488
489        pyo3_async_runtimes::tokio::future_into_py(py, async move {
490            if let Err(e) = client.unsubscribe_trades(instrument_id).await {
491                log::error!("Failed to unsubscribe from trades: {e}");
492            }
493            Ok(())
494        })
495    }
496
497    #[pyo3(name = "unsubscribe_mark_prices")]
498    fn py_unsubscribe_mark_prices<'py>(
499        &self,
500        py: Python<'py>,
501        instrument_id: InstrumentId,
502    ) -> PyResult<Bound<'py, PyAny>> {
503        let client = self.clone();
504
505        pyo3_async_runtimes::tokio::future_into_py(py, async move {
506            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
507                log::error!("Failed to unsubscribe from mark prices: {e}");
508            }
509            Ok(())
510        })
511    }
512
513    #[pyo3(name = "unsubscribe_index_prices")]
514    fn py_unsubscribe_index_prices<'py>(
515        &self,
516        py: Python<'py>,
517        instrument_id: InstrumentId,
518    ) -> PyResult<Bound<'py, PyAny>> {
519        let client = self.clone();
520
521        pyo3_async_runtimes::tokio::future_into_py(py, async move {
522            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
523                log::error!("Failed to unsubscribe from index prices: {e}");
524            }
525            Ok(())
526        })
527    }
528
529    #[pyo3(name = "unsubscribe_funding_rates")]
530    fn py_unsubscribe_funding_rates<'py>(
531        &self,
532        py: Python<'py>,
533        instrument_id: InstrumentId,
534    ) -> PyResult<Bound<'py, PyAny>> {
535        let client = self.clone();
536        pyo3_async_runtimes::tokio::future_into_py(py, async move {
537            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
538                log::error!("Failed to unsubscribe from funding rates: {e}");
539            }
540            Ok(())
541        })
542    }
543
544    #[pyo3(name = "unsubscribe_bars")]
545    fn py_unsubscribe_bars<'py>(
546        &self,
547        py: Python<'py>,
548        bar_type: BarType,
549    ) -> PyResult<Bound<'py, PyAny>> {
550        let client = self.clone();
551
552        pyo3_async_runtimes::tokio::future_into_py(py, async move {
553            if let Err(e) = client.unsubscribe_bars(bar_type).await {
554                log::error!("Failed to unsubscribe from bars: {e}");
555            }
556            Ok(())
557        })
558    }
559
560    #[pyo3(name = "subscribe_orders")]
561    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
562        let client = self.clone();
563
564        pyo3_async_runtimes::tokio::future_into_py(py, async move {
565            if let Err(e) = client.subscribe_orders().await {
566                log::error!("Failed to subscribe to orders: {e}");
567            }
568            Ok(())
569        })
570    }
571
572    #[pyo3(name = "subscribe_executions")]
573    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
574        let client = self.clone();
575
576        pyo3_async_runtimes::tokio::future_into_py(py, async move {
577            if let Err(e) = client.subscribe_executions().await {
578                log::error!("Failed to subscribe to executions: {e}");
579            }
580            Ok(())
581        })
582    }
583
584    #[pyo3(name = "subscribe_positions")]
585    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
586        let client = self.clone();
587
588        pyo3_async_runtimes::tokio::future_into_py(py, async move {
589            if let Err(e) = client.subscribe_positions().await {
590                log::error!("Failed to subscribe to positions: {e}");
591            }
592            Ok(())
593        })
594    }
595
596    #[pyo3(name = "subscribe_margin")]
597    fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
598        let client = self.clone();
599
600        pyo3_async_runtimes::tokio::future_into_py(py, async move {
601            if let Err(e) = client.subscribe_margin().await {
602                log::error!("Failed to subscribe to margin: {e}");
603            }
604            Ok(())
605        })
606    }
607
608    #[pyo3(name = "subscribe_wallet")]
609    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
610        let client = self.clone();
611
612        pyo3_async_runtimes::tokio::future_into_py(py, async move {
613            if let Err(e) = client.subscribe_wallet().await {
614                log::error!("Failed to subscribe to wallet: {e}");
615            }
616            Ok(())
617        })
618    }
619
620    #[pyo3(name = "unsubscribe_orders")]
621    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
622        let client = self.clone();
623
624        pyo3_async_runtimes::tokio::future_into_py(py, async move {
625            if let Err(e) = client.unsubscribe_orders().await {
626                log::error!("Failed to unsubscribe from orders: {e}");
627            }
628            Ok(())
629        })
630    }
631
632    #[pyo3(name = "unsubscribe_executions")]
633    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
634        let client = self.clone();
635
636        pyo3_async_runtimes::tokio::future_into_py(py, async move {
637            if let Err(e) = client.unsubscribe_executions().await {
638                log::error!("Failed to unsubscribe from executions: {e}");
639            }
640            Ok(())
641        })
642    }
643
644    #[pyo3(name = "unsubscribe_positions")]
645    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
646        let client = self.clone();
647
648        pyo3_async_runtimes::tokio::future_into_py(py, async move {
649            if let Err(e) = client.unsubscribe_positions().await {
650                log::error!("Failed to unsubscribe from positions: {e}");
651            }
652            Ok(())
653        })
654    }
655
656    #[pyo3(name = "unsubscribe_margin")]
657    fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
658        let client = self.clone();
659
660        pyo3_async_runtimes::tokio::future_into_py(py, async move {
661            if let Err(e) = client.unsubscribe_margin().await {
662                log::error!("Failed to unsubscribe from margin: {e}");
663            }
664            Ok(())
665        })
666    }
667
668    #[pyo3(name = "unsubscribe_wallet")]
669    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
670        let client = self.clone();
671
672        pyo3_async_runtimes::tokio::future_into_py(py, async move {
673            if let Err(e) = client.unsubscribe_wallet().await {
674                log::error!("Failed to unsubscribe from wallet: {e}");
675            }
676            Ok(())
677        })
678    }
679}
680
681pub fn call_python(py: Python, callback: &PyObject, py_obj: PyObject) {
682    if let Err(e) = callback.call1(py, (py_obj,)) {
683        tracing::error!("Error calling Python: {e}");
684    }
685}