nautilus_binance/python/
websocket_futures.rs1use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::{call_python, to_pyruntime_err};
21use nautilus_model::{
22 data::{Data, OrderBookDeltas_API},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::prelude::*;
26
27use crate::{
28 common::enums::{BinanceEnvironment, BinanceProductType},
29 futures::websocket::{
30 client::BinanceFuturesWebSocketClient,
31 messages::{NautilusDataWsMessage, NautilusWsMessage},
32 },
33};
34
35#[pymethods]
36impl BinanceFuturesWebSocketClient {
37 #[new]
38 #[pyo3(signature = (product_type, environment, api_key=None, api_secret=None, url_override=None, heartbeat=None))]
39 fn py_new(
40 product_type: BinanceProductType,
41 environment: BinanceEnvironment,
42 api_key: Option<String>,
43 api_secret: Option<String>,
44 url_override: Option<String>,
45 heartbeat: Option<u64>,
46 ) -> PyResult<Self> {
47 Self::new(
48 product_type,
49 environment,
50 api_key,
51 api_secret,
52 url_override,
53 heartbeat,
54 )
55 .map_err(to_pyruntime_err)
56 }
57
58 #[getter]
59 #[pyo3(name = "product_type")]
60 fn py_product_type(&self) -> BinanceProductType {
61 self.product_type()
62 }
63
64 #[pyo3(name = "is_active")]
65 fn py_is_active(&self) -> bool {
66 self.is_active()
67 }
68
69 #[pyo3(name = "is_closed")]
70 fn py_is_closed(&self) -> bool {
71 self.is_closed()
72 }
73
74 #[pyo3(name = "subscription_count")]
75 fn py_subscription_count(&self) -> usize {
76 self.subscription_count()
77 }
78
79 #[pyo3(name = "cache_instrument")]
80 fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
81 self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
82 Ok(())
83 }
84
85 #[pyo3(name = "connect")]
86 fn py_connect<'py>(
87 &mut self,
88 py: Python<'py>,
89 callback: Py<PyAny>,
90 ) -> PyResult<Bound<'py, PyAny>> {
91 let mut client = self.clone();
92
93 pyo3_async_runtimes::tokio::future_into_py(py, async move {
94 client.connect().await.map_err(to_pyruntime_err)?;
95
96 let stream = client.stream();
97
98 get_runtime().spawn(async move {
99 tokio::pin!(stream);
100
101 while let Some(msg) = stream.next().await {
102 match msg {
103 NautilusWsMessage::Data(data_msg) => match data_msg {
104 NautilusDataWsMessage::Data(data_vec) => {
105 Python::attach(|py| {
106 for data in data_vec {
107 let py_obj = data_to_pycapsule(py, data);
108 call_python(py, &callback, py_obj);
109 }
110 });
111 }
112 NautilusDataWsMessage::DepthUpdate { deltas, .. } => {
113 Python::attach(|py| {
114 let py_obj = data_to_pycapsule(
115 py,
116 Data::Deltas(OrderBookDeltas_API::new(deltas)),
117 );
118 call_python(py, &callback, py_obj);
119 });
120 }
121 _ => {}
122 },
123 NautilusWsMessage::Exec(_) | NautilusWsMessage::ExecRaw(_) => {}
124 NautilusWsMessage::Error(err) => {
125 log::warn!(
126 "Binance WebSocket error: code={}, msg={}",
127 err.code,
128 err.msg
129 );
130 }
131 NautilusWsMessage::Reconnected => {
132 log::info!("Binance Futures WebSocket reconnected");
133 }
134 }
135 }
136 });
137
138 Ok(())
139 })
140 }
141
142 #[pyo3(name = "close")]
143 fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
144 let mut client = self.clone();
145
146 pyo3_async_runtimes::tokio::future_into_py(py, async move {
147 if let Err(e) = client.close().await {
148 log::error!("Error on close: {e}");
149 }
150 Ok(())
151 })
152 }
153
154 #[pyo3(name = "subscribe")]
155 fn py_subscribe<'py>(
156 &self,
157 py: Python<'py>,
158 streams: Vec<String>,
159 ) -> PyResult<Bound<'py, PyAny>> {
160 let client = self.clone();
161
162 pyo3_async_runtimes::tokio::future_into_py(py, async move {
163 client.subscribe(streams).await.map_err(to_pyruntime_err)?;
164 Ok(())
165 })
166 }
167
168 #[pyo3(name = "unsubscribe")]
169 fn py_unsubscribe<'py>(
170 &self,
171 py: Python<'py>,
172 streams: Vec<String>,
173 ) -> PyResult<Bound<'py, PyAny>> {
174 let client = self.clone();
175
176 pyo3_async_runtimes::tokio::future_into_py(py, async move {
177 client
178 .unsubscribe(streams)
179 .await
180 .map_err(to_pyruntime_err)?;
181 Ok(())
182 })
183 }
184}