Skip to main content

nautilus_binance/python/
websocket_futures.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Binance Futures WebSocket client.
17
18use futures_util::StreamExt;
19use nautilus_common::live::get_runtime;
20use nautilus_core::python::{call_python, to_pyruntime_err};
21use nautilus_model::{
22    data::{Data, OrderBookDeltas_API},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::prelude::*;
26
27use crate::{
28    common::enums::{BinanceEnvironment, BinanceProductType},
29    futures::websocket::{
30        client::BinanceFuturesWebSocketClient,
31        messages::{NautilusDataWsMessage, NautilusWsMessage},
32    },
33};
34
35#[pymethods]
36impl BinanceFuturesWebSocketClient {
37    #[new]
38    #[pyo3(signature = (product_type, environment, api_key=None, api_secret=None, url_override=None, heartbeat=None))]
39    fn py_new(
40        product_type: BinanceProductType,
41        environment: BinanceEnvironment,
42        api_key: Option<String>,
43        api_secret: Option<String>,
44        url_override: Option<String>,
45        heartbeat: Option<u64>,
46    ) -> PyResult<Self> {
47        Self::new(
48            product_type,
49            environment,
50            api_key,
51            api_secret,
52            url_override,
53            heartbeat,
54        )
55        .map_err(to_pyruntime_err)
56    }
57
58    #[getter]
59    #[pyo3(name = "product_type")]
60    fn py_product_type(&self) -> BinanceProductType {
61        self.product_type()
62    }
63
64    #[pyo3(name = "is_active")]
65    fn py_is_active(&self) -> bool {
66        self.is_active()
67    }
68
69    #[pyo3(name = "is_closed")]
70    fn py_is_closed(&self) -> bool {
71        self.is_closed()
72    }
73
74    #[pyo3(name = "subscription_count")]
75    fn py_subscription_count(&self) -> usize {
76        self.subscription_count()
77    }
78
79    #[pyo3(name = "cache_instrument")]
80    fn py_cache_instrument(&self, py: Python<'_>, instrument: Py<PyAny>) -> PyResult<()> {
81        self.cache_instrument(pyobject_to_instrument_any(py, instrument)?);
82        Ok(())
83    }
84
85    #[pyo3(name = "connect")]
86    fn py_connect<'py>(
87        &mut self,
88        py: Python<'py>,
89        callback: Py<PyAny>,
90    ) -> PyResult<Bound<'py, PyAny>> {
91        let mut client = self.clone();
92
93        pyo3_async_runtimes::tokio::future_into_py(py, async move {
94            client.connect().await.map_err(to_pyruntime_err)?;
95
96            let stream = client.stream();
97
98            get_runtime().spawn(async move {
99                tokio::pin!(stream);
100
101                while let Some(msg) = stream.next().await {
102                    match msg {
103                        NautilusWsMessage::Data(data_msg) => match data_msg {
104                            NautilusDataWsMessage::Data(data_vec) => {
105                                Python::attach(|py| {
106                                    for data in data_vec {
107                                        let py_obj = data_to_pycapsule(py, data);
108                                        call_python(py, &callback, py_obj);
109                                    }
110                                });
111                            }
112                            NautilusDataWsMessage::DepthUpdate { deltas, .. } => {
113                                Python::attach(|py| {
114                                    let py_obj = data_to_pycapsule(
115                                        py,
116                                        Data::Deltas(OrderBookDeltas_API::new(deltas)),
117                                    );
118                                    call_python(py, &callback, py_obj);
119                                });
120                            }
121                            _ => {}
122                        },
123                        NautilusWsMessage::Exec(_) | NautilusWsMessage::ExecRaw(_) => {}
124                        NautilusWsMessage::Error(err) => {
125                            log::warn!(
126                                "Binance WebSocket error: code={}, msg={}",
127                                err.code,
128                                err.msg
129                            );
130                        }
131                        NautilusWsMessage::Reconnected => {
132                            log::info!("Binance Futures WebSocket reconnected");
133                        }
134                    }
135                }
136            });
137
138            Ok(())
139        })
140    }
141
142    #[pyo3(name = "close")]
143    fn py_close<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
144        let mut client = self.clone();
145
146        pyo3_async_runtimes::tokio::future_into_py(py, async move {
147            if let Err(e) = client.close().await {
148                log::error!("Error on close: {e}");
149            }
150            Ok(())
151        })
152    }
153
154    #[pyo3(name = "subscribe")]
155    fn py_subscribe<'py>(
156        &self,
157        py: Python<'py>,
158        streams: Vec<String>,
159    ) -> PyResult<Bound<'py, PyAny>> {
160        let client = self.clone();
161
162        pyo3_async_runtimes::tokio::future_into_py(py, async move {
163            client.subscribe(streams).await.map_err(to_pyruntime_err)?;
164            Ok(())
165        })
166    }
167
168    #[pyo3(name = "unsubscribe")]
169    fn py_unsubscribe<'py>(
170        &self,
171        py: Python<'py>,
172        streams: Vec<String>,
173    ) -> PyResult<Bound<'py, PyAny>> {
174        let client = self.clone();
175
176        pyo3_async_runtimes::tokio::future_into_py(py, async move {
177            client
178                .unsubscribe(streams)
179                .await
180                .map_err(to_pyruntime_err)?;
181            Ok(())
182        })
183    }
184}