1use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::{call_python, to_pyruntime_err};
21use nautilus_model::{
22 data::{Data, OrderBookDeltas_API},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::prelude::*;
26
27use crate::{
28 common::enums::{BinanceEnvironment, BinanceProductType},
29 futures::websocket::{
30 client::BinanceFuturesWebSocketClient, messages::NautilusFuturesWsMessage,
31 },
32 spot::websocket::streams::{client::BinanceSpotWebSocketClient, messages::NautilusWsMessage},
33};
34
35#[pymethods]
36impl BinanceSpotWebSocketClient {
37 #[new]
38 #[pyo3(signature = (url=None, api_key=None, api_secret=None, heartbeat=None))]
39 fn py_new(
40 url: Option<String>,
41 api_key: Option<String>,
42 api_secret: Option<String>,
43 heartbeat: Option<u64>,
44 ) -> PyResult<Self> {
45 Self::new(url, api_key, api_secret, heartbeat).map_err(to_pyruntime_err)
46 }
47
48 #[pyo3(name = "is_active")]
49 fn py_is_active(&self) -> bool {
50 self.is_active()
51 }
52
53 #[pyo3(name = "is_closed")]
54 fn py_is_closed(&self) -> bool {
55 self.is_closed()
56 }
57
58 #[pyo3(name = "subscription_count")]
59 fn py_subscription_count(&self) -> usize {
60 self.subscription_count()
61 }
62
63 #[pyo3(name = "cache_instrument")]
64 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
65 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
66 Ok(())
67 }
68
69 #[pyo3(name = "connect")]
70 fn py_connect<'py>(
71 &mut self,
72 py: Python<'py>,
73 callback: Py<PyAny>,
74 ) -> PyResult<Bound<'py, PyAny>> {
75 let mut client = self.clone();
76
77 pyo3_async_runtimes::tokio::future_into_py(py, async move {
78 client.connect().await.map_err(to_pyruntime_err)?;
79
80 let stream = client.stream();
81
82 get_runtime().spawn(async move {
83 tokio::pin!(stream);
84
85 while let Some(msg) = stream.next().await {
86 match msg {
87 NautilusWsMessage::Data(data_vec) => {
88 Python::attach(|py| {
89 for data in data_vec {
90 let py_obj = data_to_pycapsule(py, data);
91 call_python(py, &callback, py_obj);
92 }
93 });
94 }
95 NautilusWsMessage::Deltas(deltas) => {
96 Python::attach(|py| {
97 let py_obj = data_to_pycapsule(
98 py,
99 Data::Deltas(OrderBookDeltas_API::new(deltas)),
100 );
101 call_python(py, &callback, py_obj);
102 });
103 }
104 NautilusWsMessage::Error(err) => {
105 log::warn!(
106 "Binance WebSocket error: code={}, msg={}",
107 err.code,
108 err.msg
109 );
110 }
111 NautilusWsMessage::Reconnected => {
112 log::info!("Binance Spot WebSocket reconnected");
113 }
114 _ => {}
115 }
116 }
117 });
118
119 Ok(())
120 })
121 }
122
123 #[pyo3(name = "close")]
124 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
125 let mut client = self.clone();
126
127 pyo3_async_runtimes::tokio::future_into_py(py, async move {
128 if let Err(e) = client.close().await {
129 log::error!("Error on close: {e}");
130 }
131 Ok(())
132 })
133 }
134
135 #[pyo3(name = "subscribe")]
136 fn py_subscribe<'py>(
137 &self,
138 py: Python<'py>,
139 streams: Vec<String>,
140 ) -> PyResult<Bound<'py, PyAny>> {
141 let client = self.clone();
142
143 pyo3_async_runtimes::tokio::future_into_py(py, async move {
144 client.subscribe(streams).await.map_err(to_pyruntime_err)?;
145 Ok(())
146 })
147 }
148
149 #[pyo3(name = "unsubscribe")]
150 fn py_unsubscribe<'py>(
151 &self,
152 py: Python<'py>,
153 streams: Vec<String>,
154 ) -> PyResult<Bound<'py, PyAny>> {
155 let client = self.clone();
156
157 pyo3_async_runtimes::tokio::future_into_py(py, async move {
158 client
159 .unsubscribe(streams)
160 .await
161 .map_err(to_pyruntime_err)?;
162 Ok(())
163 })
164 }
165}
166
167#[pymethods]
168impl BinanceFuturesWebSocketClient {
169 #[new]
170 #[pyo3(signature = (product_type, environment, api_key=None, api_secret=None, url_override=None, heartbeat=None))]
171 fn py_new(
172 product_type: BinanceProductType,
173 environment: BinanceEnvironment,
174 api_key: Option<String>,
175 api_secret: Option<String>,
176 url_override: Option<String>,
177 heartbeat: Option<u64>,
178 ) -> PyResult<Self> {
179 Self::new(
180 product_type,
181 environment,
182 api_key,
183 api_secret,
184 url_override,
185 heartbeat,
186 )
187 .map_err(to_pyruntime_err)
188 }
189
190 #[getter]
191 #[pyo3(name = "product_type")]
192 fn py_product_type(&self) -> BinanceProductType {
193 self.product_type()
194 }
195
196 #[pyo3(name = "is_active")]
197 fn py_is_active(&self) -> bool {
198 self.is_active()
199 }
200
201 #[pyo3(name = "is_closed")]
202 fn py_is_closed(&self) -> bool {
203 self.is_closed()
204 }
205
206 #[pyo3(name = "subscription_count")]
207 fn py_subscription_count(&self) -> usize {
208 self.subscription_count()
209 }
210
211 #[pyo3(name = "cache_instrument")]
212 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
213 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
214 Ok(())
215 }
216
217 #[pyo3(name = "connect")]
218 fn py_connect<'py>(
219 &mut self,
220 py: Python<'py>,
221 callback: Py<PyAny>,
222 ) -> PyResult<Bound<'py, PyAny>> {
223 let mut client = self.clone();
224
225 pyo3_async_runtimes::tokio::future_into_py(py, async move {
226 client.connect().await.map_err(to_pyruntime_err)?;
227
228 let stream = client.stream();
229
230 get_runtime().spawn(async move {
231 tokio::pin!(stream);
232
233 while let Some(msg) = stream.next().await {
234 match msg {
235 NautilusFuturesWsMessage::Data(data_vec) => {
236 Python::attach(|py| {
237 for data in data_vec {
238 let py_obj = data_to_pycapsule(py, data);
239 call_python(py, &callback, py_obj);
240 }
241 });
242 }
243 NautilusFuturesWsMessage::Deltas(deltas) => {
244 Python::attach(|py| {
245 let py_obj = data_to_pycapsule(
246 py,
247 Data::Deltas(OrderBookDeltas_API::new(deltas)),
248 );
249 call_python(py, &callback, py_obj);
250 });
251 }
252 NautilusFuturesWsMessage::Error(err) => {
253 log::warn!(
254 "Binance WebSocket error: code={}, msg={}",
255 err.code,
256 err.msg
257 );
258 }
259 NautilusFuturesWsMessage::Reconnected => {
260 log::info!("Binance Futures WebSocket reconnected");
261 }
262 _ => {}
263 }
264 }
265 });
266
267 Ok(())
268 })
269 }
270
271 #[pyo3(name = "close")]
272 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
273 let mut client = self.clone();
274
275 pyo3_async_runtimes::tokio::future_into_py(py, async move {
276 if let Err(e) = client.close().await {
277 log::error!("Error on close: {e}");
278 }
279 Ok(())
280 })
281 }
282
283 #[pyo3(name = "subscribe")]
284 fn py_subscribe<'py>(
285 &self,
286 py: Python<'py>,
287 streams: Vec<String>,
288 ) -> PyResult<Bound<'py, PyAny>> {
289 let client = self.clone();
290
291 pyo3_async_runtimes::tokio::future_into_py(py, async move {
292 client.subscribe(streams).await.map_err(to_pyruntime_err)?;
293 Ok(())
294 })
295 }
296
297 #[pyo3(name = "unsubscribe")]
298 fn py_unsubscribe<'py>(
299 &self,
300 py: Python<'py>,
301 streams: Vec<String>,
302 ) -> PyResult<Bound<'py, PyAny>> {
303 let client = self.clone();
304
305 pyo3_async_runtimes::tokio::future_into_py(py, async move {
306 client
307 .unsubscribe(streams)
308 .await
309 .map_err(to_pyruntime_err)?;
310 Ok(())
311 })
312 }
313}