nautilus_binance/spot/websocket/trading/
client.rs1use std::{
27 fmt::Debug,
28 num::NonZeroU32,
29 sync::{
30 Arc, LazyLock, Mutex,
31 atomic::{AtomicBool, AtomicU8, AtomicU64, Ordering},
32 },
33};
34
35use arc_swap::ArcSwap;
36use nautilus_common::live::get_runtime;
37use nautilus_network::{
38 mode::ConnectionMode,
39 ratelimiter::quota::Quota,
40 websocket::{PingHandler, WebSocketClient, WebSocketConfig, channel_message_handler},
41};
42use tokio_util::sync::CancellationToken;
43use ustr::Ustr;
44
45use super::{
46 error::{BinanceWsApiError, BinanceWsApiResult},
47 handler::BinanceSpotWsApiHandler,
48 messages::{HandlerCommand, NautilusWsApiMessage},
49};
50use crate::{
51 common::{consts::BINANCE_SPOT_SBE_WS_API_URL, credential::Credential},
52 spot::http::query::{CancelOrderParams, CancelReplaceOrderParams, NewOrderParams},
53};
54
55pub const BINANCE_API_KEY: &str = "BINANCE_API_KEY";
57
58pub const BINANCE_API_SECRET: &str = "BINANCE_API_SECRET";
60
61pub static BINANCE_WS_RATE_LIMIT_KEY_ORDER: LazyLock<[Ustr; 1]> =
65 LazyLock::new(|| [Ustr::from("order")]);
66
67#[must_use]
75pub fn binance_ws_order_quota() -> Quota {
76 Quota::per_second(NonZeroU32::new(20).expect("20 > 0"))
77}
78
79#[derive(Clone)]
84#[cfg_attr(
85 feature = "python",
86 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.binance")
87)]
88pub struct BinanceSpotWsTradingClient {
89 url: String,
90 credential: Arc<Credential>,
91 heartbeat: Option<u64>,
92 signal: Arc<AtomicBool>,
93 connection_mode: Arc<ArcSwap<AtomicU8>>,
94 cmd_tx: Arc<tokio::sync::RwLock<tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>,
95 out_rx: Arc<Mutex<Option<tokio::sync::mpsc::UnboundedReceiver<NautilusWsApiMessage>>>>,
96 task_handle: Option<Arc<tokio::task::JoinHandle<()>>>,
97 request_id_counter: Arc<AtomicU64>,
98 cancellation_token: CancellationToken,
99}
100
101impl Debug for BinanceSpotWsTradingClient {
102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
103 f.debug_struct(stringify!(BinanceSpotWsTradingClient))
104 .field("url", &self.url)
105 .field("credential", &"<redacted>")
106 .field("heartbeat", &self.heartbeat)
107 .finish_non_exhaustive()
108 }
109}
110
111impl BinanceSpotWsTradingClient {
112 #[must_use]
114 pub fn new(
115 url: Option<String>,
116 api_key: String,
117 api_secret: String,
118 heartbeat: Option<u64>,
119 ) -> Self {
120 let url = url.unwrap_or_else(|| BINANCE_SPOT_SBE_WS_API_URL.to_string());
121 let credential = Arc::new(Credential::new(api_key, api_secret));
122
123 let (cmd_tx, _cmd_rx) = tokio::sync::mpsc::unbounded_channel();
124
125 Self {
126 url,
127 credential,
128 heartbeat,
129 signal: Arc::new(AtomicBool::new(false)),
130 connection_mode: Arc::new(ArcSwap::new(Arc::new(AtomicU8::new(
131 ConnectionMode::Closed as u8,
132 )))),
133 cmd_tx: Arc::new(tokio::sync::RwLock::new(cmd_tx)),
134 out_rx: Arc::new(Mutex::new(None)),
135 task_handle: None,
136 request_id_counter: Arc::new(AtomicU64::new(1)),
137 cancellation_token: CancellationToken::new(),
138 }
139 }
140
141 pub fn with_env(
151 url: Option<String>,
152 api_key: Option<String>,
153 api_secret: Option<String>,
154 heartbeat: Option<u64>,
155 ) -> anyhow::Result<Self> {
156 let api_key = nautilus_core::env::get_or_env_var(api_key, BINANCE_API_KEY)?;
157 let api_secret = nautilus_core::env::get_or_env_var(api_secret, BINANCE_API_SECRET)?;
158 Ok(Self::new(url, api_key, api_secret, heartbeat))
159 }
160
161 pub fn from_env(url: Option<String>, heartbeat: Option<u64>) -> anyhow::Result<Self> {
171 Self::with_env(url, None, None, heartbeat)
172 }
173
174 #[must_use]
176 pub fn is_active(&self) -> bool {
177 let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
178 mode_u8 == ConnectionMode::Active as u8
179 }
180
181 #[must_use]
183 pub fn is_closed(&self) -> bool {
184 let mode_u8 = self.connection_mode.load().load(Ordering::Relaxed);
185 mode_u8 == ConnectionMode::Closed as u8
186 }
187
188 fn next_request_id(&self) -> String {
190 let id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
191 format!("req-{id}")
192 }
193
194 pub async fn connect(&mut self) -> BinanceWsApiResult<()> {
204 self.signal.store(false, Ordering::Relaxed);
205 self.cancellation_token = CancellationToken::new();
206
207 let (raw_handler, raw_rx) = channel_message_handler();
208 let ping_handler: PingHandler = Arc::new(move |_| {});
209
210 let headers = vec![(
211 "X-MBX-APIKEY".to_string(),
212 self.credential.api_key().to_string(),
213 )];
214
215 let config = WebSocketConfig {
216 url: self.url.clone(),
217 headers,
218 heartbeat: self.heartbeat,
219 heartbeat_msg: None,
220 reconnect_timeout_ms: Some(5_000),
221 reconnect_delay_initial_ms: Some(500),
222 reconnect_delay_max_ms: Some(5_000),
223 reconnect_backoff_factor: Some(2.0),
224 reconnect_jitter_ms: Some(250),
225 reconnect_max_attempts: None,
226 };
227
228 let keyed_quotas = vec![(
230 BINANCE_WS_RATE_LIMIT_KEY_ORDER[0].as_str().to_string(),
231 binance_ws_order_quota(),
232 )];
233
234 let client = WebSocketClient::connect(
235 config,
236 Some(raw_handler),
237 Some(ping_handler),
238 None,
239 keyed_quotas,
240 Some(binance_ws_order_quota()), )
242 .await
243 .map_err(|e| BinanceWsApiError::ConnectionError(e.to_string()))?;
244
245 self.connection_mode.store(client.connection_mode_atomic());
246
247 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
248 let (out_tx, out_rx) = tokio::sync::mpsc::unbounded_channel();
249
250 {
251 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
252 *rx_guard = Some(out_rx);
253 }
254
255 {
256 let mut tx_guard = self.cmd_tx.write().await;
257 *tx_guard = cmd_tx;
258 }
259
260 let signal = self.signal.clone();
261 let credential = self.credential.clone();
262 let mut handler = BinanceSpotWsApiHandler::new(signal, cmd_rx, raw_rx, out_tx, credential);
263
264 self.cmd_tx
265 .read()
266 .await
267 .send(HandlerCommand::SetClient(client))
268 .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))?;
269
270 let cancellation_token = self.cancellation_token.clone();
271 let handle = get_runtime().spawn(async move {
272 tokio::select! {
273 () = cancellation_token.cancelled() => {
274 log::debug!("Handler task cancelled");
275 }
276 _ = handler.run() => {
277 log::debug!("Handler run completed");
278 }
279 }
280 });
281
282 self.task_handle = Some(Arc::new(handle));
283
284 Ok(())
285 }
286
287 pub async fn disconnect(&mut self) {
289 self.signal.store(true, Ordering::Relaxed);
290
291 if let Err(e) = self.cmd_tx.read().await.send(HandlerCommand::Disconnect) {
292 log::warn!("Failed to send disconnect command: {e}");
293 }
294
295 self.cancellation_token.cancel();
296
297 if let Some(handle) = self.task_handle.take()
298 && let Ok(handle) = Arc::try_unwrap(handle)
299 {
300 let _ = handle.await;
301 }
302 }
303
304 pub async fn place_order(&self, params: NewOrderParams) -> BinanceWsApiResult<String> {
310 let id = self.next_request_id();
311 let cmd = HandlerCommand::PlaceOrder {
312 id: id.clone(),
313 params,
314 };
315 self.send_cmd(cmd).await?;
316 Ok(id)
317 }
318
319 pub async fn cancel_order(&self, params: CancelOrderParams) -> BinanceWsApiResult<String> {
325 let id = self.next_request_id();
326 let cmd = HandlerCommand::CancelOrder {
327 id: id.clone(),
328 params,
329 };
330 self.send_cmd(cmd).await?;
331 Ok(id)
332 }
333
334 pub async fn cancel_replace_order(
340 &self,
341 params: CancelReplaceOrderParams,
342 ) -> BinanceWsApiResult<String> {
343 let id = self.next_request_id();
344 let cmd = HandlerCommand::CancelReplaceOrder {
345 id: id.clone(),
346 params,
347 };
348 self.send_cmd(cmd).await?;
349 Ok(id)
350 }
351
352 pub async fn cancel_all_orders(&self, symbol: impl Into<String>) -> BinanceWsApiResult<String> {
358 let id = self.next_request_id();
359 let cmd = HandlerCommand::CancelAllOrders {
360 id: id.clone(),
361 symbol: symbol.into(),
362 };
363 self.send_cmd(cmd).await?;
364 Ok(id)
365 }
366
367 pub async fn recv(&self) -> Option<NautilusWsApiMessage> {
375 let rx_opt = {
377 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
378 rx_guard.take()
379 };
380
381 if let Some(mut rx) = rx_opt {
382 let result = rx.recv().await;
383
384 let mut rx_guard = self.out_rx.lock().expect("Mutex poisoned");
385 *rx_guard = Some(rx);
386 result
387 } else {
388 None
389 }
390 }
391
392 async fn send_cmd(&self, cmd: HandlerCommand) -> BinanceWsApiResult<()> {
393 self.cmd_tx
394 .read()
395 .await
396 .send(cmd)
397 .map_err(|e| BinanceWsApiError::HandlerUnavailable(e.to_string()))
398 }
399}