1use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::to_pyruntime_err;
21use nautilus_model::{
22 data::{Data, OrderBookDeltas_API},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::prelude::*;
26
27use crate::{
28 common::enums::{BinanceEnvironment, BinanceProductType},
29 futures::websocket::{
30 client::BinanceFuturesWebSocketClient, messages::NautilusFuturesWsMessage,
31 },
32 spot::websocket::{client::BinanceSpotWebSocketClient, messages::NautilusWsMessage},
33};
34
35#[pymethods]
36impl BinanceSpotWebSocketClient {
37 #[new]
38 #[pyo3(signature = (url=None, api_key=None, api_secret=None, heartbeat=None))]
39 fn py_new(
40 url: Option<String>,
41 api_key: Option<String>,
42 api_secret: Option<String>,
43 heartbeat: Option<u64>,
44 ) -> PyResult<Self> {
45 Self::new(url, api_key, api_secret, heartbeat).map_err(to_pyruntime_err)
46 }
47
48 #[pyo3(name = "is_active")]
49 fn py_is_active(&self) -> bool {
50 self.is_active()
51 }
52
53 #[pyo3(name = "is_closed")]
54 fn py_is_closed(&self) -> bool {
55 self.is_closed()
56 }
57
58 #[pyo3(name = "subscription_count")]
59 fn py_subscription_count(&self) -> usize {
60 self.subscription_count()
61 }
62
63 #[pyo3(name = "cache_instrument")]
64 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
65 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
66 Ok(())
67 }
68
69 #[pyo3(name = "connect")]
70 fn py_connect<'py>(
71 &mut self,
72 py: Python<'py>,
73 callback: Py<PyAny>,
74 ) -> PyResult<Bound<'py, PyAny>> {
75 let mut client = self.clone();
76
77 pyo3_async_runtimes::tokio::future_into_py(py, async move {
78 client.connect().await.map_err(to_pyruntime_err)?;
79
80 let stream = client.stream();
81
82 get_runtime().spawn(async move {
83 tokio::pin!(stream);
84
85 while let Some(msg) = stream.next().await {
86 match msg {
87 NautilusWsMessage::Data(data_vec) => {
88 Python::attach(|py| {
89 for data in data_vec {
90 let py_obj = data_to_pycapsule(py, data);
91 call_python(py, &callback, py_obj);
92 }
93 });
94 }
95 NautilusWsMessage::Deltas(deltas) => {
96 Python::attach(|py| {
97 let py_obj = data_to_pycapsule(
98 py,
99 Data::Deltas(OrderBookDeltas_API::new(deltas)),
100 );
101 call_python(py, &callback, py_obj);
102 });
103 }
104 NautilusWsMessage::Error(err) => {
105 tracing::warn!(code = err.code, msg = %err.msg, "Binance WebSocket error");
106 }
107 NautilusWsMessage::Reconnected => {
108 tracing::info!("Binance Spot WebSocket reconnected");
109 }
110 _ => {}
111 }
112 }
113 });
114
115 Ok(())
116 })
117 }
118
119 #[pyo3(name = "close")]
120 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
121 let mut client = self.clone();
122
123 pyo3_async_runtimes::tokio::future_into_py(py, async move {
124 if let Err(e) = client.close().await {
125 tracing::error!("Error on close: {e}");
126 }
127 Ok(())
128 })
129 }
130
131 #[pyo3(name = "subscribe")]
132 fn py_subscribe<'py>(
133 &self,
134 py: Python<'py>,
135 streams: Vec<String>,
136 ) -> PyResult<Bound<'py, PyAny>> {
137 let client = self.clone();
138
139 pyo3_async_runtimes::tokio::future_into_py(py, async move {
140 client.subscribe(streams).await.map_err(to_pyruntime_err)?;
141 Ok(())
142 })
143 }
144
145 #[pyo3(name = "unsubscribe")]
146 fn py_unsubscribe<'py>(
147 &self,
148 py: Python<'py>,
149 streams: Vec<String>,
150 ) -> PyResult<Bound<'py, PyAny>> {
151 let client = self.clone();
152
153 pyo3_async_runtimes::tokio::future_into_py(py, async move {
154 client
155 .unsubscribe(streams)
156 .await
157 .map_err(to_pyruntime_err)?;
158 Ok(())
159 })
160 }
161}
162
163#[pymethods]
164impl BinanceFuturesWebSocketClient {
165 #[new]
166 #[pyo3(signature = (product_type, environment, api_key=None, api_secret=None, url_override=None, heartbeat=None))]
167 fn py_new(
168 product_type: BinanceProductType,
169 environment: BinanceEnvironment,
170 api_key: Option<String>,
171 api_secret: Option<String>,
172 url_override: Option<String>,
173 heartbeat: Option<u64>,
174 ) -> PyResult<Self> {
175 Self::new(
176 product_type,
177 environment,
178 api_key,
179 api_secret,
180 url_override,
181 heartbeat,
182 )
183 .map_err(to_pyruntime_err)
184 }
185
186 #[getter]
187 #[pyo3(name = "product_type")]
188 fn py_product_type(&self) -> BinanceProductType {
189 self.product_type()
190 }
191
192 #[pyo3(name = "is_active")]
193 fn py_is_active(&self) -> bool {
194 self.is_active()
195 }
196
197 #[pyo3(name = "is_closed")]
198 fn py_is_closed(&self) -> bool {
199 self.is_closed()
200 }
201
202 #[pyo3(name = "subscription_count")]
203 fn py_subscription_count(&self) -> usize {
204 self.subscription_count()
205 }
206
207 #[pyo3(name = "cache_instrument")]
208 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
209 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
210 Ok(())
211 }
212
213 #[pyo3(name = "connect")]
214 fn py_connect<'py>(
215 &mut self,
216 py: Python<'py>,
217 callback: Py<PyAny>,
218 ) -> PyResult<Bound<'py, PyAny>> {
219 let mut client = self.clone();
220
221 pyo3_async_runtimes::tokio::future_into_py(py, async move {
222 client.connect().await.map_err(to_pyruntime_err)?;
223
224 let stream = client.stream();
225
226 get_runtime().spawn(async move {
227 tokio::pin!(stream);
228
229 while let Some(msg) = stream.next().await {
230 match msg {
231 NautilusFuturesWsMessage::Data(data_vec) => {
232 Python::attach(|py| {
233 for data in data_vec {
234 let py_obj = data_to_pycapsule(py, data);
235 call_python(py, &callback, py_obj);
236 }
237 });
238 }
239 NautilusFuturesWsMessage::Deltas(deltas) => {
240 Python::attach(|py| {
241 let py_obj = data_to_pycapsule(
242 py,
243 Data::Deltas(OrderBookDeltas_API::new(deltas)),
244 );
245 call_python(py, &callback, py_obj);
246 });
247 }
248 NautilusFuturesWsMessage::Error(err) => {
249 tracing::warn!(code = err.code, msg = %err.msg, "Binance WebSocket error");
250 }
251 NautilusFuturesWsMessage::Reconnected => {
252 tracing::info!("Binance Futures WebSocket reconnected");
253 }
254 _ => {}
255 }
256 }
257 });
258
259 Ok(())
260 })
261 }
262
263 #[pyo3(name = "close")]
264 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
265 let mut client = self.clone();
266
267 pyo3_async_runtimes::tokio::future_into_py(py, async move {
268 if let Err(e) = client.close().await {
269 tracing::error!("Error on close: {e}");
270 }
271 Ok(())
272 })
273 }
274
275 #[pyo3(name = "subscribe")]
276 fn py_subscribe<'py>(
277 &self,
278 py: Python<'py>,
279 streams: Vec<String>,
280 ) -> PyResult<Bound<'py, PyAny>> {
281 let client = self.clone();
282
283 pyo3_async_runtimes::tokio::future_into_py(py, async move {
284 client.subscribe(streams).await.map_err(to_pyruntime_err)?;
285 Ok(())
286 })
287 }
288
289 #[pyo3(name = "unsubscribe")]
290 fn py_unsubscribe<'py>(
291 &self,
292 py: Python<'py>,
293 streams: Vec<String>,
294 ) -> PyResult<Bound<'py, PyAny>> {
295 let client = self.clone();
296
297 pyo3_async_runtimes::tokio::future_into_py(py, async move {
298 client
299 .unsubscribe(streams)
300 .await
301 .map_err(to_pyruntime_err)?;
302 Ok(())
303 })
304 }
305}
306
307fn call_python(py: Python, callback: &Py<PyAny>, py_obj: Py<PyAny>) {
308 if let Err(e) = callback.call1(py, (py_obj,)) {
309 tracing::error!("Error calling Python callback: {e}");
310 }
311}