1use std::{
30 fmt::Debug,
31 sync::{
32 Arc,
33 atomic::{AtomicBool, Ordering},
34 },
35};
36
37use ahash::AHashMap;
38use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
39use tokio_tungstenite::tungstenite::Message;
40
41use super::{
42 error::{BinanceWsApiError, BinanceWsApiResult},
43 messages::{HandlerCommand, NautilusWsApiMessage, RequestMeta, WsApiRequest, method},
44};
45use crate::{
46 common::{
47 credential::Credential,
48 sbe::spot::{
49 ReadBuf, message_header_codec,
50 web_socket_response_codec::{SBE_TEMPLATE_ID, WebSocketResponseDecoder},
51 },
52 },
53 spot::http::{models::BinanceCancelOrderResponse, parse},
54};
55
56pub struct BinanceSpotWsApiHandler {
62 signal: Arc<AtomicBool>,
63 inner: Option<WebSocketClient>,
64 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
65 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
66 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsApiMessage>,
67 credential: Arc<Credential>,
68 pending_requests: AHashMap<String, RequestMeta>,
69}
70
71impl Debug for BinanceSpotWsApiHandler {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 f.debug_struct(stringify!(BinanceSpotWsApiHandler))
74 .field("inner", &self.inner.as_ref().map(|_| "<client>"))
75 .field(
76 "pending_requests",
77 &format!("{} pending", self.pending_requests.len()),
78 )
79 .finish_non_exhaustive()
80 }
81}
82
83impl BinanceSpotWsApiHandler {
84 #[must_use]
86 pub fn new(
87 signal: Arc<AtomicBool>,
88 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
89 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
90 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsApiMessage>,
91 credential: Arc<Credential>,
92 ) -> Self {
93 Self {
94 signal,
95 inner: None,
96 cmd_rx,
97 raw_rx,
98 out_tx,
99 credential,
100 pending_requests: AHashMap::new(),
101 }
102 }
103
104 pub async fn run(&mut self) -> bool {
109 loop {
110 if self.signal.load(Ordering::Relaxed) {
111 return false;
112 }
113
114 tokio::select! {
115 Some(cmd) = self.cmd_rx.recv() => {
116 match cmd {
117 HandlerCommand::SetClient(client) => {
118 log::debug!("Handler received WebSocket client");
119 self.inner = Some(client);
120 self.emit(NautilusWsApiMessage::Connected);
121 }
122 HandlerCommand::Disconnect => {
123 log::debug!("Handler disconnecting WebSocket client");
124 self.inner = None;
125 return false;
126 }
127 HandlerCommand::PlaceOrder { id, params } => {
128 if let Err(e) = self.handle_place_order(id.clone(), params).await {
129 log::error!("Failed to handle place order command: {e}");
130 self.emit(NautilusWsApiMessage::OrderRejected {
131 request_id: id,
132 code: -1,
133 msg: e.to_string(),
134 });
135 }
136 }
137 HandlerCommand::CancelOrder { id, params } => {
138 if let Err(e) = self.handle_cancel_order(id.clone(), params).await {
139 log::error!("Failed to handle cancel order command: {e}");
140 self.emit(NautilusWsApiMessage::CancelRejected {
141 request_id: id,
142 code: -1,
143 msg: e.to_string(),
144 });
145 }
146 }
147 HandlerCommand::CancelReplaceOrder { id, params } => {
148 if let Err(e) = self.handle_cancel_replace_order(id.clone(), params).await {
149 log::error!("Failed to handle cancel replace command: {e}");
150 self.emit(NautilusWsApiMessage::CancelReplaceRejected {
151 request_id: id,
152 code: -1,
153 msg: e.to_string(),
154 });
155 }
156 }
157 HandlerCommand::CancelAllOrders { id, symbol } => {
158 if let Err(e) = self.handle_cancel_all_orders(id.clone(), symbol).await {
159 log::error!("Failed to handle cancel all command: {e}");
160 self.emit(NautilusWsApiMessage::CancelRejected {
161 request_id: id,
162 code: -1,
163 msg: e.to_string(),
164 });
165 }
166 }
167 }
168 }
169 Some(msg) = self.raw_rx.recv() => {
170 if let Message::Text(ref text) = msg
171 && text.as_str() == RECONNECTED
172 {
173 log::info!("Handler received reconnection signal");
174
175 self.fail_pending_requests();
177
178 self.emit(NautilusWsApiMessage::Reconnected);
179 continue;
180 }
181
182 self.handle_message(msg);
183 }
184 else => {
185 return false;
187 }
188 }
189 }
190 }
191
192 fn emit(&self, msg: NautilusWsApiMessage) {
194 if let Err(e) = self.out_tx.send(msg) {
195 log::error!("Failed to send message to output channel: {e}");
196 }
197 }
198
199 fn fail_pending_requests(&mut self) {
201 if self.pending_requests.is_empty() {
202 return;
203 }
204
205 let count = self.pending_requests.len();
206 log::warn!("Failing {count} pending requests after reconnection");
207
208 let pending = std::mem::take(&mut self.pending_requests);
209 for (request_id, meta) in pending {
210 let msg = self.create_rejection(
211 request_id,
212 -1,
213 "Connection lost before response received".to_string(),
214 meta,
215 );
216 self.emit(msg);
217 }
218 }
219
220 async fn handle_place_order(
221 &mut self,
222 id: String,
223 params: crate::spot::http::query::NewOrderParams,
224 ) -> BinanceWsApiResult<()> {
225 let params_json = serde_json::to_value(¶ms)
226 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
227 let signed_params = self.sign_params(params_json)?;
228
229 let request = WsApiRequest::new(&id, method::ORDER_PLACE, signed_params);
230 self.pending_requests
231 .insert(id.clone(), RequestMeta::PlaceOrder);
232 self.send_request(request).await
233 }
234
235 async fn handle_cancel_order(
236 &mut self,
237 id: String,
238 params: crate::spot::http::query::CancelOrderParams,
239 ) -> BinanceWsApiResult<()> {
240 let params_json = serde_json::to_value(¶ms)
241 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
242 let signed_params = self.sign_params(params_json)?;
243
244 let request = WsApiRequest::new(&id, method::ORDER_CANCEL, signed_params);
245 self.pending_requests
246 .insert(id.clone(), RequestMeta::CancelOrder);
247 self.send_request(request).await
248 }
249
250 async fn handle_cancel_replace_order(
251 &mut self,
252 id: String,
253 params: crate::spot::http::query::CancelReplaceOrderParams,
254 ) -> BinanceWsApiResult<()> {
255 let params_json = serde_json::to_value(¶ms)
256 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
257 let signed_params = self.sign_params(params_json)?;
258
259 let request = WsApiRequest::new(&id, method::ORDER_CANCEL_REPLACE, signed_params);
260 self.pending_requests
261 .insert(id.clone(), RequestMeta::CancelReplaceOrder);
262 self.send_request(request).await
263 }
264
265 async fn handle_cancel_all_orders(
266 &mut self,
267 id: String,
268 symbol: String,
269 ) -> BinanceWsApiResult<()> {
270 let params_json = serde_json::json!({ "symbol": symbol });
271 let signed_params = self.sign_params(params_json)?;
272
273 let request = WsApiRequest::new(&id, method::OPEN_ORDERS_CANCEL_ALL, signed_params);
274 self.pending_requests
275 .insert(id.clone(), RequestMeta::CancelAllOrders);
276 self.send_request(request).await
277 }
278
279 fn sign_params(&self, mut params: serde_json::Value) -> BinanceWsApiResult<serde_json::Value> {
280 let timestamp = std::time::SystemTime::now()
281 .duration_since(std::time::UNIX_EPOCH)
282 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?
283 .as_millis() as i64;
284
285 if let Some(obj) = params.as_object_mut() {
286 obj.insert("timestamp".to_string(), serde_json::json!(timestamp));
287 obj.insert(
288 "apiKey".to_string(),
289 serde_json::json!(self.credential.api_key()),
290 );
291 }
292
293 let query_string = serde_urlencoded::to_string(¶ms)
294 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
295 let signature = self.credential.sign(&query_string);
296
297 if let Some(obj) = params.as_object_mut() {
298 obj.insert("signature".to_string(), serde_json::json!(signature));
299 }
300
301 Ok(params)
302 }
303
304 async fn send_request(&mut self, request: WsApiRequest) -> BinanceWsApiResult<()> {
305 use super::client::BINANCE_WS_RATE_LIMIT_KEY_ORDER;
306
307 let client = self.inner.as_mut().ok_or_else(|| {
308 BinanceWsApiError::ConnectionError("WebSocket not connected".to_string())
309 })?;
310
311 let json = serde_json::to_string(&request)
312 .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
313
314 log::debug!(
315 "Sending WebSocket API request id={} method={}",
316 request.id,
317 request.method
318 );
319
320 let rate_limit_keys = Some(vec![BINANCE_WS_RATE_LIMIT_KEY_ORDER.to_string()]);
322
323 client.send_text(json, rate_limit_keys).await.map_err(|e| {
324 BinanceWsApiError::ConnectionError(format!("Failed to send request: {e}"))
325 })?;
326
327 Ok(())
328 }
329
330 fn handle_message(&mut self, msg: Message) {
331 match msg {
332 Message::Binary(data) => self.handle_binary_response(&data),
333 Message::Text(text) => self.handle_text_response(&text),
334 Message::Ping(_) | Message::Pong(_) => {}
335 Message::Close(frame) => {
336 log::debug!("WebSocket closed: {frame:?}");
337 }
338 Message::Frame(_) => {}
339 }
340 }
341
342 fn handle_binary_response(&mut self, data: &[u8]) {
343 match self.decode_ws_api_response(data) {
344 Ok(response) => self.emit(response),
345 Err(e) => {
346 log::error!("Failed to decode WebSocket API response: {e}");
347 self.emit(NautilusWsApiMessage::Error(e.to_string()));
348 }
349 }
350 }
351
352 fn handle_text_response(&mut self, text: &str) {
353 match serde_json::from_str::<serde_json::Value>(text) {
355 Ok(json) => {
356 if let Some(code) = json.get("code").and_then(|v| v.as_i64()) {
357 let msg = json
358 .get("msg")
359 .and_then(|v| v.as_str())
360 .unwrap_or("Unknown error");
361 let id = json.get("id").and_then(|v| v.as_str()).map(String::from);
362
363 if let Some(request_id) = id
364 && let Some(meta) = self.pending_requests.remove(&request_id)
365 {
366 let rejection =
367 self.create_rejection(request_id, code as i32, msg.to_string(), meta);
368 self.emit(rejection);
369 return;
370 }
371 log::warn!(
372 "Received error response without matching request ID: code={code} msg={msg}"
373 );
374 }
375 }
376 Err(e) => {
377 log::warn!("Failed to parse text response as JSON: {e}");
378 }
379 }
380 }
381
382 fn decode_ws_api_response(
383 &mut self,
384 data: &[u8],
385 ) -> Result<NautilusWsApiMessage, BinanceWsApiError> {
386 let (request_id, status, result_data) = self.parse_envelope(data)?;
388
389 let meta = self.pending_requests.remove(&request_id).ok_or_else(|| {
391 BinanceWsApiError::UnknownRequestId(format!("No pending request for ID: {request_id}"))
392 })?;
393
394 if status != 200 {
396 return Ok(self.create_rejection(
397 request_id,
398 status as i32,
399 format!("Request failed with status {status}"),
400 meta,
401 ));
402 }
403
404 match meta {
406 RequestMeta::PlaceOrder => {
407 let response = parse::decode_new_order_full(&result_data)?;
408 Ok(NautilusWsApiMessage::OrderAccepted {
409 request_id,
410 response,
411 })
412 }
413 RequestMeta::CancelOrder => {
414 let response = parse::decode_cancel_order(&result_data)?;
415 Ok(NautilusWsApiMessage::OrderCanceled {
416 request_id,
417 response,
418 })
419 }
420 RequestMeta::CancelReplaceOrder => {
421 let new_order_response = parse::decode_new_order_full(&result_data)?;
423 let cancel_response = BinanceCancelOrderResponse {
424 price_exponent: new_order_response.price_exponent,
425 qty_exponent: new_order_response.qty_exponent,
426 order_id: 0,
427 order_list_id: None,
428 transact_time: new_order_response.transact_time,
429 price_mantissa: 0,
430 orig_qty_mantissa: 0,
431 executed_qty_mantissa: 0,
432 cummulative_quote_qty_mantissa: 0,
433 status: crate::common::sbe::spot::order_status::OrderStatus::Canceled,
434 time_in_force: new_order_response.time_in_force,
435 order_type: new_order_response.order_type,
436 side: new_order_response.side,
437 self_trade_prevention_mode: new_order_response.self_trade_prevention_mode,
438 client_order_id: String::new(),
439 orig_client_order_id: String::new(),
440 symbol: new_order_response.symbol.clone(),
441 };
442 Ok(NautilusWsApiMessage::CancelReplaceAccepted {
443 request_id,
444 cancel_response,
445 new_order_response,
446 })
447 }
448 RequestMeta::CancelAllOrders => {
449 let responses = parse::decode_cancel_open_orders(&result_data)?;
450 Ok(NautilusWsApiMessage::AllOrdersCanceled {
451 request_id,
452 responses,
453 })
454 }
455 }
456 }
457
458 fn parse_envelope(&self, data: &[u8]) -> Result<(String, u16, Vec<u8>), BinanceWsApiError> {
462 if data.len() < message_header_codec::ENCODED_LENGTH {
463 return Err(BinanceWsApiError::DecodeError(
464 crate::common::sbe::error::SbeDecodeError::BufferTooShort {
465 expected: message_header_codec::ENCODED_LENGTH,
466 actual: data.len(),
467 },
468 ));
469 }
470
471 let buf = ReadBuf::new(data);
472
473 let block_length = buf.get_u16_at(0);
475 let template_id = buf.get_u16_at(2);
476
477 if template_id != SBE_TEMPLATE_ID {
478 return Err(BinanceWsApiError::DecodeError(
479 crate::common::sbe::error::SbeDecodeError::UnknownTemplateId(template_id),
480 ));
481 }
482
483 let version = buf.get_u16_at(6);
484
485 let decoder = WebSocketResponseDecoder::default().wrap(
487 buf,
488 message_header_codec::ENCODED_LENGTH,
489 block_length,
490 version,
491 );
492
493 let status = decoder.status();
495
496 let mut rate_limits = decoder.rate_limits_decoder();
498 while rate_limits.advance().unwrap_or(None).is_some() {}
499 let mut decoder = rate_limits.parent().map_err(|_| {
500 BinanceWsApiError::ClientError("Failed to get parent from rate_limits".to_string())
501 })?;
502
503 let id_coords = decoder.id_decoder();
505 let id_bytes = decoder.id_slice(id_coords);
506 let request_id = String::from_utf8_lossy(id_bytes).to_string();
507
508 let result_coords = decoder.result_decoder();
510 let result_data = decoder.result_slice(result_coords).to_vec();
511
512 Ok((request_id, status, result_data))
513 }
514
515 fn create_rejection(
516 &self,
517 request_id: String,
518 code: i32,
519 msg: String,
520 meta: RequestMeta,
521 ) -> NautilusWsApiMessage {
522 match meta {
523 RequestMeta::PlaceOrder => NautilusWsApiMessage::OrderRejected {
524 request_id,
525 code,
526 msg,
527 },
528 RequestMeta::CancelOrder => NautilusWsApiMessage::CancelRejected {
529 request_id,
530 code,
531 msg,
532 },
533 RequestMeta::CancelReplaceOrder => NautilusWsApiMessage::CancelReplaceRejected {
534 request_id,
535 code,
536 msg,
537 },
538 RequestMeta::CancelAllOrders => NautilusWsApiMessage::CancelRejected {
539 request_id,
540 code,
541 msg,
542 },
543 }
544 }
545}