nautilus_binance/futures/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Futures WebSocket handler for JSON market data streams.
17
18use std::{
19    collections::HashMap,
20    fmt::Debug,
21    sync::{
22        Arc,
23        atomic::{AtomicBool, AtomicU64, Ordering},
24    },
25};
26
27use nautilus_model::{
28    data::Data,
29    instruments::{Instrument, InstrumentAny},
30};
31use nautilus_network::{
32    RECONNECTED,
33    websocket::{SubscriptionState, WebSocketClient},
34};
35use ustr::Ustr;
36
37use super::{
38    messages::{
39        BinanceFuturesAggTradeMsg, BinanceFuturesBookTickerMsg, BinanceFuturesDepthUpdateMsg,
40        BinanceFuturesHandlerCommand, BinanceFuturesTradeMsg, BinanceFuturesWsErrorMsg,
41        BinanceFuturesWsErrorResponse, BinanceFuturesWsSubscribeRequest,
42        BinanceFuturesWsSubscribeResponse, NautilusFuturesWsMessage,
43    },
44    parse::{
45        extract_event_type, extract_symbol, parse_agg_trade, parse_book_ticker, parse_depth_update,
46        parse_trade,
47    },
48};
49use crate::common::enums::{BinanceWsEventType, BinanceWsMethod};
50
51/// Handler for Binance Futures WebSocket JSON streams.
52pub struct BinanceFuturesWsFeedHandler {
53    #[allow(dead_code)] // Reserved for shutdown signal handling
54    signal: Arc<AtomicBool>,
55    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesHandlerCommand>,
56    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
57    #[allow(dead_code)] // Reserved for async message emission
58    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusFuturesWsMessage>,
59    client: Option<WebSocketClient>,
60    instruments: HashMap<Ustr, InstrumentAny>,
61    subscriptions_state: SubscriptionState,
62    request_id_counter: Arc<AtomicU64>,
63    pending_requests: HashMap<u64, Vec<String>>,
64}
65
66impl Debug for BinanceFuturesWsFeedHandler {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        f.debug_struct(stringify!(BinanceFuturesWsFeedHandler))
69            .field("instruments_count", &self.instruments.len())
70            .field("pending_requests", &self.pending_requests.len())
71            .finish_non_exhaustive()
72    }
73}
74
75impl BinanceFuturesWsFeedHandler {
76    /// Creates a new handler instance.
77    pub fn new(
78        signal: Arc<AtomicBool>,
79        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<BinanceFuturesHandlerCommand>,
80        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
81        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusFuturesWsMessage>,
82        subscriptions_state: SubscriptionState,
83        request_id_counter: Arc<AtomicU64>,
84    ) -> Self {
85        Self {
86            signal,
87            cmd_rx,
88            raw_rx,
89            out_tx,
90            client: None,
91            instruments: HashMap::new(),
92            subscriptions_state,
93            request_id_counter,
94            pending_requests: HashMap::new(),
95        }
96    }
97
98    /// Returns the next message from the handler.
99    ///
100    /// Processes both commands and raw WebSocket messages.
101    pub async fn next(&mut self) -> Option<NautilusFuturesWsMessage> {
102        loop {
103            if self.signal.load(Ordering::Relaxed) {
104                return None;
105            }
106
107            tokio::select! {
108                Some(cmd) = self.cmd_rx.recv() => {
109                    self.handle_command(cmd).await;
110                }
111                Some(raw) = self.raw_rx.recv() => {
112                    if let Some(msg) = self.handle_raw_message(raw).await {
113                        return Some(msg);
114                    }
115                }
116                else => {
117                    return None;
118                }
119            }
120        }
121    }
122
123    async fn handle_command(&mut self, cmd: BinanceFuturesHandlerCommand) {
124        match cmd {
125            BinanceFuturesHandlerCommand::SetClient(client) => {
126                self.client = Some(client);
127            }
128            BinanceFuturesHandlerCommand::Disconnect => {
129                if let Some(client) = &self.client {
130                    let () = client.disconnect().await;
131                }
132                self.client = None;
133            }
134            BinanceFuturesHandlerCommand::InitializeInstruments(instruments) => {
135                for inst in instruments {
136                    self.instruments.insert(inst.raw_symbol().inner(), inst);
137                }
138            }
139            BinanceFuturesHandlerCommand::UpdateInstrument(instrument) => {
140                self.instruments
141                    .insert(instrument.raw_symbol().inner(), instrument);
142            }
143            BinanceFuturesHandlerCommand::Subscribe { streams } => {
144                self.send_subscribe(streams).await;
145            }
146            BinanceFuturesHandlerCommand::Unsubscribe { streams } => {
147                self.send_unsubscribe(streams).await;
148            }
149        }
150    }
151
152    async fn send_subscribe(&mut self, streams: Vec<String>) {
153        let Some(client) = &self.client else {
154            log::warn!("Cannot subscribe: no client connected");
155            return;
156        };
157
158        let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
159
160        // Track pending request
161        self.pending_requests.insert(request_id, streams.clone());
162
163        // Mark streams as pending subscribe
164        for stream in &streams {
165            self.subscriptions_state.mark_subscribe(stream);
166        }
167
168        let request = BinanceFuturesWsSubscribeRequest {
169            method: BinanceWsMethod::Subscribe,
170            params: streams,
171            id: request_id,
172        };
173
174        let json = match serde_json::to_string(&request) {
175            Ok(j) => j,
176            Err(e) => {
177                log::error!("Failed to serialize subscribe request: {e}");
178                return;
179            }
180        };
181
182        if let Err(e) = client.send_text(json, None).await {
183            log::error!("Failed to send subscribe request: {e}");
184        }
185    }
186
187    async fn send_unsubscribe(&mut self, streams: Vec<String>) {
188        let Some(client) = &self.client else {
189            log::warn!("Cannot unsubscribe: no client connected");
190            return;
191        };
192
193        let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
194
195        let request = BinanceFuturesWsSubscribeRequest {
196            method: BinanceWsMethod::Unsubscribe,
197            params: streams.clone(),
198            id: request_id,
199        };
200
201        let json = match serde_json::to_string(&request) {
202            Ok(j) => j,
203            Err(e) => {
204                log::error!("Failed to serialize unsubscribe request: {e}");
205                return;
206            }
207        };
208
209        if let Err(e) = client.send_text(json, None).await {
210            log::error!("Failed to send unsubscribe request: {e}");
211        }
212
213        // Mark as unsubscribed
214        for stream in &streams {
215            self.subscriptions_state.confirm_unsubscribe(stream);
216        }
217    }
218
219    async fn handle_raw_message(&mut self, raw: Vec<u8>) -> Option<NautilusFuturesWsMessage> {
220        // Check for reconnection signal
221        if let Ok(text) = std::str::from_utf8(&raw)
222            && text == RECONNECTED
223        {
224            log::info!("WebSocket reconnected signal received");
225            return Some(NautilusFuturesWsMessage::Reconnected);
226        }
227
228        // Parse JSON
229        let json: serde_json::Value = match serde_json::from_slice(&raw) {
230            Ok(j) => j,
231            Err(e) => {
232                log::warn!("Failed to parse JSON message: {e}");
233                return None;
234            }
235        };
236
237        // Check for subscription response
238        if json.get("result").is_some() || json.get("id").is_some() {
239            self.handle_subscription_response(&json);
240            return None;
241        }
242
243        // Check for error response
244        if let Some(code) = json.get("code")
245            && let Some(code) = code.as_i64()
246        {
247            let msg = json
248                .get("msg")
249                .and_then(|m| m.as_str())
250                .unwrap_or("Unknown error")
251                .to_string();
252            return Some(NautilusFuturesWsMessage::Error(BinanceFuturesWsErrorMsg {
253                code,
254                msg,
255            }));
256        }
257
258        // Handle stream data
259        self.handle_stream_data(&json)
260    }
261
262    fn handle_subscription_response(&mut self, json: &serde_json::Value) {
263        if let Ok(response) =
264            serde_json::from_value::<BinanceFuturesWsSubscribeResponse>(json.clone())
265        {
266            if let Some(streams) = self.pending_requests.remove(&response.id) {
267                if response.result.is_none() {
268                    // Success - confirm subscriptions
269                    for stream in &streams {
270                        self.subscriptions_state.confirm_subscribe(stream);
271                    }
272                    log::debug!("Subscription confirmed: streams={streams:?}");
273                } else {
274                    // Failure - mark streams as failed
275                    for stream in &streams {
276                        self.subscriptions_state.mark_failure(stream);
277                    }
278                    log::warn!(
279                        "Subscription failed: streams={streams:?}, result={:?}",
280                        response.result
281                    );
282                }
283            }
284        } else if let Ok(error) =
285            serde_json::from_value::<BinanceFuturesWsErrorResponse>(json.clone())
286        {
287            if let Some(id) = error.id
288                && let Some(streams) = self.pending_requests.remove(&id)
289            {
290                for stream in &streams {
291                    self.subscriptions_state.mark_failure(stream);
292                }
293            }
294            log::warn!(
295                "WebSocket error response: code={}, msg={}",
296                error.code,
297                error.msg
298            );
299        }
300    }
301
302    fn handle_stream_data(&self, json: &serde_json::Value) -> Option<NautilusFuturesWsMessage> {
303        let event_type = extract_event_type(json)?;
304        let symbol = extract_symbol(json)?;
305
306        // Look up instrument
307        let Some(instrument) = self.instruments.get(&symbol) else {
308            log::warn!(
309                "No instrument in cache, dropping message: symbol={symbol}, event_type={event_type:?}"
310            );
311            return None;
312        };
313
314        match event_type {
315            BinanceWsEventType::AggTrade => {
316                if let Ok(msg) = serde_json::from_value::<BinanceFuturesAggTradeMsg>(json.clone()) {
317                    match parse_agg_trade(&msg, instrument) {
318                        Ok(trade) => {
319                            return Some(NautilusFuturesWsMessage::Data(vec![Data::Trade(trade)]));
320                        }
321                        Err(e) => {
322                            log::warn!("Failed to parse aggregate trade: {e}");
323                        }
324                    }
325                }
326            }
327            BinanceWsEventType::Trade => {
328                if let Ok(msg) = serde_json::from_value::<BinanceFuturesTradeMsg>(json.clone()) {
329                    match parse_trade(&msg, instrument) {
330                        Ok(trade) => {
331                            return Some(NautilusFuturesWsMessage::Data(vec![Data::Trade(trade)]));
332                        }
333                        Err(e) => {
334                            log::warn!("Failed to parse trade: {e}");
335                        }
336                    }
337                }
338            }
339            BinanceWsEventType::BookTicker => {
340                if let Ok(msg) = serde_json::from_value::<BinanceFuturesBookTickerMsg>(json.clone())
341                {
342                    match parse_book_ticker(&msg, instrument) {
343                        Ok(quote) => {
344                            return Some(NautilusFuturesWsMessage::Data(vec![Data::Quote(quote)]));
345                        }
346                        Err(e) => {
347                            log::warn!("Failed to parse book ticker: {e}");
348                        }
349                    }
350                }
351            }
352            BinanceWsEventType::DepthUpdate => {
353                if let Ok(msg) =
354                    serde_json::from_value::<BinanceFuturesDepthUpdateMsg>(json.clone())
355                {
356                    match parse_depth_update(&msg, instrument) {
357                        Ok(deltas) => {
358                            return Some(NautilusFuturesWsMessage::Deltas(deltas));
359                        }
360                        Err(e) => {
361                            log::warn!("Failed to parse depth update: {e}");
362                        }
363                    }
364                }
365            }
366            BinanceWsEventType::MarkPriceUpdate
367            | BinanceWsEventType::Kline
368            | BinanceWsEventType::ForceOrder
369            | BinanceWsEventType::Ticker24Hr
370            | BinanceWsEventType::MiniTicker24Hr => {
371                // Pass through as raw JSON for now
372                return Some(NautilusFuturesWsMessage::RawJson(json.clone()));
373            }
374            BinanceWsEventType::Unknown => {
375                log::debug!("Unknown event type: {:?}", json.get("e"));
376            }
377        }
378
379        None
380    }
381}