nautilus_binance/spot/websocket/trading/
client.rs1use std::{
27 fmt::Debug,
28 num::NonZeroU32,
29 sync::{
30 Arc, Mutex,
31 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32 },
33};
34
35use arc_swap::ArcSwap;
36use nautilus_common::live::get_runtime;
37use nautilus_network::{
38 mode::ConnectionMode,
39 ratelimiter::quota::Quota,
40 websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
41};
42use tokio_util::sync::CancellationToken;
43
44use super::{
45 error::{BinanceWsApiError, BinanceWsApiResult},
46 handler::BinanceSpotWsApiHandler,
47 messages::{HandlerCommand, NautilusWsApiMessage},
48};
49use crate::{
50 common::{consts::BINANCE_SPOT_SBE_WS_API_URL, credential::Credential},
51 spot::http::query::{CancelOrderParams, CancelReplaceOrderParams, NewOrderParams},
52};
53
54pub const BINANCE_API_KEY: &str = "BINANCE_API_KEY";
56
57pub const BINANCE_API_SECRET: &str = "BINANCE_API_SECRET";
59
60pub const BINANCE_WS_RATE_LIMIT_KEY_ORDER: &str = "order";
64
65#[must_use]
73pub fn binance_ws_order_quota() -> Quota {
74 Quota::per_second(NonZeroU32::new(20).expect("20 > 0"))
75}
76
77#[derive(Clone)]
82#[cfg_attr(
83 feature = "python",
84 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.binance")
85)]
86pub struct BinanceSpotWsTradingClient {
87 url: String,
88 credential: Arc<Credential>,
89 heartbeat: Option<u64>,
90 signal: Arc<AtomicBool>,
91 connection_mode: Arc<ArcSwap<AtomicU8>>,
92 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
93 out_rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsApiMessage>>>>,
94 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
95 request_id_counter: Arc<AtomicU64>,
96 cancellation_token: CancellationToken,
97}
98
99impl Debug for BinanceSpotWsTradingClient {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 f.debug_struct(stringify!(BinanceSpotWsTradingClient))
102 .field("url", &self.url)
103 .field("credential", &"<redacted>")
104 .field("heartbeat", &self.heartbeat)
105 .finish_non_exhaustive()
106 }
107}
108
109impl BinanceSpotWsTradingClient {
110 #[must_use]
112 pub fn new(
113 url: Option<String>,
114 api_key: String,
115 api_secret: String,
116 heartbeat: Option<u64>,
117 ) -> Self {
118 let url = url.unwrap_or_else(|| BINANCE_SPOT_SBE_WS_API_URL.to_string());
119 let credential = Arc::new(Credential::new(api_key, api_secret));
120
121 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel();
122
123 Self {
124 url,
125 credential,
126 heartbeat,
127 signal: Arc::new(AtomicBool::new(false)),
128 connection_mode: Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
129 ConnectionMode::Closed as u8,
130 )))),
131 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
132 out_rx: Arc::new(Mutex::new(None)),
133 task_handle: None,
134 request_id_counter: Arc::new(AtomicU64::new(1)),
135 cancellation_token: CancellationToken::new(),
136 }
137 }
138
139 pub fn with_env(
149 url: Option<String>,
150 api_key: Option<String>,
151 api_secret: Option<String>,
152 heartbeat: Option<u64>,
153 ) -> anyhow::Result<Self> {
154 let api_key = nautilus_core::env::get_or_env_var(api_key, BINANCE_API_KEY)?;
155 let api_secret = nautilus_core::env::get_or_env_var(api_secret, BINANCE_API_SECRET)?;
156 Ok(Self::new(url, api_key, api_secret, heartbeat))
157 }
158
159 pub fn from_env(url: Option<String>, heartbeat: Option<u64>) -> anyhow::Result<Self> {
169 Self::with_env(url, None, None, heartbeat)
170 }
171
172 #[must_use]
174 pub fn is_active(&self) -> bool {
175 let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
176 mode_u8 == ConnectionMode::Active as u8
177 }
178
179 #[must_use]
181 pub fn is_closed(&self) -> bool {
182 let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
183 mode_u8 == ConnectionMode::Closed as u8
184 }
185
186 fn next_request_id(&self) -> String {
188 let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
189 format!("req-{id}")
190 }
191
192 pub async fn connect(&mut self) -> BinanceWsApiResult<()> {
202 self.signal.store(false, Ordering::Relaxed);
203 self.cancellation_token = CancellationToken::new();
204
205 let (raw_handler, raw_rx) = channel_message_handler();
206 let ping_handler: PingHandler = Arc::new(move |_| {});
207
208 let headers = vec![(
209 "X-MBX-APIKEY".to_string(),
210 self.credential.api_key().to_string(),
211 )];
212
213 let config = WebSocketConfig {
214 url: self.url.clone(),
215 headers,
216 heartbeat: self.heartbeat,
217 heartbeat_msg: None,
218 reconnect_timeout_ms: Some(5_000),
219 reconnect_delay_initial_ms: Some(500),
220 reconnect_delay_max_ms: Some(5_000),
221 reconnect_backoff_factor: Some(2.0),
222 reconnect_jitter_ms: Some(250),
223 reconnect_max_attempts: None,
224 };
225
226 let keyed_quotas = vec![(
228 BINANCE_WS_RATE_LIMIT_KEY_ORDER.to_string(),
229 binance_ws_order_quota(),
230 )];
231
232 let client = WebSocketClient::connect(
233 config,
234 Some(raw_handler),
235 Some(ping_handler),
236 None,
237 keyed_quotas,
238 Some(binance_ws_order_quota()), )
240 .await
241 .map_err(|e| BinanceWsApiError::ConnectionError(e.to_string()))?;
242
243 self.connection_mode.store(client.connection_mode_atomic());
244
245 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
246 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
247
248 {
249 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
250 *rx_guard = Some(out_rx);
251 }
252
253 {
254 let mut tx_guard = self.cmd_tx.write().await;
255 *tx_guard = cmd_tx;
256 }
257
258 let signal = self.signal.clone();
259 let credential = self.credential.clone();
260 let mut handler = BinanceSpotWsApiHandler::new(signal, cmd_rx, raw_rx, out_tx, credential);
261
262 self.cmd_tx
263 .read()
264 .await
265 .send(HandlerCommand::SetClient(client))
266 .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))?;
267
268 let cancellation_token = self.cancellation_token.clone();
269 let handle = get_runtime().spawn(async move {
270 tokio::select! {
271 () = cancellation_token.cancelled() => {
272 log::debug!("Handler task cancelled");
273 }
274 _ = handler.run() => {
275 log::debug!("Handler run completed");
276 }
277 }
278 });
279
280 self.task_handle = Some(Arc::new(handle));
281
282 Ok(())
283 }
284
285 pub async fn disconnect(&mut self) {
287 self.signal.store(true, Ordering::Relaxed);
288
289 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
290 log::warn!("Failed to send disconnect command: {e}");
291 }
292
293 self.cancellation_token.cancel();
294
295 if let Some(handle) = self.task_handle.take()
296 && let Ok(handle) = Arc::try_unwrap(handle)
297 {
298 let _ = handle.await;
299 }
300 }
301
302 pub async fn place_order(&self, params: NewOrderParams) -> BinanceWsApiResult<String> {
308 let id = self.next_request_id();
309 let cmd = HandlerCommand::PlaceOrder {
310 id: id.clone(),
311 params,
312 };
313 self.send_cmd(cmd).await?;
314 Ok(id)
315 }
316
317 pub async fn cancel_order(&self, params: CancelOrderParams) -> BinanceWsApiResult<String> {
323 let id = self.next_request_id();
324 let cmd = HandlerCommand::CancelOrder {
325 id: id.clone(),
326 params,
327 };
328 self.send_cmd(cmd).await?;
329 Ok(id)
330 }
331
332 pub async fn cancel_replace_order(
338 &self,
339 params: CancelReplaceOrderParams,
340 ) -> BinanceWsApiResult<String> {
341 let id = self.next_request_id();
342 let cmd = HandlerCommand::CancelReplaceOrder {
343 id: id.clone(),
344 params,
345 };
346 self.send_cmd(cmd).await?;
347 Ok(id)
348 }
349
350 pub async fn cancel_all_orders(&self, symbol: impl Into<String>) -> BinanceWsApiResult<String> {
356 let id = self.next_request_id();
357 let cmd = HandlerCommand::CancelAllOrders {
358 id: id.clone(),
359 symbol: symbol.into(),
360 };
361 self.send_cmd(cmd).await?;
362 Ok(id)
363 }
364
365 pub async fn recv(&self) -> Option<NautilusWsApiMessage> {
373 let rx_opt = {
375 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
376 rx_guard.take()
377 };
378
379 if let Some(mut rx) = rx_opt {
380 let result = rx.recv().await;
381
382 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
383 *rx_guard = Some(rx);
384 result
385 } else {
386 None
387 }
388 }
389
390 async fn send_cmd(&self, cmd: HandlerCommand) -> BinanceWsApiResult<()> {
391 self.cmd_tx
392 .read()
393 .await
394 .send(cmd)
395 .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))
396 }
397}