Skip to main content

nautilus_binance/python/
websocket_spot.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Binance Spot WebSocket client.
17
18use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::{call_python, to_pyruntime_err};
21use nautilus_model::{
22    data::{Data, OrderBookDeltas_API},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::prelude::*;
26
27use crate::spot::websocket::streams::{
28    client::BinanceSpotWebSocketClient,
29    messages::{BinanceSpotWsMessage, NautilusSpotDataWsMessage},
30};
31
32#[pymethods]
33impl BinanceSpotWebSocketClient {
34    #[new]
35    #[pyo3(signature = (url=None, api_key=None, api_secret=None, heartbeat=None))]
36    fn py_new(
37        url: Option<String>,
38        api_key: Option<String>,
39        api_secret: Option<String>,
40        heartbeat: Option<u64>,
41    ) -> PyResult<Self> {
42        Self::new(url, api_key, api_secret, heartbeat).map_err(to_pyruntime_err)
43    }
44
45    #[pyo3(name = "is_active")]
46    fn py_is_active(&self) -> bool {
47        self.is_active()
48    }
49
50    #[pyo3(name = "is_closed")]
51    fn py_is_closed(&self) -> bool {
52        self.is_closed()
53    }
54
55    #[pyo3(name = "subscription_count")]
56    fn py_subscription_count(&self) -> usize {
57        self.subscription_count()
58    }
59
60    #[pyo3(name = "cache_instrument")]
61    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
62        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
63        Ok(())
64    }
65
66    #[pyo3(name = "connect")]
67    fn py_connect<'py>(
68        &mut self,
69        py: Python<'py>,
70        callback: Py<PyAny>,
71    ) -> PyResult<Bound<'py, PyAny>> {
72        let mut client = self.clone();
73
74        pyo3_async_runtimes::tokio::future_into_py(py, async move {
75            client.connect().await.map_err(to_pyruntime_err)?;
76
77            let stream = client.stream();
78
79            get_runtime().spawn(async move {
80                tokio::pin!(stream);
81
82                while let Some(msg) = stream.next().await {
83                    match msg {
84                        BinanceSpotWsMessage::Data(data_msg) => match data_msg {
85                            NautilusSpotDataWsMessage::Data(data_vec) => {
86                                Python::attach(|py| {
87                                    for data in data_vec {
88                                        let py_obj = data_to_pycapsule(py, data);
89                                        call_python(py, &callback, py_obj);
90                                    }
91                                });
92                            }
93                            NautilusSpotDataWsMessage::Deltas(deltas) => {
94                                Python::attach(|py| {
95                                    let py_obj = data_to_pycapsule(
96                                        py,
97                                        Data::Deltas(OrderBookDeltas_API::new(deltas)),
98                                    );
99                                    call_python(py, &callback, py_obj);
100                                });
101                            }
102                            _ => {}
103                        },
104                        BinanceSpotWsMessage::Error(err) => {
105                            log::warn!(
106                                "Binance WebSocket error: code={}, msg={}",
107                                err.code,
108                                err.msg
109                            );
110                        }
111                        BinanceSpotWsMessage::Reconnected => {
112                            log::info!("Binance Spot WebSocket reconnected");
113                        }
114                    }
115                }
116            });
117
118            Ok(())
119        })
120    }
121
122    #[pyo3(name = "close")]
123    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
124        let mut client = self.clone();
125
126        pyo3_async_runtimes::tokio::future_into_py(py, async move {
127            if let Err(e) = client.close().await {
128                log::error!("Error on close: {e}");
129            }
130            Ok(())
131        })
132    }
133
134    #[pyo3(name = "subscribe")]
135    fn py_subscribe<'py>(
136        &self,
137        py: Python<'py>,
138        streams: Vec<String>,
139    ) -> PyResult<Bound<'py, PyAny>> {
140        let client = self.clone();
141
142        pyo3_async_runtimes::tokio::future_into_py(py, async move {
143            client.subscribe(streams).await.map_err(to_pyruntime_err)?;
144            Ok(())
145        })
146    }
147
148    #[pyo3(name = "unsubscribe")]
149    fn py_unsubscribe<'py>(
150        &self,
151        py: Python<'py>,
152        streams: Vec<String>,
153    ) -> PyResult<Bound<'py, PyAny>> {
154        let client = self.clone();
155
156        pyo3_async_runtimes::tokio::future_into_py(py, async move {
157            client
158                .unsubscribe(streams)
159                .await
160                .map_err(to_pyruntime_err)?;
161            Ok(())
162        })
163    }
164}