nautilus_binance/python/
websocket_spot.rs1use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::{call_python, to_pyruntime_err};
21use nautilus_model::{
22 data::{Data, OrderBookDeltas_API},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::prelude::*;
26
27use crate::spot::websocket::streams::{
28 client::BinanceSpotWebSocketClient,
29 messages::{BinanceSpotWsMessage, NautilusSpotDataWsMessage},
30};
31
32#[pymethods]
33impl BinanceSpotWebSocketClient {
34 #[new]
35 #[pyo3(signature = (url=None, api_key=None, api_secret=None, heartbeat=None))]
36 fn py_new(
37 url: Option<String>,
38 api_key: Option<String>,
39 api_secret: Option<String>,
40 heartbeat: Option<u64>,
41 ) -> PyResult<Self> {
42 Self::new(url, api_key, api_secret, heartbeat).map_err(to_pyruntime_err)
43 }
44
45 #[pyo3(name = "is_active")]
46 fn py_is_active(&self) -> bool {
47 self.is_active()
48 }
49
50 #[pyo3(name = "is_closed")]
51 fn py_is_closed(&self) -> bool {
52 self.is_closed()
53 }
54
55 #[pyo3(name = "subscription_count")]
56 fn py_subscription_count(&self) -> usize {
57 self.subscription_count()
58 }
59
60 #[pyo3(name = "cache_instrument")]
61 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
62 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
63 Ok(())
64 }
65
66 #[pyo3(name = "connect")]
67 fn py_connect<'py>(
68 &mut self,
69 py: Python<'py>,
70 callback: Py<PyAny>,
71 ) -> PyResult<Bound<'py, PyAny>> {
72 let mut client = self.clone();
73
74 pyo3_async_runtimes::tokio::future_into_py(py, async move {
75 client.connect().await.map_err(to_pyruntime_err)?;
76
77 let stream = client.stream();
78
79 get_runtime().spawn(async move {
80 tokio::pin!(stream);
81
82 while let Some(msg) = stream.next().await {
83 match msg {
84 BinanceSpotWsMessage::Data(data_msg) => match data_msg {
85 NautilusSpotDataWsMessage::Data(data_vec) => {
86 Python::attach(|py| {
87 for data in data_vec {
88 let py_obj = data_to_pycapsule(py, data);
89 call_python(py, &callback, py_obj);
90 }
91 });
92 }
93 NautilusSpotDataWsMessage::Deltas(deltas) => {
94 Python::attach(|py| {
95 let py_obj = data_to_pycapsule(
96 py,
97 Data::Deltas(OrderBookDeltas_API::new(deltas)),
98 );
99 call_python(py, &callback, py_obj);
100 });
101 }
102 _ => {}
103 },
104 BinanceSpotWsMessage::Error(err) => {
105 log::warn!(
106 "Binance WebSocket error: code={}, msg={}",
107 err.code,
108 err.msg
109 );
110 }
111 BinanceSpotWsMessage::Reconnected => {
112 log::info!("Binance Spot WebSocket reconnected");
113 }
114 }
115 }
116 });
117
118 Ok(())
119 })
120 }
121
122 #[pyo3(name = "close")]
123 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
124 let mut client = self.clone();
125
126 pyo3_async_runtimes::tokio::future_into_py(py, async move {
127 if let Err(e) = client.close().await {
128 log::error!("Error on close: {e}");
129 }
130 Ok(())
131 })
132 }
133
134 #[pyo3(name = "subscribe")]
135 fn py_subscribe<'py>(
136 &self,
137 py: Python<'py>,
138 streams: Vec<String>,
139 ) -> PyResult<Bound<'py, PyAny>> {
140 let client = self.clone();
141
142 pyo3_async_runtimes::tokio::future_into_py(py, async move {
143 client.subscribe(streams).await.map_err(to_pyruntime_err)?;
144 Ok(())
145 })
146 }
147
148 #[pyo3(name = "unsubscribe")]
149 fn py_unsubscribe<'py>(
150 &self,
151 py: Python<'py>,
152 streams: Vec<String>,
153 ) -> PyResult<Bound<'py, PyAny>> {
154 let client = self.clone();
155
156 pyo3_async_runtimes::tokio::future_into_py(py, async move {
157 client
158 .unsubscribe(streams)
159 .await
160 .map_err(to_pyruntime_err)?;
161 Ok(())
162 })
163 }
164}