nautilus_binance/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for Binance WebSocket clients.
17
18use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::{call_python, to_pyruntime_err};
21use nautilus_model::{
22    data::{Data, OrderBookDeltas_API},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::prelude::*;
26
27use crate::{
28    common::enums::{BinanceEnvironment, BinanceProductType},
29    futures::websocket::{
30        client::BinanceFuturesWebSocketClient, messages::NautilusFuturesWsMessage,
31    },
32    spot::websocket::streams::{client::BinanceSpotWebSocketClient, messages::NautilusWsMessage},
33};
34
35#[pymethods]
36impl BinanceSpotWebSocketClient {
37    #[new]
38    #[pyo3(signature = (url=None, api_key=None, api_secret=None, heartbeat=None))]
39    fn py_new(
40        url: Option<String>,
41        api_key: Option<String>,
42        api_secret: Option<String>,
43        heartbeat: Option<u64>,
44    ) -> PyResult<Self> {
45        Self::new(url, api_key, api_secret, heartbeat).map_err(to_pyruntime_err)
46    }
47
48    #[pyo3(name = "is_active")]
49    fn py_is_active(&self) -> bool {
50        self.is_active()
51    }
52
53    #[pyo3(name = "is_closed")]
54    fn py_is_closed(&self) -> bool {
55        self.is_closed()
56    }
57
58    #[pyo3(name = "subscription_count")]
59    fn py_subscription_count(&self) -> usize {
60        self.subscription_count()
61    }
62
63    #[pyo3(name = "cache_instrument")]
64    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
65        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
66        Ok(())
67    }
68
69    #[pyo3(name = "connect")]
70    fn py_connect<'py>(
71        &mut self,
72        py: Python<'py>,
73        callback: Py<PyAny>,
74    ) -> PyResult<Bound<'py, PyAny>> {
75        let mut client = self.clone();
76
77        pyo3_async_runtimes::tokio::future_into_py(py, async move {
78            client.connect().await.map_err(to_pyruntime_err)?;
79
80            let stream = client.stream();
81
82            get_runtime().spawn(async move {
83                tokio::pin!(stream);
84
85                while let Some(msg) = stream.next().await {
86                    match msg {
87                        NautilusWsMessage::Data(data_vec) => {
88                            Python::attach(|py| {
89                                for data in data_vec {
90                                    let py_obj = data_to_pycapsule(py, data);
91                                    call_python(py, &callback, py_obj);
92                                }
93                            });
94                        }
95                        NautilusWsMessage::Deltas(deltas) => {
96                            Python::attach(|py| {
97                                let py_obj = data_to_pycapsule(
98                                    py,
99                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
100                                );
101                                call_python(py, &callback, py_obj);
102                            });
103                        }
104                        NautilusWsMessage::Error(err) => {
105                            log::warn!(
106                                "Binance WebSocket error: code={}, msg={}",
107                                err.code,
108                                err.msg
109                            );
110                        }
111                        NautilusWsMessage::Reconnected => {
112                            log::info!("Binance Spot WebSocket reconnected");
113                        }
114                        _ => {}
115                    }
116                }
117            });
118
119            Ok(())
120        })
121    }
122
123    #[pyo3(name = "close")]
124    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
125        let mut client = self.clone();
126
127        pyo3_async_runtimes::tokio::future_into_py(py, async move {
128            if let Err(e) = client.close().await {
129                log::error!("Error on close: {e}");
130            }
131            Ok(())
132        })
133    }
134
135    #[pyo3(name = "subscribe")]
136    fn py_subscribe<'py>(
137        &self,
138        py: Python<'py>,
139        streams: Vec<String>,
140    ) -> PyResult<Bound<'py, PyAny>> {
141        let client = self.clone();
142
143        pyo3_async_runtimes::tokio::future_into_py(py, async move {
144            client.subscribe(streams).await.map_err(to_pyruntime_err)?;
145            Ok(())
146        })
147    }
148
149    #[pyo3(name = "unsubscribe")]
150    fn py_unsubscribe<'py>(
151        &self,
152        py: Python<'py>,
153        streams: Vec<String>,
154    ) -> PyResult<Bound<'py, PyAny>> {
155        let client = self.clone();
156
157        pyo3_async_runtimes::tokio::future_into_py(py, async move {
158            client
159                .unsubscribe(streams)
160                .await
161                .map_err(to_pyruntime_err)?;
162            Ok(())
163        })
164    }
165}
166
167#[pymethods]
168impl BinanceFuturesWebSocketClient {
169    #[new]
170    #[pyo3(signature = (product_type, environment, api_key=None, api_secret=None, url_override=None, heartbeat=None))]
171    fn py_new(
172        product_type: BinanceProductType,
173        environment: BinanceEnvironment,
174        api_key: Option<String>,
175        api_secret: Option<String>,
176        url_override: Option<String>,
177        heartbeat: Option<u64>,
178    ) -> PyResult<Self> {
179        Self::new(
180            product_type,
181            environment,
182            api_key,
183            api_secret,
184            url_override,
185            heartbeat,
186        )
187        .map_err(to_pyruntime_err)
188    }
189
190    #[getter]
191    #[pyo3(name = "product_type")]
192    fn py_product_type(&self) -> BinanceProductType {
193        self.product_type()
194    }
195
196    #[pyo3(name = "is_active")]
197    fn py_is_active(&self) -> bool {
198        self.is_active()
199    }
200
201    #[pyo3(name = "is_closed")]
202    fn py_is_closed(&self) -> bool {
203        self.is_closed()
204    }
205
206    #[pyo3(name = "subscription_count")]
207    fn py_subscription_count(&self) -> usize {
208        self.subscription_count()
209    }
210
211    #[pyo3(name = "cache_instrument")]
212    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
213        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
214        Ok(())
215    }
216
217    #[pyo3(name = "connect")]
218    fn py_connect<'py>(
219        &mut self,
220        py: Python<'py>,
221        callback: Py<PyAny>,
222    ) -> PyResult<Bound<'py, PyAny>> {
223        let mut client = self.clone();
224
225        pyo3_async_runtimes::tokio::future_into_py(py, async move {
226            client.connect().await.map_err(to_pyruntime_err)?;
227
228            let stream = client.stream();
229
230            get_runtime().spawn(async move {
231                tokio::pin!(stream);
232
233                while let Some(msg) = stream.next().await {
234                    match msg {
235                        NautilusFuturesWsMessage::Data(data_vec) => {
236                            Python::attach(|py| {
237                                for data in data_vec {
238                                    let py_obj = data_to_pycapsule(py, data);
239                                    call_python(py, &callback, py_obj);
240                                }
241                            });
242                        }
243                        NautilusFuturesWsMessage::Deltas(deltas) => {
244                            Python::attach(|py| {
245                                let py_obj = data_to_pycapsule(
246                                    py,
247                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
248                                );
249                                call_python(py, &callback, py_obj);
250                            });
251                        }
252                        NautilusFuturesWsMessage::Error(err) => {
253                            log::warn!(
254                                "Binance WebSocket error: code={}, msg={}",
255                                err.code,
256                                err.msg
257                            );
258                        }
259                        NautilusFuturesWsMessage::Reconnected => {
260                            log::info!("Binance Futures WebSocket reconnected");
261                        }
262                        _ => {}
263                    }
264                }
265            });
266
267            Ok(())
268        })
269    }
270
271    #[pyo3(name = "close")]
272    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
273        let mut client = self.clone();
274
275        pyo3_async_runtimes::tokio::future_into_py(py, async move {
276            if let Err(e) = client.close().await {
277                log::error!("Error on close: {e}");
278            }
279            Ok(())
280        })
281    }
282
283    #[pyo3(name = "subscribe")]
284    fn py_subscribe<'py>(
285        &self,
286        py: Python<'py>,
287        streams: Vec<String>,
288    ) -> PyResult<Bound<'py, PyAny>> {
289        let client = self.clone();
290
291        pyo3_async_runtimes::tokio::future_into_py(py, async move {
292            client.subscribe(streams).await.map_err(to_pyruntime_err)?;
293            Ok(())
294        })
295    }
296
297    #[pyo3(name = "unsubscribe")]
298    fn py_unsubscribe<'py>(
299        &self,
300        py: Python<'py>,
301        streams: Vec<String>,
302    ) -> PyResult<Bound<'py, PyAny>> {
303        let client = self.clone();
304
305        pyo3_async_runtimes::tokio::future_into_py(py, async move {
306            client
307                .unsubscribe(streams)
308                .await
309                .map_err(to_pyruntime_err)?;
310            Ok(())
311        })
312    }
313}