nautilus_binance/spot/websocket/trading/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot WebSocket API client for SBE trading.
17//!
18//! ## Connection Details
19//!
20//! - Endpoint: `ws-api.binance.com:443/ws-api/v3`
21//! - Authentication: Ed25519 signature per request
22//! - SBE responses: Enabled via `responseFormat=sbe` query parameter
23//! - Connection validity: 24 hours
24//! - Ping/pong: Every 20 seconds
25
26use std::{
27    fmt::Debug,
28    num::NonZeroU32,
29    sync::{
30        Arc, Mutex,
31        atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32    },
33};
34
35use arc_swap::ArcSwap;
36use nautilus_common::live::get_runtime;
37use nautilus_network::{
38    mode::ConnectionMode,
39    ratelimiter::quota::Quota,
40    websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
41};
42use tokio_util::sync::CancellationToken;
43
44use super::{
45    error::{BinanceWsApiError, BinanceWsApiResult},
46    handler::BinanceSpotWsApiHandler,
47    messages::{HandlerCommand, NautilusWsApiMessage},
48};
49use crate::{
50    common::{consts::BINANCE_SPOT_SBE_WS_API_URL, credential::Credential},
51    spot::http::query::{CancelOrderParams, CancelReplaceOrderParams, NewOrderParams},
52};
53
54/// Environment variable key for Binance API key.
55pub const BINANCE_API_KEY: &str = "BINANCE_API_KEY";
56
57/// Environment variable key for Binance API secret.
58pub const BINANCE_API_SECRET: &str = "BINANCE_API_SECRET";
59
60/// Rate limit key for order operations (place/cancel/replace).
61///
62/// Binance WebSocket API: 1200 requests per minute per IP (20/sec).
63pub const BINANCE_WS_RATE_LIMIT_KEY_ORDER: &str = "order";
64
65/// Binance WebSocket API order rate limit: 1200 per minute (20/sec).
66///
67/// Based on Binance documentation for WebSocket API rate limits.
68///
69/// # Panics
70///
71/// This function will never panic as it uses a constant non-zero value.
72#[must_use]
73pub fn binance_ws_order_quota() -> Quota {
74    Quota::per_second(NonZeroU32::new(20).expect("20 > 0"))
75}
76
77/// Binance Spot WebSocket API client for SBE trading.
78///
79/// This client provides order management via WebSocket with SBE-encoded responses,
80/// complementing the HTTP client with lower-latency order submission.
81#[derive(Clone)]
82#[cfg_attr(
83    feature = "python",
84    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.binance")
85)]
86pub struct BinanceSpotWsTradingClient {
87    url: String,
88    credential: Arc<Credential>,
89    heartbeat: Option<u64>,
90    signal: Arc<AtomicBool>,
91    connection_mode: Arc<ArcSwap<AtomicU8>>,
92    cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
93    out_rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsApiMessage>>>>,
94    task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
95    request_id_counter: Arc<AtomicU64>,
96    cancellation_token: CancellationToken,
97}
98
99impl Debug for BinanceSpotWsTradingClient {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        f.debug_struct(stringify!(BinanceSpotWsTradingClient))
102            .field("url", &self.url)
103            .field("credential", &"<redacted>")
104            .field("heartbeat", &self.heartbeat)
105            .finish_non_exhaustive()
106    }
107}
108
109impl BinanceSpotWsTradingClient {
110    /// Creates a new [`BinanceSpotWsTradingClient`] instance.
111    #[must_use]
112    pub fn new(
113        url: Option<String>,
114        api_key: String,
115        api_secret: String,
116        heartbeat: Option<u64>,
117    ) -> Self {
118        let url = url.unwrap_or_else(|| BINANCE_SPOT_SBE_WS_API_URL.to_string());
119        let credential = Arc::new(Credential::new(api_key, api_secret));
120
121        let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel();
122
123        Self {
124            url,
125            credential,
126            heartbeat,
127            signal: Arc::new(AtomicBool::new(false)),
128            connection_mode: Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
129                ConnectionMode::Closed as u8,
130            )))),
131            cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
132            out_rx: Arc::new(Mutex::new(None)),
133            task_handle: None,
134            request_id_counter: Arc::new(AtomicU64::new(1)),
135            cancellation_token: CancellationToken::new(),
136        }
137    }
138
139    /// Creates a new client with credentials sourced from environment variables.
140    ///
141    /// Falls back to env vars if `api_key` or `api_secret` are `None`:
142    /// - `BINANCE_API_KEY` for the API key
143    /// - `BINANCE_API_SECRET` for the API secret
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if credentials are missing from environment.
148    pub fn with_env(
149        url: Option<String>,
150        api_key: Option<String>,
151        api_secret: Option<String>,
152        heartbeat: Option<u64>,
153    ) -> anyhow::Result<Self> {
154        let api_key = nautilus_core::env::get_or_env_var(api_key, BINANCE_API_KEY)?;
155        let api_secret = nautilus_core::env::get_or_env_var(api_secret, BINANCE_API_SECRET)?;
156        Ok(Self::new(url, api_key, api_secret, heartbeat))
157    }
158
159    /// Creates a new client with credentials loaded entirely from environment variables.
160    ///
161    /// Reads:
162    /// - `BINANCE_API_KEY` for the API key
163    /// - `BINANCE_API_SECRET` for the API secret
164    ///
165    /// # Errors
166    ///
167    /// Returns an error if environment variables are missing.
168    pub fn from_env(url: Option<String>, heartbeat: Option<u64>) -> anyhow::Result<Self> {
169        Self::with_env(url, None, None, heartbeat)
170    }
171
172    /// Returns whether the client is actively connected.
173    #[must_use]
174    pub fn is_active(&self) -> bool {
175        let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
176        mode_u8 == ConnectionMode::Active as u8
177    }
178
179    /// Returns whether the client is closed.
180    #[must_use]
181    pub fn is_closed(&self) -> bool {
182        let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
183        mode_u8 == ConnectionMode::Closed as u8
184    }
185
186    /// Generates the next request ID.
187    fn next_request_id(&self) -> String {
188        let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
189        format!("req-{id}")
190    }
191
192    /// Connects to the WebSocket API server.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if connection fails.
197    ///
198    /// # Panics
199    ///
200    /// Panics if the internal output receiver mutex is poisoned.
201    pub async fn connect(&mut self) -> BinanceWsApiResult<()> {
202        self.signal.store(false, Ordering::Relaxed);
203        self.cancellation_token = CancellationToken::new();
204
205        let (raw_handler, raw_rx) = channel_message_handler();
206        let ping_handler: PingHandler = Arc::new(move |_| {});
207
208        let headers = vec![(
209            "X-MBX-APIKEY".to_string(),
210            self.credential.api_key().to_string(),
211        )];
212
213        let config = WebSocketConfig {
214            url: self.url.clone(),
215            headers,
216            heartbeat: self.heartbeat,
217            heartbeat_msg: None,
218            reconnect_timeout_ms: Some(5_000),
219            reconnect_delay_initial_ms: Some(500),
220            reconnect_delay_max_ms: Some(5_000),
221            reconnect_backoff_factor: Some(2.0),
222            reconnect_jitter_ms: Some(250),
223            reconnect_max_attempts: None,
224        };
225
226        // Configure rate limits for order operations
227        let keyed_quotas = vec![(
228            BINANCE_WS_RATE_LIMIT_KEY_ORDER.to_string(),
229            binance_ws_order_quota(),
230        )];
231
232        let client = WebSocketClient::connect(
233            config,
234            Some(raw_handler),
235            Some(ping_handler),
236            None,
237            keyed_quotas,
238            Some(binance_ws_order_quota()), // Default quota for all operations
239        )
240        .await
241        .map_err(|e| BinanceWsApiError::ConnectionError(e.to_string()))?;
242
243        self.connection_mode.store(client.connection_mode_atomic());
244
245        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
246        let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
247
248        {
249            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
250            *rx_guard = Some(out_rx);
251        }
252
253        {
254            let mut tx_guard = self.cmd_tx.write().await;
255            *tx_guard = cmd_tx;
256        }
257
258        let signal = self.signal.clone();
259        let credential = self.credential.clone();
260        let mut handler = BinanceSpotWsApiHandler::new(signal, cmd_rx, raw_rx, out_tx, credential);
261
262        self.cmd_tx
263            .read()
264            .await
265            .send(HandlerCommand::SetClient(client))
266            .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))?;
267
268        let cancellation_token = self.cancellation_token.clone();
269        let handle = get_runtime().spawn(async move {
270            tokio::select! {
271                () = cancellation_token.cancelled() => {
272                    log::debug!("Handler task cancelled");
273                }
274                _ = handler.run() => {
275                    log::debug!("Handler run completed");
276                }
277            }
278        });
279
280        self.task_handle = Some(Arc::new(handle));
281
282        Ok(())
283    }
284
285    /// Disconnects from the WebSocket API server.
286    pub async fn disconnect(&mut self) {
287        self.signal.store(true, Ordering::Relaxed);
288
289        if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
290            log::warn!("Failed to send disconnect command: {e}");
291        }
292
293        self.cancellation_token.cancel();
294
295        if let Some(handle) = self.task_handle.take()
296            && let Ok(handle) = Arc::try_unwrap(handle)
297        {
298            let _ = handle.await;
299        }
300    }
301
302    /// Places a new order via WebSocket API.
303    ///
304    /// # Errors
305    ///
306    /// Returns an error if the handler is unavailable.
307    pub async fn place_order(&self, params: NewOrderParams) -> BinanceWsApiResult<String> {
308        let id = self.next_request_id();
309        let cmd = HandlerCommand::PlaceOrder {
310            id: id.clone(),
311            params,
312        };
313        self.send_cmd(cmd).await?;
314        Ok(id)
315    }
316
317    /// Cancels an order via WebSocket API.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if the handler is unavailable.
322    pub async fn cancel_order(&self, params: CancelOrderParams) -> BinanceWsApiResult<String> {
323        let id = self.next_request_id();
324        let cmd = HandlerCommand::CancelOrder {
325            id: id.clone(),
326            params,
327        };
328        self.send_cmd(cmd).await?;
329        Ok(id)
330    }
331
332    /// Cancel and replace an order atomically via WebSocket API.
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if the handler is unavailable.
337    pub async fn cancel_replace_order(
338        &self,
339        params: CancelReplaceOrderParams,
340    ) -> BinanceWsApiResult<String> {
341        let id = self.next_request_id();
342        let cmd = HandlerCommand::CancelReplaceOrder {
343            id: id.clone(),
344            params,
345        };
346        self.send_cmd(cmd).await?;
347        Ok(id)
348    }
349
350    /// Cancels all open orders for a symbol via WebSocket API.
351    ///
352    /// # Errors
353    ///
354    /// Returns an error if the handler is unavailable.
355    pub async fn cancel_all_orders(&self, symbol: impl Into<String>) -> BinanceWsApiResult<String> {
356        let id = self.next_request_id();
357        let cmd = HandlerCommand::CancelAllOrders {
358            id: id.clone(),
359            symbol: symbol.into(),
360        };
361        self.send_cmd(cmd).await?;
362        Ok(id)
363    }
364
365    /// Receives the next message from the handler.
366    ///
367    /// Returns `None` if the receiver is closed or not initialized.
368    ///
369    /// # Panics
370    ///
371    /// Panics if the internal output receiver mutex is poisoned.
372    pub async fn recv(&self) -> Option<NautilusWsApiMessage> {
373        // Take the receiver out of the mutex to avoid holding it across await
374        let rx_opt = {
375            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
376            rx_guard.take()
377        };
378
379        if let Some(mut rx) = rx_opt {
380            let result = rx.recv().await;
381
382            let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
383            *rx_guard = Some(rx);
384            result
385        } else {
386            None
387        }
388    }
389
390    async fn send_cmd(&self, cmd: HandlerCommand) -> BinanceWsApiResult<()> {
391        self.cmd_tx
392            .read()
393            .await
394            .send(cmd)
395            .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))
396    }
397}