1use std::{
30 fmt::Debug,
31 sync::{
32 Arc,
33 atomic::{AtomicBool, Ordering},
34 },
35};
36
37use ahash::AHashMap;
38use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
39use tokio_tungstenite::tungstenite::Message;
40
41use super::{
42 error::{BinanceWsApiError, BinanceWsApiResult},
43 messages::{HandlerCommand, NautilusWsApiMessage, RequestMeta, WsApiRequest, method},
44};
45use crate::{
46 common::{
47 credential::Credential,
48 sbe::spot::{
49 ReadBuf, message_header_codec,
50 web_socket_response_codec::{SBE_TEMPLATE_ID, WebSocketResponseDecoder},
51 },
52 },
53 spot::http::{models::BinanceCancelOrderResponse, parse},
54};
55
/// Handler that holds the WebSocket client and moves data between three channels:
/// incoming commands, raw WebSocket frames, and outgoing [`NautilusWsApiMessage`]s.
pub struct BinanceSpotWsApiHandler {
    /// Stop flag; `run` returns as soon as it reads `true`.
    signal: Arc<AtomicBool>,
    /// Active client, installed by `HandlerCommand::SetClient` and cleared by
    /// `HandlerCommand::Disconnect`. Requests cannot be sent while this is `None`.
    inner: Option<WebSocketClient>,
    /// Commands to execute (client management and order requests).
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Raw WebSocket messages, including the `RECONNECTED` text sentinel.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Sink for messages produced by this handler.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsApiMessage>,
    /// Supplies the API key and signature for outgoing requests.
    credential: Arc<Credential>,
    /// Requests sent but not yet answered, keyed by request ID. The stored
    /// [`RequestMeta`] selects how the response is decoded or rejected.
    pending_requests: AHashMap<String, RequestMeta>,
}
70
71impl Debug for BinanceSpotWsApiHandler {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct(stringify!(BinanceSpotWsApiHandler))
74 .field("inner", &self.inner.as_ref().map(|_| "<client>"))
75 .field(
76 "pending_requests",
77 &format!("{} pending", self.pending_requests.len()),
78 )
79 .finish_non_exhaustive()
80 }
81}
82
83impl BinanceSpotWsApiHandler {
84 #[must_use]
86 pub fn new(
87 signal: Arc<AtomicBool>,
88 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
89 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
90 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsApiMessage>,
91 credential: Arc<Credential>,
92 ) -> Self {
93 Self {
94 signal,
95 inner: None,
96 cmd_rx,
97 raw_rx,
98 out_tx,
99 credential,
100 pending_requests: AHashMap::new(),
101 }
102 }
103
104 pub async fn run(&mut self) -> bool {
109 loop {
110 if self.signal.load(Ordering::Relaxed) {
111 return false;
112 }
113
114 tokio::select! {
115 Some(cmd) = self.cmd_rx.recv() => {
116 match cmd {
117 HandlerCommand::SetClient(client) => {
118 log::debug!("Handler received WebSocket client");
119 self.inner = Some(client);
120 self.emit(NautilusWsApiMessage::Connected);
121 }
122 HandlerCommand::Disconnect => {
123 log::debug!("Handler disconnecting WebSocket client");
124 self.inner = None;
125 return false;
126 }
127 HandlerCommand::PlaceOrder { id, params } => {
128 if let Err(e) = self.handle_place_order(id.clone(), params).await {
129 log::error!("Failed to handle place order command: {e}");
130 self.emit(NautilusWsApiMessage::OrderRejected {
131 request_id: id,
132 code: -1,
133 msg: e.to_string(),
134 });
135 }
136 }
137 HandlerCommand::CancelOrder { id, params } => {
138 if let Err(e) = self.handle_cancel_order(id.clone(), params).await {
139 log::error!("Failed to handle cancel order command: {e}");
140 self.emit(NautilusWsApiMessage::CancelRejected {
141 request_id: id,
142 code: -1,
143 msg: e.to_string(),
144 });
145 }
146 }
147 HandlerCommand::CancelReplaceOrder { id, params } => {
148 if let Err(e) = self.handle_cancel_replace_order(id.clone(), params).await {
149 log::error!("Failed to handle cancel replace command: {e}");
150 self.emit(NautilusWsApiMessage::CancelReplaceRejected {
151 request_id: id,
152 code: -1,
153 msg: e.to_string(),
154 });
155 }
156 }
157 HandlerCommand::CancelAllOrders { id, symbol } => {
158 if let Err(e) = self.handle_cancel_all_orders(id.clone(), symbol).await {
159 log::error!("Failed to handle cancel all command: {e}");
160 self.emit(NautilusWsApiMessage::CancelRejected {
161 request_id: id,
162 code: -1,
163 msg: e.to_string(),
164 });
165 }
166 }
167 }
168 }
169 Some(msg) = self.raw_rx.recv() => {
170 if let Message::Text(ref text) = msg
171 && text.as_str() == RECONNECTED
172 {
173 log::info!("Handler received reconnection signal");
174
175 self.fail_pending_requests();
177
178 self.emit(NautilusWsApiMessage::Reconnected);
179 continue;
180 }
181
182 self.handle_message(msg);
183 }
184 else => {
185 return false;
187 }
188 }
189 }
190 }
191
192 fn emit(&self, msg: NautilusWsApiMessage) {
194 if let Err(e) = self.out_tx.send(msg) {
195 log::error!("Failed to send message to output channel: {e}");
196 }
197 }
198
199 fn fail_pending_requests(&mut self) {
201 if self.pending_requests.is_empty() {
202 return;
203 }
204
205 let count = self.pending_requests.len();
206 log::warn!("Failing {count} pending requests after reconnection");
207
208 let pending = std::mem::take(&mut self.pending_requests);
209 for (request_id, meta) in pending {
210 let msg = self.create_rejection(
211 request_id,
212 -1,
213 "Connection lost before response received".to_string(),
214 meta,
215 );
216 self.emit(msg);
217 }
218 }
219
220 async fn handle_place_order(
221 &mut self,
222 id: String,
223 params: crate::spot::http::query::NewOrderParams,
224 ) -> BinanceWsApiResult<()> {
225 let params_json = serde_json::to_value(¶ms)
226 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
227 let signed_params = self.sign_params(params_json)?;
228
229 let request = WsApiRequest::new(&id, method::ORDER_PLACE, signed_params);
230 self.pending_requests
231 .insert(id.clone(), RequestMeta::PlaceOrder);
232 self.send_request(request).await
233 }
234
235 async fn handle_cancel_order(
236 &mut self,
237 id: String,
238 params: crate::spot::http::query::CancelOrderParams,
239 ) -> BinanceWsApiResult<()> {
240 let params_json = serde_json::to_value(¶ms)
241 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
242 let signed_params = self.sign_params(params_json)?;
243
244 let request = WsApiRequest::new(&id, method::ORDER_CANCEL, signed_params);
245 self.pending_requests
246 .insert(id.clone(), RequestMeta::CancelOrder);
247 self.send_request(request).await
248 }
249
250 async fn handle_cancel_replace_order(
251 &mut self,
252 id: String,
253 params: crate::spot::http::query::CancelReplaceOrderParams,
254 ) -> BinanceWsApiResult<()> {
255 let params_json = serde_json::to_value(¶ms)
256 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
257 let signed_params = self.sign_params(params_json)?;
258
259 let request = WsApiRequest::new(&id, method::ORDER_CANCEL_REPLACE, signed_params);
260 self.pending_requests
261 .insert(id.clone(), RequestMeta::CancelReplaceOrder);
262 self.send_request(request).await
263 }
264
265 async fn handle_cancel_all_orders(
266 &mut self,
267 id: String,
268 symbol: String,
269 ) -> BinanceWsApiResult<()> {
270 let params_json = serde_json::json!({ "symbol": symbol });
271 let signed_params = self.sign_params(params_json)?;
272
273 let request = WsApiRequest::new(&id, method::OPEN_ORDERS_CANCEL_ALL, signed_params);
274 self.pending_requests
275 .insert(id.clone(), RequestMeta::CancelAllOrders);
276 self.send_request(request).await
277 }
278
279 fn sign_params(&self, mut params: serde_json::Value) -> BinanceWsApiResult<serde_json::Value> {
280 let timestamp = std::time::SystemTime::now()
281 .duration_since(std::time::UNIX_EPOCH)
282 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?
283 .as_millis() as i64;
284
285 if let Some(obj) = params.as_object_mut() {
286 obj.insert("timestamp".to_string(), serde_json::json!(timestamp));
287 obj.insert(
288 "apiKey".to_string(),
289 serde_json::json!(self.credential.api_key()),
290 );
291 }
292
293 let query_string = serde_urlencoded::to_string(¶ms)
294 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
295 let signature = self.credential.sign(&query_string);
296
297 if let Some(obj) = params.as_object_mut() {
298 obj.insert("signature".to_string(), serde_json::json!(signature));
299 }
300
301 Ok(params)
302 }
303
304 async fn send_request(&mut self, request: WsApiRequest) -> BinanceWsApiResult<()> {
305 use super::client::BINANCE_WS_RATE_LIMIT_KEY_ORDER;
306
307 let client = self.inner.as_mut().ok_or_else(|| {
308 BinanceWsApiError::ConnectionError("WebSocket not connected".to_string())
309 })?;
310
311 let json = serde_json::to_string(&request)
312 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
313
314 log::debug!(
315 "Sending WebSocket API request id={} method={}",
316 request.id,
317 request.method
318 );
319
320 client
322 .send_text(json, Some(BINANCE_WS_RATE_LIMIT_KEY_ORDER.as_slice()))
323 .await
324 .map_err(|e| {
325 BinanceWsApiError::ConnectionError(format!("Failed to send request: {e}"))
326 })?;
327
328 Ok(())
329 }
330
331 fn handle_message(&mut self, msg: Message) {
332 match msg {
333 Message::Binary(data) => self.handle_binary_response(&data),
334 Message::Text(text) => self.handle_text_response(&text),
335 Message::Ping(_) | Message::Pong(_) => {}
336 Message::Close(frame) => {
337 log::debug!("WebSocket closed: {frame:?}");
338 }
339 Message::Frame(_) => {}
340 }
341 }
342
343 fn handle_binary_response(&mut self, data: &[u8]) {
344 match self.decode_ws_api_response(data) {
345 Ok(response) => self.emit(response),
346 Err(e) => {
347 log::error!("Failed to decode WebSocket API response: {e}");
348 self.emit(NautilusWsApiMessage::Error(e.to_string()));
349 }
350 }
351 }
352
353 fn handle_text_response(&mut self, text: &str) {
354 match serde_json::from_str::<serde_json::Value>(text) {
356 Ok(json) => {
357 if let Some(code) = json.get("code").and_then(|v| v.as_i64()) {
358 let msg = json
359 .get("msg")
360 .and_then(|v| v.as_str())
361 .unwrap_or("Unknown error");
362 let id = json.get("id").and_then(|v| v.as_str()).map(String::from);
363
364 if let Some(request_id) = id
365 && let Some(meta) = self.pending_requests.remove(&request_id)
366 {
367 let rejection =
368 self.create_rejection(request_id, code as i32, msg.to_string(), meta);
369 self.emit(rejection);
370 return;
371 }
372 log::warn!(
373 "Received error response without matching request ID: code={code} msg={msg}"
374 );
375 }
376 }
377 Err(e) => {
378 log::warn!("Failed to parse text response as JSON: {e}");
379 }
380 }
381 }
382
383 fn decode_ws_api_response(
384 &mut self,
385 data: &[u8],
386 ) -> Result<NautilusWsApiMessage, BinanceWsApiError> {
387 let (request_id, status, result_data) = self.parse_envelope(data)?;
389
390 let meta = self.pending_requests.remove(&request_id).ok_or_else(|| {
392 BinanceWsApiError::UnknownRequestId(format!("No pending request for ID: {request_id}"))
393 })?;
394
395 if status != 200 {
397 return Ok(self.create_rejection(
398 request_id,
399 status as i32,
400 format!("Request failed with status {status}"),
401 meta,
402 ));
403 }
404
405 match meta {
407 RequestMeta::PlaceOrder => {
408 let response = parse::decode_new_order_full(&result_data)?;
409 Ok(NautilusWsApiMessage::OrderAccepted {
410 request_id,
411 response,
412 })
413 }
414 RequestMeta::CancelOrder => {
415 let response = parse::decode_cancel_order(&result_data)?;
416 Ok(NautilusWsApiMessage::OrderCanceled {
417 request_id,
418 response,
419 })
420 }
421 RequestMeta::CancelReplaceOrder => {
422 let new_order_response = parse::decode_new_order_full(&result_data)?;
424 let cancel_response = BinanceCancelOrderResponse {
425 price_exponent: new_order_response.price_exponent,
426 qty_exponent: new_order_response.qty_exponent,
427 order_id: 0,
428 order_list_id: None,
429 transact_time: new_order_response.transact_time,
430 price_mantissa: 0,
431 orig_qty_mantissa: 0,
432 executed_qty_mantissa: 0,
433 cummulative_quote_qty_mantissa: 0,
434 status: crate::common::sbe::spot::order_status::OrderStatus::Canceled,
435 time_in_force: new_order_response.time_in_force,
436 order_type: new_order_response.order_type,
437 side: new_order_response.side,
438 self_trade_prevention_mode: new_order_response.self_trade_prevention_mode,
439 client_order_id: String::new(),
440 orig_client_order_id: String::new(),
441 symbol: new_order_response.symbol.clone(),
442 };
443 Ok(NautilusWsApiMessage::CancelReplaceAccepted {
444 request_id,
445 cancel_response,
446 new_order_response,
447 })
448 }
449 RequestMeta::CancelAllOrders => {
450 let responses = parse::decode_cancel_open_orders(&result_data)?;
451 Ok(NautilusWsApiMessage::AllOrdersCanceled {
452 request_id,
453 responses,
454 })
455 }
456 }
457 }
458
459 fn parse_envelope(&self, data: &[u8]) -> Result<(String, u16, Vec<u8>), BinanceWsApiError> {
463 if data.len() < message_header_codec::ENCODED_LENGTH {
464 return Err(BinanceWsApiError::DecodeError(
465 crate::common::sbe::error::SbeDecodeError::BufferTooShort {
466 expected: message_header_codec::ENCODED_LENGTH,
467 actual: data.len(),
468 },
469 ));
470 }
471
472 let buf = ReadBuf::new(data);
473
474 let block_length = buf.get_u16_at(0);
476 let template_id = buf.get_u16_at(2);
477
478 if template_id != SBE_TEMPLATE_ID {
479 return Err(BinanceWsApiError::DecodeError(
480 crate::common::sbe::error::SbeDecodeError::UnknownTemplateId(template_id),
481 ));
482 }
483
484 let version = buf.get_u16_at(6);
485
486 let decoder = WebSocketResponseDecoder::default().wrap(
488 buf,
489 message_header_codec::ENCODED_LENGTH,
490 block_length,
491 version,
492 );
493
494 let status = decoder.status();
496
497 let mut rate_limits = decoder.rate_limits_decoder();
499 while rate_limits.advance().unwrap_or(None).is_some() {}
500 let mut decoder = rate_limits.parent().map_err(|_| {
501 BinanceWsApiError::ClientError("Failed to get parent from rate_limits".to_string())
502 })?;
503
504 let id_coords = decoder.id_decoder();
506 let id_bytes = decoder.id_slice(id_coords);
507 let request_id = String::from_utf8_lossy(id_bytes).to_string();
508
509 let result_coords = decoder.result_decoder();
511 let result_data = decoder.result_slice(result_coords).to_vec();
512
513 Ok((request_id, status, result_data))
514 }
515
516 fn create_rejection(
517 &self,
518 request_id: String,
519 code: i32,
520 msg: String,
521 meta: RequestMeta,
522 ) -> NautilusWsApiMessage {
523 match meta {
524 RequestMeta::PlaceOrder => NautilusWsApiMessage::OrderRejected {
525 request_id,
526 code,
527 msg,
528 },
529 RequestMeta::CancelOrder => NautilusWsApiMessage::CancelRejected {
530 request_id,
531 code,
532 msg,
533 },
534 RequestMeta::CancelReplaceOrder => NautilusWsApiMessage::CancelReplaceRejected {
535 request_id,
536 code,
537 msg,
538 },
539 RequestMeta::CancelAllOrders => NautilusWsApiMessage::CancelRejected {
540 request_id,
541 code,
542 msg,
543 },
544 }
545 }
546}