nautilus_binance/futures/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Futures WebSocket handler for JSON market data streams.
17
18use std::{
19    collections::HashMap,
20    fmt::Debug,
21    sync::{
22        Arc,
23        atomic::{AtomicBool, AtomicU64, Ordering},
24    },
25};
26
27use nautilus_model::{
28    data::Data,
29    instruments::{Instrument, InstrumentAny},
30};
31use nautilus_network::{
32    RECONNECTED,
33    websocket::{SubscriptionState, WebSocketClient},
34};
35use ustr::Ustr;
36
37use super::{
38    messages::{
39        BinanceFuturesAggTradeMsg, BinanceFuturesBookTickerMsg, BinanceFuturesDepthUpdateMsg,
40        BinanceFuturesHandlerCommand, BinanceFuturesTradeMsg, BinanceFuturesWsErrorMsg,
41        BinanceFuturesWsErrorResponse, BinanceFuturesWsSubscribeRequest,
42        BinanceFuturesWsSubscribeResponse, NautilusFuturesWsMessage,
43    },
44    parse::{
45        extract_event_type, extract_symbol, parse_agg_trade, parse_book_ticker, parse_depth_update,
46        parse_trade,
47    },
48};
49use crate::common::enums::{BinanceWsEventType, BinanceWsMethod};
50
51/// Handler for Binance Futures WebSocket JSON streams.
52pub struct BinanceFuturesWsFeedHandler {
53    #[allow(dead_code)] // Reserved for shutdown signal handling
54    signal: Arc<AtomicBool>,
55    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesHandlerCommand>,
56    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
57    #[allow(dead_code)] // Reserved for async message emission
58    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusFuturesWsMessage>,
59    client: Option<WebSocketClient>,
60    instruments: HashMap<Ustr, InstrumentAny>,
61    subscriptions_state: SubscriptionState,
62    request_id_counter: Arc<AtomicU64>,
63    pending_requests: HashMap<u64, Vec<String>>,
64}
65
66impl Debug for BinanceFuturesWsFeedHandler {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct("BinanceFuturesWsFeedHandler")
69            .field("instruments_count", &self.instruments.len())
70            .field("pending_requests", &self.pending_requests.len())
71            .finish_non_exhaustive()
72    }
73}
74
75impl BinanceFuturesWsFeedHandler {
76    /// Creates a new handler instance.
77    pub fn new(
78        signal: Arc<AtomicBool>,
79        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesHandlerCommand>,
80        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
81        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusFuturesWsMessage>,
82        subscriptions_state: SubscriptionState,
83        request_id_counter: Arc<AtomicU64>,
84    ) -> Self {
85        Self {
86            signal,
87            cmd_rx,
88            raw_rx,
89            out_tx,
90            client: None,
91            instruments: HashMap::new(),
92            subscriptions_state,
93            request_id_counter,
94            pending_requests: HashMap::new(),
95        }
96    }
97
98    /// Returns the next message from the handler.
99    ///
100    /// Processes both commands and raw WebSocket messages.
101    pub async fn next(&mut self) -> Option<NautilusFuturesWsMessage> {
102        loop {
103            if self.signal.load(Ordering::Relaxed) {
104                return None;
105            }
106
107            tokio::select! {
108                Some(cmd) = self.cmd_rx.recv() => {
109                    self.handle_command(cmd).await;
110                }
111                Some(raw) = self.raw_rx.recv() => {
112                    if let Some(msg) = self.handle_raw_message(raw).await {
113                        return Some(msg);
114                    }
115                }
116                else => {
117                    return None;
118                }
119            }
120        }
121    }
122
123    async fn handle_command(&mut self, cmd: BinanceFuturesHandlerCommand) {
124        match cmd {
125            BinanceFuturesHandlerCommand::SetClient(client) => {
126                self.client = Some(client);
127            }
128            BinanceFuturesHandlerCommand::Disconnect => {
129                if let Some(client) = &self.client {
130                    let _ = client.disconnect().await;
131                }
132                self.client = None;
133            }
134            BinanceFuturesHandlerCommand::InitializeInstruments(instruments) => {
135                for inst in instruments {
136                    self.instruments.insert(inst.raw_symbol().inner(), inst);
137                }
138            }
139            BinanceFuturesHandlerCommand::UpdateInstrument(instrument) => {
140                self.instruments
141                    .insert(instrument.raw_symbol().inner(), instrument);
142            }
143            BinanceFuturesHandlerCommand::Subscribe { streams } => {
144                self.send_subscribe(streams).await;
145            }
146            BinanceFuturesHandlerCommand::Unsubscribe { streams } => {
147                self.send_unsubscribe(streams).await;
148            }
149        }
150    }
151
152    async fn send_subscribe(&mut self, streams: Vec<String>) {
153        let Some(client) = &self.client else {
154            tracing::warn!("Cannot subscribe: no client connected");
155            return;
156        };
157
158        let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
159
160        // Track pending request
161        self.pending_requests.insert(request_id, streams.clone());
162
163        // Mark streams as pending subscribe
164        for stream in &streams {
165            self.subscriptions_state.mark_subscribe(stream);
166        }
167
168        let request = BinanceFuturesWsSubscribeRequest {
169            method: BinanceWsMethod::Subscribe,
170            params: streams,
171            id: request_id,
172        };
173
174        let json = match serde_json::to_string(&request) {
175            Ok(j) => j,
176            Err(e) => {
177                tracing::error!(error = %e, "Failed to serialize subscribe request");
178                return;
179            }
180        };
181
182        if let Err(e) = client.send_text(json, None).await {
183            tracing::error!(error = %e, "Failed to send subscribe request");
184        }
185    }
186
187    async fn send_unsubscribe(&mut self, streams: Vec<String>) {
188        let Some(client) = &self.client else {
189            tracing::warn!("Cannot unsubscribe: no client connected");
190            return;
191        };
192
193        let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
194
195        let request = BinanceFuturesWsSubscribeRequest {
196            method: BinanceWsMethod::Unsubscribe,
197            params: streams.clone(),
198            id: request_id,
199        };
200
201        let json = match serde_json::to_string(&request) {
202            Ok(j) => j,
203            Err(e) => {
204                tracing::error!(error = %e, "Failed to serialize unsubscribe request");
205                return;
206            }
207        };
208
209        if let Err(e) = client.send_text(json, None).await {
210            tracing::error!(error = %e, "Failed to send unsubscribe request");
211        }
212
213        // Mark as unsubscribed
214        for stream in &streams {
215            self.subscriptions_state.confirm_unsubscribe(stream);
216        }
217    }
218
219    async fn handle_raw_message(&mut self, raw: Vec<u8>) -> Option<NautilusFuturesWsMessage> {
220        // Check for reconnection signal
221        if let Ok(text) = std::str::from_utf8(&raw)
222            && text == RECONNECTED
223        {
224            tracing::info!("WebSocket reconnected signal received");
225            return Some(NautilusFuturesWsMessage::Reconnected);
226        }
227
228        // Parse JSON
229        let json: serde_json::Value = match serde_json::from_slice(&raw) {
230            Ok(j) => j,
231            Err(e) => {
232                tracing::warn!(error = %e, "Failed to parse JSON message");
233                return None;
234            }
235        };
236
237        // Check for subscription response
238        if json.get("result").is_some() || json.get("id").is_some() {
239            self.handle_subscription_response(&json);
240            return None;
241        }
242
243        // Check for error response
244        if let Some(code) = json.get("code")
245            && let Some(code) = code.as_i64()
246        {
247            let msg = json
248                .get("msg")
249                .and_then(|m| m.as_str())
250                .unwrap_or("Unknown error")
251                .to_string();
252            return Some(NautilusFuturesWsMessage::Error(BinanceFuturesWsErrorMsg {
253                code,
254                msg,
255            }));
256        }
257
258        // Handle stream data
259        self.handle_stream_data(&json)
260    }
261
262    fn handle_subscription_response(&mut self, json: &serde_json::Value) {
263        if let Ok(response) =
264            serde_json::from_value::<BinanceFuturesWsSubscribeResponse>(json.clone())
265        {
266            if let Some(streams) = self.pending_requests.remove(&response.id) {
267                if response.result.is_none() {
268                    // Success - confirm subscriptions
269                    for stream in &streams {
270                        self.subscriptions_state.confirm_subscribe(stream);
271                    }
272                    tracing::debug!(streams = ?streams, "Subscription confirmed");
273                } else {
274                    // Failure - mark streams as failed
275                    for stream in &streams {
276                        self.subscriptions_state.mark_failure(stream);
277                    }
278                    tracing::warn!(streams = ?streams, result = ?response.result, "Subscription failed");
279                }
280            }
281        } else if let Ok(error) =
282            serde_json::from_value::<BinanceFuturesWsErrorResponse>(json.clone())
283        {
284            if let Some(id) = error.id
285                && let Some(streams) = self.pending_requests.remove(&id)
286            {
287                for stream in &streams {
288                    self.subscriptions_state.mark_failure(stream);
289                }
290            }
291            tracing::warn!(code = error.code, msg = %error.msg, "WebSocket error response");
292        }
293    }
294
295    fn handle_stream_data(&self, json: &serde_json::Value) -> Option<NautilusFuturesWsMessage> {
296        let event_type = extract_event_type(json)?;
297        let symbol = extract_symbol(json)?;
298
299        // Look up instrument
300        let Some(instrument) = self.instruments.get(&symbol) else {
301            tracing::warn!(
302                symbol = %symbol,
303                event_type = ?event_type,
304                "No instrument in cache, dropping message"
305            );
306            return None;
307        };
308
309        match event_type {
310            BinanceWsEventType::AggTrade => {
311                if let Ok(msg) = serde_json::from_value::<BinanceFuturesAggTradeMsg>(json.clone()) {
312                    match parse_agg_trade(&msg, instrument) {
313                        Ok(trade) => {
314                            return Some(NautilusFuturesWsMessage::Data(vec![Data::Trade(trade)]));
315                        }
316                        Err(e) => {
317                            tracing::warn!(error = %e, "Failed to parse aggregate trade");
318                        }
319                    }
320                }
321            }
322            BinanceWsEventType::Trade => {
323                if let Ok(msg) = serde_json::from_value::<BinanceFuturesTradeMsg>(json.clone()) {
324                    match parse_trade(&msg, instrument) {
325                        Ok(trade) => {
326                            return Some(NautilusFuturesWsMessage::Data(vec![Data::Trade(trade)]));
327                        }
328                        Err(e) => {
329                            tracing::warn!(error = %e, "Failed to parse trade");
330                        }
331                    }
332                }
333            }
334            BinanceWsEventType::BookTicker => {
335                if let Ok(msg) = serde_json::from_value::<BinanceFuturesBookTickerMsg>(json.clone())
336                {
337                    match parse_book_ticker(&msg, instrument) {
338                        Ok(quote) => {
339                            return Some(NautilusFuturesWsMessage::Data(vec![Data::Quote(quote)]));
340                        }
341                        Err(e) => {
342                            tracing::warn!(error = %e, "Failed to parse book ticker");
343                        }
344                    }
345                }
346            }
347            BinanceWsEventType::DepthUpdate => {
348                if let Ok(msg) =
349                    serde_json::from_value::<BinanceFuturesDepthUpdateMsg>(json.clone())
350                {
351                    match parse_depth_update(&msg, instrument) {
352                        Ok(deltas) => {
353                            return Some(NautilusFuturesWsMessage::Deltas(deltas));
354                        }
355                        Err(e) => {
356                            tracing::warn!(error = %e, "Failed to parse depth update");
357                        }
358                    }
359                }
360            }
361            BinanceWsEventType::MarkPriceUpdate
362            | BinanceWsEventType::Kline
363            | BinanceWsEventType::ForceOrder
364            | BinanceWsEventType::Ticker24Hr
365            | BinanceWsEventType::MiniTicker24Hr => {
366                // Pass through as raw JSON for now
367                return Some(NautilusFuturesWsMessage::RawJson(json.clone()));
368            }
369            BinanceWsEventType::Unknown => {
370                tracing::debug!(event_type = ?json.get("e"), "Unknown event type");
371            }
372        }
373
374        None
375    }
376}