Skip to main content

nautilus_binance/spot/websocket/trading/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot WebSocket API message handler.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21//!
22//! ## Responsibilities
23//!
24//! - Command processing: Receives `HandlerCommand` from client, serializes to JSON requests.
25//! - Response decoding: Parses SBE binary responses using schema 3 decoders.
26//! - Request correlation: Matches responses to pending requests by ID.
27//! - Message transformation: Emits `NautilusWsApiMessage` events to client via channel.
28
29use std::{
30    fmt::Debug,
31    sync::{
32        Arc,
33        atomic::{AtomicBool, Ordering},
34    },
35};
36
37use ahash::AHashMap;
38use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
39use tokio_tungstenite::tungstenite::Message;
40
41use super::{
42    error::{BinanceWsApiError, BinanceWsApiResult},
43    messages::{HandlerCommand, NautilusWsApiMessage, RequestMeta, WsApiRequest, method},
44};
45use crate::{
46    common::{
47        credential::Credential,
48        sbe::spot::{
49            ReadBuf, message_header_codec,
50            web_socket_response_codec::{SBE_TEMPLATE_ID, WebSocketResponseDecoder},
51        },
52    },
53    spot::http::{models::BinanceCancelOrderResponse, parse},
54};
55
/// Binance Spot WebSocket API handler.
///
/// Runs in a dedicated Tokio task, processing commands from the client
/// and transforming raw WebSocket messages into Nautilus domain events.
/// Messages are sent to the client via the output channel.
pub struct BinanceSpotWsApiHandler {
    /// Stop flag; `run` checks it at the top of every loop iteration and exits once it is set.
    signal: Arc<AtomicBool>,
    /// Active WebSocket client; `None` until a `SetClient` command arrives and again after `Disconnect`.
    inner: Option<WebSocketClient>,
    /// Incoming commands from the client.
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Incoming raw WebSocket messages, including the `RECONNECTED` text sentinel.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Outgoing events for the client.
    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsApiMessage>,
    /// Credential providing the API key and signature attached to each request.
    credential: Arc<Credential>,
    /// In-flight requests keyed by request ID, used to correlate responses with their request kind.
    pending_requests: AHashMap<String, RequestMeta>,
}
70
71impl Debug for BinanceSpotWsApiHandler {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct(stringify!(BinanceSpotWsApiHandler))
74            .field("inner", &self.inner.as_ref().map(|_| "<client>"))
75            .field(
76                "pending_requests",
77                &format!("{} pending", self.pending_requests.len()),
78            )
79            .finish_non_exhaustive()
80    }
81}
82
83impl BinanceSpotWsApiHandler {
84    /// Creates a new handler instance.
85    #[must_use]
86    pub fn new(
87        signal: Arc<AtomicBool>,
88        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
89        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
90        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsApiMessage>,
91        credential: Arc<Credential>,
92    ) -> Self {
93        Self {
94            signal,
95            inner: None,
96            cmd_rx,
97            raw_rx,
98            out_tx,
99            credential,
100            pending_requests: AHashMap::new(),
101        }
102    }
103
104    /// Main event loop - processes commands and raw messages.
105    ///
106    /// Sends output messages via `out_tx` channel. Returns `false` when disconnected
107    /// or the signal is set, indicating the handler should exit.
108    pub async fn run(&mut self) -> bool {
109        loop {
110            if self.signal.load(Ordering::Relaxed) {
111                return false;
112            }
113
114            tokio::select! {
115                Some(cmd) = self.cmd_rx.recv() => {
116                    match cmd {
117                        HandlerCommand::SetClient(client) => {
118                            log::debug!("Handler received WebSocket client");
119                            self.inner = Some(client);
120                            self.emit(NautilusWsApiMessage::Connected);
121                        }
122                        HandlerCommand::Disconnect => {
123                            log::debug!("Handler disconnecting WebSocket client");
124                            self.inner = None;
125                            return false;
126                        }
127                        HandlerCommand::PlaceOrder { id, params } => {
128                            if let Err(e) = self.handle_place_order(id.clone(), params).await {
129                                log::error!("Failed to handle place order command: {e}");
130                                self.emit(NautilusWsApiMessage::OrderRejected {
131                                    request_id: id,
132                                    code: -1,
133                                    msg: e.to_string(),
134                                });
135                            }
136                        }
137                        HandlerCommand::CancelOrder { id, params } => {
138                            if let Err(e) = self.handle_cancel_order(id.clone(), params).await {
139                                log::error!("Failed to handle cancel order command: {e}");
140                                self.emit(NautilusWsApiMessage::CancelRejected {
141                                    request_id: id,
142                                    code: -1,
143                                    msg: e.to_string(),
144                                });
145                            }
146                        }
147                        HandlerCommand::CancelReplaceOrder { id, params } => {
148                            if let Err(e) = self.handle_cancel_replace_order(id.clone(), params).await {
149                                log::error!("Failed to handle cancel replace command: {e}");
150                                self.emit(NautilusWsApiMessage::CancelReplaceRejected {
151                                    request_id: id,
152                                    code: -1,
153                                    msg: e.to_string(),
154                                });
155                            }
156                        }
157                        HandlerCommand::CancelAllOrders { id, symbol } => {
158                            if let Err(e) = self.handle_cancel_all_orders(id.clone(), symbol).await {
159                                log::error!("Failed to handle cancel all command: {e}");
160                                self.emit(NautilusWsApiMessage::CancelRejected {
161                                    request_id: id,
162                                    code: -1,
163                                    msg: e.to_string(),
164                                });
165                            }
166                        }
167                    }
168                }
169                Some(msg) = self.raw_rx.recv() => {
170                    if let Message::Text(ref text) = msg
171                        && text.as_str() == RECONNECTED
172                    {
173                        log::info!("Handler received reconnection signal");
174
175                        // Fail any pending requests - they won't get responses on new connection
176                        self.fail_pending_requests();
177
178                        self.emit(NautilusWsApiMessage::Reconnected);
179                        continue;
180                    }
181
182                    self.handle_message(msg);
183                }
184                else => {
185                    // Both channels closed
186                    return false;
187                }
188            }
189        }
190    }
191
192    /// Sends a message to the output channel.
193    fn emit(&self, msg: NautilusWsApiMessage) {
194        if let Err(e) = self.out_tx.send(msg) {
195            log::error!("Failed to send message to output channel: {e}");
196        }
197    }
198
199    /// Fails all pending requests after a reconnection.
200    fn fail_pending_requests(&mut self) {
201        if self.pending_requests.is_empty() {
202            return;
203        }
204
205        let count = self.pending_requests.len();
206        log::warn!("Failing {count} pending requests after reconnection");
207
208        let pending = std::mem::take(&mut self.pending_requests);
209        for (request_id, meta) in pending {
210            let msg = self.create_rejection(
211                request_id,
212                -1,
213                "Connection lost before response received".to_string(),
214                meta,
215            );
216            self.emit(msg);
217        }
218    }
219
220    async fn handle_place_order(
221        &mut self,
222        id: String,
223        params: crate::spot::http::query::NewOrderParams,
224    ) -> BinanceWsApiResult<()> {
225        let params_json = serde_json::to_value(&params)
226            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
227        let signed_params = self.sign_params(params_json)?;
228
229        let request = WsApiRequest::new(&id, method::ORDER_PLACE, signed_params);
230        self.pending_requests
231            .insert(id.clone(), RequestMeta::PlaceOrder);
232        self.send_request(request).await
233    }
234
235    async fn handle_cancel_order(
236        &mut self,
237        id: String,
238        params: crate::spot::http::query::CancelOrderParams,
239    ) -> BinanceWsApiResult<()> {
240        let params_json = serde_json::to_value(&params)
241            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
242        let signed_params = self.sign_params(params_json)?;
243
244        let request = WsApiRequest::new(&id, method::ORDER_CANCEL, signed_params);
245        self.pending_requests
246            .insert(id.clone(), RequestMeta::CancelOrder);
247        self.send_request(request).await
248    }
249
250    async fn handle_cancel_replace_order(
251        &mut self,
252        id: String,
253        params: crate::spot::http::query::CancelReplaceOrderParams,
254    ) -> BinanceWsApiResult<()> {
255        let params_json = serde_json::to_value(&params)
256            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
257        let signed_params = self.sign_params(params_json)?;
258
259        let request = WsApiRequest::new(&id, method::ORDER_CANCEL_REPLACE, signed_params);
260        self.pending_requests
261            .insert(id.clone(), RequestMeta::CancelReplaceOrder);
262        self.send_request(request).await
263    }
264
265    async fn handle_cancel_all_orders(
266        &mut self,
267        id: String,
268        symbol: String,
269    ) -> BinanceWsApiResult<()> {
270        let params_json = serde_json::json!({ "symbol": symbol });
271        let signed_params = self.sign_params(params_json)?;
272
273        let request = WsApiRequest::new(&id, method::OPEN_ORDERS_CANCEL_ALL, signed_params);
274        self.pending_requests
275            .insert(id.clone(), RequestMeta::CancelAllOrders);
276        self.send_request(request).await
277    }
278
279    fn sign_params(&self, mut params: serde_json::Value) -> BinanceWsApiResult<serde_json::Value> {
280        let timestamp = std::time::SystemTime::now()
281            .duration_since(std::time::UNIX_EPOCH)
282            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?
283            .as_millis() as i64;
284
285        if let Some(obj) = params.as_object_mut() {
286            obj.insert("timestamp".to_string(), serde_json::json!(timestamp));
287            obj.insert(
288                "apiKey".to_string(),
289                serde_json::json!(self.credential.api_key()),
290            );
291        }
292
293        let query_string = serde_urlencoded::to_string(&params)
294            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
295        let signature = self.credential.sign(&query_string);
296
297        if let Some(obj) = params.as_object_mut() {
298            obj.insert("signature".to_string(), serde_json::json!(signature));
299        }
300
301        Ok(params)
302    }
303
304    async fn send_request(&mut self, request: WsApiRequest) -> BinanceWsApiResult<()> {
305        use super::client::BINANCE_WS_RATE_LIMIT_KEY_ORDER;
306
307        let client = self.inner.as_mut().ok_or_else(|| {
308            BinanceWsApiError::ConnectionError("WebSocket not connected".to_string())
309        })?;
310
311        let json = serde_json::to_string(&request)
312            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
313
314        log::debug!(
315            "Sending WebSocket API request id={} method={}",
316            request.id,
317            request.method
318        );
319
320        // Apply rate limiting for order operations
321        client
322            .send_text(json, Some(BINANCE_WS_RATE_LIMIT_KEY_ORDER.as_slice()))
323            .await
324            .map_err(|e| {
325                BinanceWsApiError::ConnectionError(format!("Failed to send request: {e}"))
326            })?;
327
328        Ok(())
329    }
330
331    fn handle_message(&mut self, msg: Message) {
332        match msg {
333            Message::Binary(data) => self.handle_binary_response(&data),
334            Message::Text(text) => self.handle_text_response(&text),
335            Message::Ping(_) | Message::Pong(_) => {}
336            Message::Close(frame) => {
337                log::debug!("WebSocket closed: {frame:?}");
338            }
339            Message::Frame(_) => {}
340        }
341    }
342
343    fn handle_binary_response(&mut self, data: &[u8]) {
344        match self.decode_ws_api_response(data) {
345            Ok(response) => self.emit(response),
346            Err(e) => {
347                log::error!("Failed to decode WebSocket API response: {e}");
348                self.emit(NautilusWsApiMessage::Error(e.to_string()));
349            }
350        }
351    }
352
353    fn handle_text_response(&mut self, text: &str) {
354        // Text responses are typically JSON errors
355        match serde_json::from_str::<serde_json::Value>(text) {
356            Ok(json) => {
357                if let Some(code) = json.get("code").and_then(|v| v.as_i64()) {
358                    let msg = json
359                        .get("msg")
360                        .and_then(|v| v.as_str())
361                        .unwrap_or("Unknown error");
362                    let id = json.get("id").and_then(|v| v.as_str()).map(String::from);
363
364                    if let Some(request_id) = id
365                        && let Some(meta) = self.pending_requests.remove(&request_id)
366                    {
367                        let rejection =
368                            self.create_rejection(request_id, code as i32, msg.to_string(), meta);
369                        self.emit(rejection);
370                        return;
371                    }
372                    log::warn!(
373                        "Received error response without matching request ID: code={code} msg={msg}"
374                    );
375                }
376            }
377            Err(e) => {
378                log::warn!("Failed to parse text response as JSON: {e}");
379            }
380        }
381    }
382
383    fn decode_ws_api_response(
384        &mut self,
385        data: &[u8],
386    ) -> Result<NautilusWsApiMessage, BinanceWsApiError> {
387        // Parse SBE envelope to extract request ID and inner payload
388        let (request_id, status, result_data) = self.parse_envelope(data)?;
389
390        // Look up the pending request by ID
391        let meta = self.pending_requests.remove(&request_id).ok_or_else(|| {
392            BinanceWsApiError::UnknownRequestId(format!("No pending request for ID: {request_id}"))
393        })?;
394
395        // Check for error status (non-200)
396        if status != 200 {
397            return Ok(self.create_rejection(
398                request_id,
399                status as i32,
400                format!("Request failed with status {status}"),
401                meta,
402            ));
403        }
404
405        // Decode the inner payload based on request type
406        match meta {
407            RequestMeta::PlaceOrder => {
408                let response = parse::decode_new_order_full(&result_data)?;
409                Ok(NautilusWsApiMessage::OrderAccepted {
410                    request_id,
411                    response,
412                })
413            }
414            RequestMeta::CancelOrder => {
415                let response = parse::decode_cancel_order(&result_data)?;
416                Ok(NautilusWsApiMessage::OrderCanceled {
417                    request_id,
418                    response,
419                })
420            }
421            RequestMeta::CancelReplaceOrder => {
422                // Cancel-replace returns both cancel and new order info
423                let new_order_response = parse::decode_new_order_full(&result_data)?;
424                let cancel_response = BinanceCancelOrderResponse {
425                    price_exponent: new_order_response.price_exponent,
426                    qty_exponent: new_order_response.qty_exponent,
427                    order_id: 0,
428                    order_list_id: None,
429                    transact_time: new_order_response.transact_time,
430                    price_mantissa: 0,
431                    orig_qty_mantissa: 0,
432                    executed_qty_mantissa: 0,
433                    cummulative_quote_qty_mantissa: 0,
434                    status: crate::common::sbe::spot::order_status::OrderStatus::Canceled,
435                    time_in_force: new_order_response.time_in_force,
436                    order_type: new_order_response.order_type,
437                    side: new_order_response.side,
438                    self_trade_prevention_mode: new_order_response.self_trade_prevention_mode,
439                    client_order_id: String::new(),
440                    orig_client_order_id: String::new(),
441                    symbol: new_order_response.symbol.clone(),
442                };
443                Ok(NautilusWsApiMessage::CancelReplaceAccepted {
444                    request_id,
445                    cancel_response,
446                    new_order_response,
447                })
448            }
449            RequestMeta::CancelAllOrders => {
450                let responses = parse::decode_cancel_open_orders(&result_data)?;
451                Ok(NautilusWsApiMessage::AllOrdersCanceled {
452                    request_id,
453                    responses,
454                })
455            }
456        }
457    }
458
459    /// Parses the WebSocketResponse SBE envelope.
460    ///
461    /// Returns (request_id, status, result_payload).
462    fn parse_envelope(&self, data: &[u8]) -> Result<(String, u16, Vec<u8>), BinanceWsApiError> {
463        if data.len() < message_header_codec::ENCODED_LENGTH {
464            return Err(BinanceWsApiError::DecodeError(
465                crate::common::sbe::error::SbeDecodeError::BufferTooShort {
466                    expected: message_header_codec::ENCODED_LENGTH,
467                    actual: data.len(),
468                },
469            ));
470        }
471
472        let buf = ReadBuf::new(data);
473
474        // Parse message header
475        let block_length = buf.get_u16_at(0);
476        let template_id = buf.get_u16_at(2);
477
478        if template_id != SBE_TEMPLATE_ID {
479            return Err(BinanceWsApiError::DecodeError(
480                crate::common::sbe::error::SbeDecodeError::UnknownTemplateId(template_id),
481            ));
482        }
483
484        let version = buf.get_u16_at(6);
485
486        // Create decoder at offset after message header
487        let decoder = WebSocketResponseDecoder::default().wrap(
488            buf,
489            message_header_codec::ENCODED_LENGTH,
490            block_length,
491            version,
492        );
493
494        // Read status from fixed block (offset 1 within block)
495        let status = decoder.status();
496
497        // Skip rate_limits group
498        let mut rate_limits = decoder.rate_limits_decoder();
499        while rate_limits.advance().unwrap_or(None).is_some() {}
500        let mut decoder = rate_limits.parent().map_err(|_| {
501            BinanceWsApiError::ClientError("Failed to get parent from rate_limits".to_string())
502        })?;
503
504        // Extract request ID
505        let id_coords = decoder.id_decoder();
506        let id_bytes = decoder.id_slice(id_coords);
507        let request_id = String::from_utf8_lossy(id_bytes).to_string();
508
509        // Extract result payload - copy to owned Vec to avoid lifetime issues
510        let result_coords = decoder.result_decoder();
511        let result_data = decoder.result_slice(result_coords).to_vec();
512
513        Ok((request_id, status, result_data))
514    }
515
516    fn create_rejection(
517        &self,
518        request_id: String,
519        code: i32,
520        msg: String,
521        meta: RequestMeta,
522    ) -> NautilusWsApiMessage {
523        match meta {
524            RequestMeta::PlaceOrder => NautilusWsApiMessage::OrderRejected {
525                request_id,
526                code,
527                msg,
528            },
529            RequestMeta::CancelOrder => NautilusWsApiMessage::CancelRejected {
530                request_id,
531                code,
532                msg,
533            },
534            RequestMeta::CancelReplaceOrder => NautilusWsApiMessage::CancelReplaceRejected {
535                request_id,
536                code,
537                msg,
538            },
539            RequestMeta::CancelAllOrders => NautilusWsApiMessage::CancelRejected {
540                request_id,
541                code,
542                msg,
543            },
544        }
545    }
546}