nautilus_binance/spot/websocket/trading/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot WebSocket API message handler.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21//!
22//! ## Key Responsibilities
23//!
24//! - Command processing: Receives `HandlerCommand` from client, serializes to JSON requests.
25//! - Response decoding: Parses SBE binary responses using schema 3 decoders.
26//! - Request correlation: Matches responses to pending requests by ID.
27//! - Message transformation: Emits `NautilusWsApiMessage` events to client via channel.
28
29use std::{
30    fmt::Debug,
31    sync::{
32        Arc,
33        atomic::{AtomicBool, Ordering},
34    },
35};
36
37use ahash::AHashMap;
38use nautilus_network::{RECONNECTED, websocket::WebSocketClient};
39use tokio_tungstenite::tungstenite::Message;
40
41use super::{
42    error::{BinanceWsApiError, BinanceWsApiResult},
43    messages::{HandlerCommand, NautilusWsApiMessage, RequestMeta, WsApiRequest, method},
44};
45use crate::{
46    common::{
47        credential::Credential,
48        sbe::spot::{
49            ReadBuf, message_header_codec,
50            web_socket_response_codec::{SBE_TEMPLATE_ID, WebSocketResponseDecoder},
51        },
52    },
53    spot::http::{models::BinanceCancelOrderResponse, parse},
54};
55
56/// Binance Spot WebSocket API handler.
57///
58/// Runs in a dedicated Tokio task, processing commands from the client
59/// and transforming raw WebSocket messages into Nautilus domain events.
60/// Messages are sent to the client via the output channel.
61pub struct BinanceSpotWsApiHandler {
62    signal: Arc<AtomicBool>,
63    inner: Option<WebSocketClient>,
64    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
65    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
66    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsApiMessage>,
67    credential: Arc<Credential>,
68    pending_requests: AHashMap<String, RequestMeta>,
69}
70
71impl Debug for BinanceSpotWsApiHandler {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        f.debug_struct(stringify!(BinanceSpotWsApiHandler))
74            .field("inner", &self.inner.as_ref().map(|_| "<client>"))
75            .field(
76                "pending_requests",
77                &format!("{} pending", self.pending_requests.len()),
78            )
79            .finish_non_exhaustive()
80    }
81}
82
83impl BinanceSpotWsApiHandler {
84    /// Creates a new handler instance.
85    #[must_use]
86    pub fn new(
87        signal: Arc<AtomicBool>,
88        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
89        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
90        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsApiMessage>,
91        credential: Arc<Credential>,
92    ) -> Self {
93        Self {
94            signal,
95            inner: None,
96            cmd_rx,
97            raw_rx,
98            out_tx,
99            credential,
100            pending_requests: AHashMap::new(),
101        }
102    }
103
104    /// Main event loop - processes commands and raw messages.
105    ///
106    /// Sends output messages via `out_tx` channel. Returns `false` when disconnected
107    /// or the signal is set, indicating the handler should exit.
108    pub async fn run(&mut self) -> bool {
109        loop {
110            if self.signal.load(Ordering::Relaxed) {
111                return false;
112            }
113
114            tokio::select! {
115                Some(cmd) = self.cmd_rx.recv() => {
116                    match cmd {
117                        HandlerCommand::SetClient(client) => {
118                            log::debug!("Handler received WebSocket client");
119                            self.inner = Some(client);
120                            self.emit(NautilusWsApiMessage::Connected);
121                        }
122                        HandlerCommand::Disconnect => {
123                            log::debug!("Handler disconnecting WebSocket client");
124                            self.inner = None;
125                            return false;
126                        }
127                        HandlerCommand::PlaceOrder { id, params } => {
128                            if let Err(e) = self.handle_place_order(id.clone(), params).await {
129                                log::error!("Failed to handle place order command: {e}");
130                                self.emit(NautilusWsApiMessage::OrderRejected {
131                                    request_id: id,
132                                    code: -1,
133                                    msg: e.to_string(),
134                                });
135                            }
136                        }
137                        HandlerCommand::CancelOrder { id, params } => {
138                            if let Err(e) = self.handle_cancel_order(id.clone(), params).await {
139                                log::error!("Failed to handle cancel order command: {e}");
140                                self.emit(NautilusWsApiMessage::CancelRejected {
141                                    request_id: id,
142                                    code: -1,
143                                    msg: e.to_string(),
144                                });
145                            }
146                        }
147                        HandlerCommand::CancelReplaceOrder { id, params } => {
148                            if let Err(e) = self.handle_cancel_replace_order(id.clone(), params).await {
149                                log::error!("Failed to handle cancel replace command: {e}");
150                                self.emit(NautilusWsApiMessage::CancelReplaceRejected {
151                                    request_id: id,
152                                    code: -1,
153                                    msg: e.to_string(),
154                                });
155                            }
156                        }
157                        HandlerCommand::CancelAllOrders { id, symbol } => {
158                            if let Err(e) = self.handle_cancel_all_orders(id.clone(), symbol).await {
159                                log::error!("Failed to handle cancel all command: {e}");
160                                self.emit(NautilusWsApiMessage::CancelRejected {
161                                    request_id: id,
162                                    code: -1,
163                                    msg: e.to_string(),
164                                });
165                            }
166                        }
167                    }
168                }
169                Some(msg) = self.raw_rx.recv() => {
170                    if let Message::Text(ref text) = msg
171                        && text.as_str() == RECONNECTED
172                    {
173                        log::info!("Handler received reconnection signal");
174
175                        // Fail any pending requests - they won't get responses on new connection
176                        self.fail_pending_requests();
177
178                        self.emit(NautilusWsApiMessage::Reconnected);
179                        continue;
180                    }
181
182                    self.handle_message(msg);
183                }
184                else => {
185                    // Both channels closed
186                    return false;
187                }
188            }
189        }
190    }
191
192    /// Sends a message to the output channel.
193    fn emit(&self, msg: NautilusWsApiMessage) {
194        if let Err(e) = self.out_tx.send(msg) {
195            log::error!("Failed to send message to output channel: {e}");
196        }
197    }
198
199    /// Fails all pending requests after a reconnection.
200    fn fail_pending_requests(&mut self) {
201        if self.pending_requests.is_empty() {
202            return;
203        }
204
205        let count = self.pending_requests.len();
206        log::warn!("Failing {count} pending requests after reconnection");
207
208        let pending = std::mem::take(&mut self.pending_requests);
209        for (request_id, meta) in pending {
210            let msg = self.create_rejection(
211                request_id,
212                -1,
213                "Connection lost before response received".to_string(),
214                meta,
215            );
216            self.emit(msg);
217        }
218    }
219
220    async fn handle_place_order(
221        &mut self,
222        id: String,
223        params: crate::spot::http::query::NewOrderParams,
224    ) -> BinanceWsApiResult<()> {
225        let params_json = serde_json::to_value(&params)
226            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
227        let signed_params = self.sign_params(params_json)?;
228
229        let request = WsApiRequest::new(&id, method::ORDER_PLACE, signed_params);
230        self.pending_requests
231            .insert(id.clone(), RequestMeta::PlaceOrder);
232        self.send_request(request).await
233    }
234
235    async fn handle_cancel_order(
236        &mut self,
237        id: String,
238        params: crate::spot::http::query::CancelOrderParams,
239    ) -> BinanceWsApiResult<()> {
240        let params_json = serde_json::to_value(&params)
241            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
242        let signed_params = self.sign_params(params_json)?;
243
244        let request = WsApiRequest::new(&id, method::ORDER_CANCEL, signed_params);
245        self.pending_requests
246            .insert(id.clone(), RequestMeta::CancelOrder);
247        self.send_request(request).await
248    }
249
250    async fn handle_cancel_replace_order(
251        &mut self,
252        id: String,
253        params: crate::spot::http::query::CancelReplaceOrderParams,
254    ) -> BinanceWsApiResult<()> {
255        let params_json = serde_json::to_value(&params)
256            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
257        let signed_params = self.sign_params(params_json)?;
258
259        let request = WsApiRequest::new(&id, method::ORDER_CANCEL_REPLACE, signed_params);
260        self.pending_requests
261            .insert(id.clone(), RequestMeta::CancelReplaceOrder);
262        self.send_request(request).await
263    }
264
265    async fn handle_cancel_all_orders(
266        &mut self,
267        id: String,
268        symbol: String,
269    ) -> BinanceWsApiResult<()> {
270        let params_json = serde_json::json!({ "symbol": symbol });
271        let signed_params = self.sign_params(params_json)?;
272
273        let request = WsApiRequest::new(&id, method::OPEN_ORDERS_CANCEL_ALL, signed_params);
274        self.pending_requests
275            .insert(id.clone(), RequestMeta::CancelAllOrders);
276        self.send_request(request).await
277    }
278
279    fn sign_params(&self, mut params: serde_json::Value) -> BinanceWsApiResult<serde_json::Value> {
280        let timestamp = std::time::SystemTime::now()
281            .duration_since(std::time::UNIX_EPOCH)
282            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?
283            .as_millis() as i64;
284
285        if let Some(obj) = params.as_object_mut() {
286            obj.insert("timestamp".to_string(), serde_json::json!(timestamp));
287            obj.insert(
288                "apiKey".to_string(),
289                serde_json::json!(self.credential.api_key()),
290            );
291        }
292
293        let query_string = serde_urlencoded::to_string(&params)
294            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
295        let signature = self.credential.sign(&query_string);
296
297        if let Some(obj) = params.as_object_mut() {
298            obj.insert("signature".to_string(), serde_json::json!(signature));
299        }
300
301        Ok(params)
302    }
303
304    async fn send_request(&mut self, request: WsApiRequest) -> BinanceWsApiResult<()> {
305        use super::client::BINANCE_WS_RATE_LIMIT_KEY_ORDER;
306
307        let client = self.inner.as_mut().ok_or_else(|| {
308            BinanceWsApiError::ConnectionError("WebSocket not connected".to_string())
309        })?;
310
311        let json = serde_json::to_string(&request)
312            .map_err(|e| BinanceWsApiError::ClientError(e.to_string()))?;
313
314        log::debug!(
315            "Sending WebSocket API request id={} method={}",
316            request.id,
317            request.method
318        );
319
320        // Apply rate limiting for order operations
321        let rate_limit_keys = Some(vec![BINANCE_WS_RATE_LIMIT_KEY_ORDER.to_string()]);
322
323        client.send_text(json, rate_limit_keys).await.map_err(|e| {
324            BinanceWsApiError::ConnectionError(format!("Failed to send request: {e}"))
325        })?;
326
327        Ok(())
328    }
329
330    fn handle_message(&mut self, msg: Message) {
331        match msg {
332            Message::Binary(data) => self.handle_binary_response(&data),
333            Message::Text(text) => self.handle_text_response(&text),
334            Message::Ping(_) | Message::Pong(_) => {}
335            Message::Close(frame) => {
336                log::debug!("WebSocket closed: {frame:?}");
337            }
338            Message::Frame(_) => {}
339        }
340    }
341
342    fn handle_binary_response(&mut self, data: &[u8]) {
343        match self.decode_ws_api_response(data) {
344            Ok(response) => self.emit(response),
345            Err(e) => {
346                log::error!("Failed to decode WebSocket API response: {e}");
347                self.emit(NautilusWsApiMessage::Error(e.to_string()));
348            }
349        }
350    }
351
352    fn handle_text_response(&mut self, text: &str) {
353        // Text responses are typically JSON errors
354        match serde_json::from_str::<serde_json::Value>(text) {
355            Ok(json) => {
356                if let Some(code) = json.get("code").and_then(|v| v.as_i64()) {
357                    let msg = json
358                        .get("msg")
359                        .and_then(|v| v.as_str())
360                        .unwrap_or("Unknown error");
361                    let id = json.get("id").and_then(|v| v.as_str()).map(String::from);
362
363                    if let Some(request_id) = id
364                        && let Some(meta) = self.pending_requests.remove(&request_id)
365                    {
366                        let rejection =
367                            self.create_rejection(request_id, code as i32, msg.to_string(), meta);
368                        self.emit(rejection);
369                        return;
370                    }
371                    log::warn!(
372                        "Received error response without matching request ID: code={code} msg={msg}"
373                    );
374                }
375            }
376            Err(e) => {
377                log::warn!("Failed to parse text response as JSON: {e}");
378            }
379        }
380    }
381
382    fn decode_ws_api_response(
383        &mut self,
384        data: &[u8],
385    ) -> Result<NautilusWsApiMessage, BinanceWsApiError> {
386        // Parse SBE envelope to extract request ID and inner payload
387        let (request_id, status, result_data) = self.parse_envelope(data)?;
388
389        // Look up the pending request by ID
390        let meta = self.pending_requests.remove(&request_id).ok_or_else(|| {
391            BinanceWsApiError::UnknownRequestId(format!("No pending request for ID: {request_id}"))
392        })?;
393
394        // Check for error status (non-200)
395        if status != 200 {
396            return Ok(self.create_rejection(
397                request_id,
398                status as i32,
399                format!("Request failed with status {status}"),
400                meta,
401            ));
402        }
403
404        // Decode the inner payload based on request type
405        match meta {
406            RequestMeta::PlaceOrder => {
407                let response = parse::decode_new_order_full(&result_data)?;
408                Ok(NautilusWsApiMessage::OrderAccepted {
409                    request_id,
410                    response,
411                })
412            }
413            RequestMeta::CancelOrder => {
414                let response = parse::decode_cancel_order(&result_data)?;
415                Ok(NautilusWsApiMessage::OrderCanceled {
416                    request_id,
417                    response,
418                })
419            }
420            RequestMeta::CancelReplaceOrder => {
421                // Cancel-replace returns both cancel and new order info
422                let new_order_response = parse::decode_new_order_full(&result_data)?;
423                let cancel_response = BinanceCancelOrderResponse {
424                    price_exponent: new_order_response.price_exponent,
425                    qty_exponent: new_order_response.qty_exponent,
426                    order_id: 0,
427                    order_list_id: None,
428                    transact_time: new_order_response.transact_time,
429                    price_mantissa: 0,
430                    orig_qty_mantissa: 0,
431                    executed_qty_mantissa: 0,
432                    cummulative_quote_qty_mantissa: 0,
433                    status: crate::common::sbe::spot::order_status::OrderStatus::Canceled,
434                    time_in_force: new_order_response.time_in_force,
435                    order_type: new_order_response.order_type,
436                    side: new_order_response.side,
437                    self_trade_prevention_mode: new_order_response.self_trade_prevention_mode,
438                    client_order_id: String::new(),
439                    orig_client_order_id: String::new(),
440                    symbol: new_order_response.symbol.clone(),
441                };
442                Ok(NautilusWsApiMessage::CancelReplaceAccepted {
443                    request_id,
444                    cancel_response,
445                    new_order_response,
446                })
447            }
448            RequestMeta::CancelAllOrders => {
449                let responses = parse::decode_cancel_open_orders(&result_data)?;
450                Ok(NautilusWsApiMessage::AllOrdersCanceled {
451                    request_id,
452                    responses,
453                })
454            }
455        }
456    }
457
458    /// Parses the WebSocketResponse SBE envelope.
459    ///
460    /// Returns (request_id, status, result_payload).
461    fn parse_envelope(&self, data: &[u8]) -> Result<(String, u16, Vec<u8>), BinanceWsApiError> {
462        if data.len() < message_header_codec::ENCODED_LENGTH {
463            return Err(BinanceWsApiError::DecodeError(
464                crate::common::sbe::error::SbeDecodeError::BufferTooShort {
465                    expected: message_header_codec::ENCODED_LENGTH,
466                    actual: data.len(),
467                },
468            ));
469        }
470
471        let buf = ReadBuf::new(data);
472
473        // Parse message header
474        let block_length = buf.get_u16_at(0);
475        let template_id = buf.get_u16_at(2);
476
477        if template_id != SBE_TEMPLATE_ID {
478            return Err(BinanceWsApiError::DecodeError(
479                crate::common::sbe::error::SbeDecodeError::UnknownTemplateId(template_id),
480            ));
481        }
482
483        let version = buf.get_u16_at(6);
484
485        // Create decoder at offset after message header
486        let decoder = WebSocketResponseDecoder::default().wrap(
487            buf,
488            message_header_codec::ENCODED_LENGTH,
489            block_length,
490            version,
491        );
492
493        // Read status from fixed block (offset 1 within block)
494        let status = decoder.status();
495
496        // Skip rate_limits group
497        let mut rate_limits = decoder.rate_limits_decoder();
498        while rate_limits.advance().unwrap_or(None).is_some() {}
499        let mut decoder = rate_limits.parent().map_err(|_| {
500            BinanceWsApiError::ClientError("Failed to get parent from rate_limits".to_string())
501        })?;
502
503        // Extract request ID
504        let id_coords = decoder.id_decoder();
505        let id_bytes = decoder.id_slice(id_coords);
506        let request_id = String::from_utf8_lossy(id_bytes).to_string();
507
508        // Extract result payload - copy to owned Vec to avoid lifetime issues
509        let result_coords = decoder.result_decoder();
510        let result_data = decoder.result_slice(result_coords).to_vec();
511
512        Ok((request_id, status, result_data))
513    }
514
515    fn create_rejection(
516        &self,
517        request_id: String,
518        code: i32,
519        msg: String,
520        meta: RequestMeta,
521    ) -> NautilusWsApiMessage {
522        match meta {
523            RequestMeta::PlaceOrder => NautilusWsApiMessage::OrderRejected {
524                request_id,
525                code,
526                msg,
527            },
528            RequestMeta::CancelOrder => NautilusWsApiMessage::CancelRejected {
529                request_id,
530                code,
531                msg,
532            },
533            RequestMeta::CancelReplaceOrder => NautilusWsApiMessage::CancelReplaceRejected {
534                request_id,
535                code,
536                msg,
537            },
538            RequestMeta::CancelAllOrders => NautilusWsApiMessage::CancelRejected {
539                request_id,
540                code,
541                msg,
542            },
543        }
544    }
545}