Skip to main content

nautilus_binance/spot/websocket/trading/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot WebSocket API client for SBE trading.
17//!
18//! ## Connection Details
19//!
20//! - Endpoint: `ws-api.binance.com:443/ws-api/v3`
21//! - Authentication: Ed25519 signature per request
22//! - SBE responses: Enabled via `responseFormat=sbe` query parameter
23//! - Connection validity: 24 hours
24//! - Ping/pong: Every 20 seconds
25
26use std::{
27    fmt::Debug,
28    num::NonZeroU32,
29    sync::{
30        Arc, LazyLock, Mutex,
31        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32    },
33};
34
35use arc_swap::ArcSwap;
36use nautilus_common::live::get_runtime;
37use nautilus_network::{
38    mode::ConnectionMode,
39    ratelimiter::quota::Quota,
40    websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
41};
42use tokio_util::sync::CancellationToken;
43use ustr::Ustr;
44
45use super::{
46    error::{BinanceWsApiError, BinanceWsApiResult},
47    handler::BinanceSpotWsApiHandler,
48    messages::{HandlerCommand, NautilusWsApiMessage},
49};
50use crate::{
51    common::{consts::BINANCE_SPOT_SBE_WS_API_URL, credential::Credential},
52    spot::http::query::{CancelOrderParams, CancelReplaceOrderParams, NewOrderParams},
53};
54
/// Name of the environment variable that holds the Binance API key.
///
/// Used as the fallback lookup key when no API key is passed explicitly.
pub const BINANCE_API_KEY: &str = "BINANCE_API_KEY";

/// Name of the environment variable that holds the Binance API secret.
///
/// Used as the fallback lookup key when no API secret is passed explicitly.
pub const BINANCE_API_SECRET: &str = "BINANCE_API_SECRET";
60
61/// Pre-interned rate limit key for order operations (place/cancel/replace).
62///
63/// Binance WebSocket API: 1200 requests per minute per IP (20/sec).
64pub static BINANCE_WS_RATE_LIMIT_KEY_ORDER: LazyLock<[Ustr; 1]> =
65    LazyLock::new(|| [Ustr::from("order")]);
66
67/// Binance WebSocket API order rate limit: 1200 per minute (20/sec).
68///
69/// Based on Binance documentation for WebSocket API rate limits.
70///
71/// # Panics
72///
73/// This function will never panic as it uses a constant non-zero value.
74#[must_use]
75pub fn binance_ws_order_quota() -> Quota {
76    Quota::per_second(NonZeroU32::new(20).expect("20 > 0"))
77}
78
79/// Binance Spot WebSocket API client for SBE trading.
80///
81/// This client provides order management via WebSocket with SBE-encoded responses,
82/// complementing the HTTP client with lower-latency order submission.
83#[derive(Clone)]
84#[cfg_attr(
85    feature = "python",
86    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.binance")
87)]
88pub struct BinanceSpotWsTradingClient {
89    url: String,
90    credential: Arc<Credential>,
91    heartbeat: Option<u64>,
92    signal: Arc<AtomicBool>,
93    connection_mode: Arc<ArcSwap<AtomicU8>>,
94    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
95    out_rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsApiMessage>>>>,
96    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
97    request_id_counter: Arc<AtomicU64>,
98    cancellation_token: CancellationToken,
99}
100
101impl Debug for BinanceSpotWsTradingClient {
102    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103        f.debug_struct(stringify!(BinanceSpotWsTradingClient))
104            .field("url", &self.url)
105            .field("credential", &"<redacted>")
106            .field("heartbeat", &self.heartbeat)
107            .finish_non_exhaustive()
108    }
109}
110
111impl BinanceSpotWsTradingClient {
112    /// Creates a new [`BinanceSpotWsTradingClient`] instance.
113    #[must_use]
114    pub fn new(
115        url: Option<String>,
116        api_key: String,
117        api_secret: String,
118        heartbeat: Option<u64>,
119    ) -> Self {
120        let url = url.unwrap_or_else(|| BINANCE_SPOT_SBE_WS_API_URL.to_string());
121        let credential = Arc::new(Credential::new(api_key, api_secret));
122
123        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel();
124
125        Self {
126            url,
127            credential,
128            heartbeat,
129            signal: Arc::new(AtomicBool::new(false)),
130            connection_mode: Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
131                ConnectionMode::Closed as u8,
132            )))),
133            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
134            out_rx: Arc::new(Mutex::new(None)),
135            task_handle: None,
136            request_id_counter: Arc::new(AtomicU64::new(1)),
137            cancellation_token: CancellationToken::new(),
138        }
139    }
140
141    /// Creates a new client with credentials sourced from environment variables.
142    ///
143    /// Falls back to env vars if `api_key` or `api_secret` are `None`:
144    /// - `BINANCE_API_KEY` for the API key
145    /// - `BINANCE_API_SECRET` for the API secret
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if credentials are missing from environment.
150    pub fn with_env(
151        url: Option<String>,
152        api_key: Option<String>,
153        api_secret: Option<String>,
154        heartbeat: Option<u64>,
155    ) -> anyhow::Result<Self> {
156        let api_key = nautilus_core::env::get_or_env_var(api_key, BINANCE_API_KEY)?;
157        let api_secret = nautilus_core::env::get_or_env_var(api_secret, BINANCE_API_SECRET)?;
158        Ok(Self::new(url, api_key, api_secret, heartbeat))
159    }
160
161    /// Creates a new client with credentials loaded entirely from environment variables.
162    ///
163    /// Reads:
164    /// - `BINANCE_API_KEY` for the API key
165    /// - `BINANCE_API_SECRET` for the API secret
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if environment variables are missing.
170    pub fn from_env(url: Option<String>, heartbeat: Option<u64>) -> anyhow::Result<Self> {
171        Self::with_env(url, None, None, heartbeat)
172    }
173
174    /// Returns whether the client is actively connected.
175    #[must_use]
176    pub fn is_active(&self) -> bool {
177        let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
178        mode_u8 == ConnectionMode::Active as u8
179    }
180
181    /// Returns whether the client is closed.
182    #[must_use]
183    pub fn is_closed(&self) -> bool {
184        let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
185        mode_u8 == ConnectionMode::Closed as u8
186    }
187
188    /// Generates the next request ID.
189    fn next_request_id(&self) -> String {
190        let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
191        format!("req-{id}")
192    }
193
194    /// Connects to the WebSocket API server.
195    ///
196    /// # Errors
197    ///
198    /// Returns an error if connection fails.
199    ///
200    /// # Panics
201    ///
202    /// Panics if the internal output receiver mutex is poisoned.
203    pub async fn connect(&mut self) -> BinanceWsApiResult<()> {
204        self.signal.store(false, Ordering::Relaxed);
205        self.cancellation_token = CancellationToken::new();
206
207        let (raw_handler, raw_rx) = channel_message_handler();
208        let ping_handler: PingHandler = Arc::new(move |_| {});
209
210        let headers = vec![(
211            "X-MBX-APIKEY".to_string(),
212            self.credential.api_key().to_string(),
213        )];
214
215        let config = WebSocketConfig {
216            url: self.url.clone(),
217            headers,
218            heartbeat: self.heartbeat,
219            heartbeat_msg: None,
220            reconnect_timeout_ms: Some(5_000),
221            reconnect_delay_initial_ms: Some(500),
222            reconnect_delay_max_ms: Some(5_000),
223            reconnect_backoff_factor: Some(2.0),
224            reconnect_jitter_ms: Some(250),
225            reconnect_max_attempts: None,
226        };
227
228        // Configure rate limits for order operations
229        let keyed_quotas = vec![(
230            BINANCE_WS_RATE_LIMIT_KEY_ORDER[0].as_str().to_string(),
231            binance_ws_order_quota(),
232        )];
233
234        let client = WebSocketClient::connect(
235            config,
236            Some(raw_handler),
237            Some(ping_handler),
238            None,
239            keyed_quotas,
240            Some(binance_ws_order_quota()), // Default quota for all operations
241        )
242        .await
243        .map_err(|e| BinanceWsApiError::ConnectionError(e.to_string()))?;
244
245        self.connection_mode.store(client.connection_mode_atomic());
246
247        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
248        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
249
250        {
251            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
252            *rx_guard = Some(out_rx);
253        }
254
255        {
256            let mut tx_guard = self.cmd_tx.write().await;
257            *tx_guard = cmd_tx;
258        }
259
260        let signal = self.signal.clone();
261        let credential = self.credential.clone();
262        let mut handler = BinanceSpotWsApiHandler::new(signal, cmd_rx, raw_rx, out_tx, credential);
263
264        self.cmd_tx
265            .read()
266            .await
267            .send(HandlerCommand::SetClient(client))
268            .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))?;
269
270        let cancellation_token = self.cancellation_token.clone();
271        let handle = get_runtime().spawn(async move {
272            tokio::select! {
273                () = cancellation_token.cancelled() => {
274                    log::debug!("Handler task cancelled");
275                }
276                _ = handler.run() => {
277                    log::debug!("Handler run completed");
278                }
279            }
280        });
281
282        self.task_handle = Some(Arc::new(handle));
283
284        Ok(())
285    }
286
287    /// Disconnects from the WebSocket API server.
288    pub async fn disconnect(&mut self) {
289        self.signal.store(true, Ordering::Relaxed);
290
291        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
292            log::warn!("Failed to send disconnect command: {e}");
293        }
294
295        self.cancellation_token.cancel();
296
297        if let Some(handle) = self.task_handle.take()
298            && let Ok(handle) = Arc::try_unwrap(handle)
299        {
300            let _ = handle.await;
301        }
302    }
303
304    /// Places a new order via WebSocket API.
305    ///
306    /// # Errors
307    ///
308    /// Returns an error if the handler is unavailable.
309    pub async fn place_order(&self, params: NewOrderParams) -> BinanceWsApiResult<String> {
310        let id = self.next_request_id();
311        let cmd = HandlerCommand::PlaceOrder {
312            id: id.clone(),
313            params,
314        };
315        self.send_cmd(cmd).await?;
316        Ok(id)
317    }
318
319    /// Cancels an order via WebSocket API.
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if the handler is unavailable.
324    pub async fn cancel_order(&self, params: CancelOrderParams) -> BinanceWsApiResult<String> {
325        let id = self.next_request_id();
326        let cmd = HandlerCommand::CancelOrder {
327            id: id.clone(),
328            params,
329        };
330        self.send_cmd(cmd).await?;
331        Ok(id)
332    }
333
334    /// Cancel and replace an order atomically via WebSocket API.
335    ///
336    /// # Errors
337    ///
338    /// Returns an error if the handler is unavailable.
339    pub async fn cancel_replace_order(
340        &self,
341        params: CancelReplaceOrderParams,
342    ) -> BinanceWsApiResult<String> {
343        let id = self.next_request_id();
344        let cmd = HandlerCommand::CancelReplaceOrder {
345            id: id.clone(),
346            params,
347        };
348        self.send_cmd(cmd).await?;
349        Ok(id)
350    }
351
352    /// Cancels all open orders for a symbol via WebSocket API.
353    ///
354    /// # Errors
355    ///
356    /// Returns an error if the handler is unavailable.
357    pub async fn cancel_all_orders(&self, symbol: impl Into<String>) -> BinanceWsApiResult<String> {
358        let id = self.next_request_id();
359        let cmd = HandlerCommand::CancelAllOrders {
360            id: id.clone(),
361            symbol: symbol.into(),
362        };
363        self.send_cmd(cmd).await?;
364        Ok(id)
365    }
366
367    /// Receives the next message from the handler.
368    ///
369    /// Returns `None` if the receiver is closed or not initialized.
370    ///
371    /// # Panics
372    ///
373    /// Panics if the internal output receiver mutex is poisoned.
374    pub async fn recv(&self) -> Option<NautilusWsApiMessage> {
375        // Take the receiver out of the mutex to avoid holding it across await
376        let rx_opt = {
377            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
378            rx_guard.take()
379        };
380
381        if let Some(mut rx) = rx_opt {
382            let result = rx.recv().await;
383
384            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
385            *rx_guard = Some(rx);
386            result
387        } else {
388            None
389        }
390    }
391
392    async fn send_cmd(&self, cmd: HandlerCommand) -> BinanceWsApiResult<()> {
393        self.cmd_tx
394            .read()
395            .await
396            .send(cmd)
397            .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))
398    }
399}