nautilus_binance/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for Binance WebSocket clients.
17
18use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::to_pyruntime_err;
21use nautilus_model::{
22    data::{Data, OrderBookDeltas_API},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::prelude::*;
26
27use crate::{
28    common::enums::{BinanceEnvironment, BinanceProductType},
29    futures::websocket::{
30        client::BinanceFuturesWebSocketClient, messages::NautilusFuturesWsMessage,
31    },
32    spot::websocket::{client::BinanceSpotWebSocketClient, messages::NautilusWsMessage},
33};
34
35#[pymethods]
36impl BinanceSpotWebSocketClient {
37    #[new]
38    #[pyo3(signature = (url=None, api_key=None, api_secret=None, heartbeat=None))]
39    fn py_new(
40        url: Option<String>,
41        api_key: Option<String>,
42        api_secret: Option<String>,
43        heartbeat: Option<u64>,
44    ) -> PyResult<Self> {
45        Self::new(url, api_key, api_secret, heartbeat).map_err(to_pyruntime_err)
46    }
47
48    #[pyo3(name = "is_active")]
49    fn py_is_active(&self) -> bool {
50        self.is_active()
51    }
52
53    #[pyo3(name = "is_closed")]
54    fn py_is_closed(&self) -> bool {
55        self.is_closed()
56    }
57
58    #[pyo3(name = "subscription_count")]
59    fn py_subscription_count(&self) -> usize {
60        self.subscription_count()
61    }
62
63    #[pyo3(name = "cache_instrument")]
64    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
65        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
66        Ok(())
67    }
68
69    #[pyo3(name = "connect")]
70    fn py_connect<'py>(
71        &mut self,
72        py: Python<'py>,
73        callback: Py<PyAny>,
74    ) -> PyResult<Bound<'py, PyAny>> {
75        let mut client = self.clone();
76
77        pyo3_async_runtimes::tokio::future_into_py(py, async move {
78            client.connect().await.map_err(to_pyruntime_err)?;
79
80            let stream = client.stream();
81
82            get_runtime().spawn(async move {
83                tokio::pin!(stream);
84
85                while let Some(msg) = stream.next().await {
86                    match msg {
87                        NautilusWsMessage::Data(data_vec) => {
88                            Python::attach(|py| {
89                                for data in data_vec {
90                                    let py_obj = data_to_pycapsule(py, data);
91                                    call_python(py, &callback, py_obj);
92                                }
93                            });
94                        }
95                        NautilusWsMessage::Deltas(deltas) => {
96                            Python::attach(|py| {
97                                let py_obj = data_to_pycapsule(
98                                    py,
99                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
100                                );
101                                call_python(py, &callback, py_obj);
102                            });
103                        }
104                        NautilusWsMessage::Error(err) => {
105                            tracing::warn!(code = err.code, msg = %err.msg, "Binance WebSocket error");
106                        }
107                        NautilusWsMessage::Reconnected => {
108                            tracing::info!("Binance Spot WebSocket reconnected");
109                        }
110                        _ => {}
111                    }
112                }
113            });
114
115            Ok(())
116        })
117    }
118
119    #[pyo3(name = "close")]
120    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
121        let mut client = self.clone();
122
123        pyo3_async_runtimes::tokio::future_into_py(py, async move {
124            if let Err(e) = client.close().await {
125                tracing::error!("Error on close: {e}");
126            }
127            Ok(())
128        })
129    }
130
131    #[pyo3(name = "subscribe")]
132    fn py_subscribe<'py>(
133        &self,
134        py: Python<'py>,
135        streams: Vec<String>,
136    ) -> PyResult<Bound<'py, PyAny>> {
137        let client = self.clone();
138
139        pyo3_async_runtimes::tokio::future_into_py(py, async move {
140            client.subscribe(streams).await.map_err(to_pyruntime_err)?;
141            Ok(())
142        })
143    }
144
145    #[pyo3(name = "unsubscribe")]
146    fn py_unsubscribe<'py>(
147        &self,
148        py: Python<'py>,
149        streams: Vec<String>,
150    ) -> PyResult<Bound<'py, PyAny>> {
151        let client = self.clone();
152
153        pyo3_async_runtimes::tokio::future_into_py(py, async move {
154            client
155                .unsubscribe(streams)
156                .await
157                .map_err(to_pyruntime_err)?;
158            Ok(())
159        })
160    }
161}
162
163#[pymethods]
164impl BinanceFuturesWebSocketClient {
165    #[new]
166    #[pyo3(signature = (product_type, environment, api_key=None, api_secret=None, url_override=None, heartbeat=None))]
167    fn py_new(
168        product_type: BinanceProductType,
169        environment: BinanceEnvironment,
170        api_key: Option<String>,
171        api_secret: Option<String>,
172        url_override: Option<String>,
173        heartbeat: Option<u64>,
174    ) -> PyResult<Self> {
175        Self::new(
176            product_type,
177            environment,
178            api_key,
179            api_secret,
180            url_override,
181            heartbeat,
182        )
183        .map_err(to_pyruntime_err)
184    }
185
186    #[getter]
187    #[pyo3(name = "product_type")]
188    fn py_product_type(&self) -> BinanceProductType {
189        self.product_type()
190    }
191
192    #[pyo3(name = "is_active")]
193    fn py_is_active(&self) -> bool {
194        self.is_active()
195    }
196
197    #[pyo3(name = "is_closed")]
198    fn py_is_closed(&self) -> bool {
199        self.is_closed()
200    }
201
202    #[pyo3(name = "subscription_count")]
203    fn py_subscription_count(&self) -> usize {
204        self.subscription_count()
205    }
206
207    #[pyo3(name = "cache_instrument")]
208    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
209        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
210        Ok(())
211    }
212
213    #[pyo3(name = "connect")]
214    fn py_connect<'py>(
215        &mut self,
216        py: Python<'py>,
217        callback: Py<PyAny>,
218    ) -> PyResult<Bound<'py, PyAny>> {
219        let mut client = self.clone();
220
221        pyo3_async_runtimes::tokio::future_into_py(py, async move {
222            client.connect().await.map_err(to_pyruntime_err)?;
223
224            let stream = client.stream();
225
226            get_runtime().spawn(async move {
227                tokio::pin!(stream);
228
229                while let Some(msg) = stream.next().await {
230                    match msg {
231                        NautilusFuturesWsMessage::Data(data_vec) => {
232                            Python::attach(|py| {
233                                for data in data_vec {
234                                    let py_obj = data_to_pycapsule(py, data);
235                                    call_python(py, &callback, py_obj);
236                                }
237                            });
238                        }
239                        NautilusFuturesWsMessage::Deltas(deltas) => {
240                            Python::attach(|py| {
241                                let py_obj = data_to_pycapsule(
242                                    py,
243                                    Data::Deltas(OrderBookDeltas_API::new(deltas)),
244                                );
245                                call_python(py, &callback, py_obj);
246                            });
247                        }
248                        NautilusFuturesWsMessage::Error(err) => {
249                            tracing::warn!(code = err.code, msg = %err.msg, "Binance WebSocket error");
250                        }
251                        NautilusFuturesWsMessage::Reconnected => {
252                            tracing::info!("Binance Futures WebSocket reconnected");
253                        }
254                        _ => {}
255                    }
256                }
257            });
258
259            Ok(())
260        })
261    }
262
263    #[pyo3(name = "close")]
264    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
265        let mut client = self.clone();
266
267        pyo3_async_runtimes::tokio::future_into_py(py, async move {
268            if let Err(e) = client.close().await {
269                tracing::error!("Error on close: {e}");
270            }
271            Ok(())
272        })
273    }
274
275    #[pyo3(name = "subscribe")]
276    fn py_subscribe<'py>(
277        &self,
278        py: Python<'py>,
279        streams: Vec<String>,
280    ) -> PyResult<Bound<'py, PyAny>> {
281        let client = self.clone();
282
283        pyo3_async_runtimes::tokio::future_into_py(py, async move {
284            client.subscribe(streams).await.map_err(to_pyruntime_err)?;
285            Ok(())
286        })
287    }
288
289    #[pyo3(name = "unsubscribe")]
290    fn py_unsubscribe<'py>(
291        &self,
292        py: Python<'py>,
293        streams: Vec<String>,
294    ) -> PyResult<Bound<'py, PyAny>> {
295        let client = self.clone();
296
297        pyo3_async_runtimes::tokio::future_into_py(py, async move {
298            client
299                .unsubscribe(streams)
300                .await
301                .map_err(to_pyruntime_err)?;
302            Ok(())
303        })
304    }
305}
306
307fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
308    if let Err(e) = callback.call1(py, (py_obj,)) {
309        tracing::error!("Error calling Python callback: {e}");
310    }
311}