Skip to main content

nautilus_binance/futures/websocket/
handler_data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Futures WebSocket handler for JSON market data streams.
17
18use std::{
19    fmt::Debug,
20    sync::{
21        Arc,
22        atomic::{AtomicBool, AtomicU64, Ordering},
23    },
24};
25
26use ahash::AHashMap;
27use nautilus_core::time::AtomicTime;
28use nautilus_model::{
29    data::Data,
30    instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::{
33    RECONNECTED,
34    websocket::{SubscriptionState, WebSocketClient},
35};
36use ustr::Ustr;
37
38use super::{
39    messages::{
40        BinanceFuturesAccountConfigMsg, BinanceFuturesAccountUpdateMsg, BinanceFuturesAggTradeMsg,
41        BinanceFuturesAlgoUpdateMsg, BinanceFuturesBookTickerMsg, BinanceFuturesDepthUpdateMsg,
42        BinanceFuturesExecWsMessage, BinanceFuturesKlineMsg, BinanceFuturesListenKeyExpiredMsg,
43        BinanceFuturesMarginCallMsg, BinanceFuturesMarkPriceMsg, BinanceFuturesOrderUpdateMsg,
44        BinanceFuturesTradeMsg, BinanceFuturesWsErrorMsg, BinanceFuturesWsErrorResponse,
45        BinanceFuturesWsSubscribeRequest, BinanceFuturesWsSubscribeResponse, DataHandlerCommand,
46        NautilusDataWsMessage, NautilusWsMessage,
47    },
48    parse::{
49        extract_event_type, extract_symbol, parse_agg_trade, parse_book_ticker, parse_depth_update,
50        parse_kline, parse_mark_price, parse_trade,
51    },
52};
53use crate::common::{
54    consts::BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION,
55    enums::{BinanceWsEventType, BinanceWsMethod},
56};
57
58/// Handler for Binance Futures WebSocket JSON streams.
59pub struct BinanceFuturesDataWsFeedHandler {
60    clock: &'static AtomicTime,
61    #[allow(dead_code)] // Reserved for shutdown signal handling
62    signal: Arc<AtomicBool>,
63    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataHandlerCommand>,
64    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
65    #[allow(dead_code)] // Reserved for async message emission
66    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
67    client: Option<WebSocketClient>,
68    instruments: AHashMap<Ustr, InstrumentAny>,
69    subscriptions_state: SubscriptionState,
70    request_id_counter: Arc<AtomicU64>,
71    pending_requests: AHashMap<u64, Vec<String>>,
72}
73
74impl Debug for BinanceFuturesDataWsFeedHandler {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct(stringify!(BinanceFuturesWsFeedHandler))
77            .field("instruments_count", &self.instruments.len())
78            .field("pending_requests", &self.pending_requests.len())
79            .finish_non_exhaustive()
80    }
81}
82
83impl BinanceFuturesDataWsFeedHandler {
84    /// Creates a new handler instance.
85    pub fn new(
86        clock: &'static AtomicTime,
87        signal: Arc<AtomicBool>,
88        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataHandlerCommand>,
89        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
90        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
91        subscriptions_state: SubscriptionState,
92        request_id_counter: Arc<AtomicU64>,
93    ) -> Self {
94        Self {
95            clock,
96            signal,
97            cmd_rx,
98            raw_rx,
99            out_tx,
100            client: None,
101            instruments: AHashMap::new(),
102            subscriptions_state,
103            request_id_counter,
104            pending_requests: AHashMap::new(),
105        }
106    }
107
108    /// Returns the next message from the handler.
109    ///
110    /// Processes both commands and raw WebSocket messages.
111    pub async fn next(&mut self) -> Option<NautilusWsMessage> {
112        loop {
113            if self.signal.load(Ordering::Relaxed) {
114                return None;
115            }
116
117            tokio::select! {
118                Some(cmd) = self.cmd_rx.recv() => {
119                    self.handle_command(cmd).await;
120                }
121                Some(raw) = self.raw_rx.recv() => {
122                    if let Some(msg) = self.handle_raw_message(raw).await {
123                        return Some(msg);
124                    }
125                }
126                else => {
127                    return None;
128                }
129            }
130        }
131    }
132
133    async fn handle_command(&mut self, cmd: DataHandlerCommand) {
134        match cmd {
135            DataHandlerCommand::SetClient(client) => {
136                self.client = Some(client);
137            }
138            DataHandlerCommand::Disconnect => {
139                if let Some(client) = &self.client {
140                    let () = client.disconnect().await;
141                }
142                self.client = None;
143            }
144            DataHandlerCommand::InitializeInstruments(instruments) => {
145                for inst in instruments {
146                    self.instruments.insert(inst.raw_symbol().inner(), inst);
147                }
148            }
149            DataHandlerCommand::UpdateInstrument(instrument) => {
150                self.instruments
151                    .insert(instrument.raw_symbol().inner(), instrument);
152            }
153            DataHandlerCommand::Subscribe { streams } => {
154                self.send_subscribe(streams).await;
155            }
156            DataHandlerCommand::Unsubscribe { streams } => {
157                self.send_unsubscribe(streams).await;
158            }
159        }
160    }
161
162    async fn send_subscribe(&mut self, streams: Vec<String>) {
163        let Some(client) = &self.client else {
164            log::warn!("Cannot subscribe: no client connected");
165            return;
166        };
167
168        let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
169
170        // Track pending request
171        self.pending_requests.insert(request_id, streams.clone());
172
173        // Mark streams as pending subscribe
174        for stream in &streams {
175            self.subscriptions_state.mark_subscribe(stream);
176        }
177
178        let request = BinanceFuturesWsSubscribeRequest {
179            method: BinanceWsMethod::Subscribe,
180            params: streams,
181            id: request_id,
182        };
183
184        let json = match serde_json::to_string(&request) {
185            Ok(j) => j,
186            Err(e) => {
187                log::error!("Failed to serialize subscribe request: {e}");
188                return;
189            }
190        };
191
192        if let Err(e) = client
193            .send_text(json, Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
194            .await
195        {
196            log::error!("Failed to send subscribe request: {e}");
197        }
198    }
199
200    async fn send_unsubscribe(&mut self, streams: Vec<String>) {
201        let Some(client) = &self.client else {
202            log::warn!("Cannot unsubscribe: no client connected");
203            return;
204        };
205
206        let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
207
208        let request = BinanceFuturesWsSubscribeRequest {
209            method: BinanceWsMethod::Unsubscribe,
210            params: streams.clone(),
211            id: request_id,
212        };
213
214        let json = match serde_json::to_string(&request) {
215            Ok(j) => j,
216            Err(e) => {
217                log::error!("Failed to serialize unsubscribe request: {e}");
218                return;
219            }
220        };
221
222        if let Err(e) = client
223            .send_text(json, Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
224            .await
225        {
226            log::error!("Failed to send unsubscribe request: {e}");
227        }
228
229        // Mark as unsubscribed
230        for stream in &streams {
231            self.subscriptions_state.confirm_unsubscribe(stream);
232        }
233    }
234
235    async fn handle_raw_message(&mut self, raw: Vec<u8>) -> Option<NautilusWsMessage> {
236        // Check for reconnection signal
237        if let Ok(text) = std::str::from_utf8(&raw)
238            && text == RECONNECTED
239        {
240            log::info!("WebSocket reconnected signal received");
241            return Some(NautilusWsMessage::Reconnected);
242        }
243
244        // Parse JSON
245        let json: serde_json::Value = match serde_json::from_slice(&raw) {
246            Ok(j) => j,
247            Err(e) => {
248                log::warn!("Failed to parse JSON message: {e}");
249                return None;
250            }
251        };
252
253        // Check for subscription response
254        if json.get("result").is_some() || json.get("id").is_some() {
255            self.handle_subscription_response(&json);
256            return None;
257        }
258
259        // Check for error response
260        if let Some(code) = json.get("code")
261            && let Some(code) = code.as_i64()
262        {
263            let msg = json
264                .get("msg")
265                .and_then(|m| m.as_str())
266                .unwrap_or("Unknown error")
267                .to_string();
268            return Some(NautilusWsMessage::Error(BinanceFuturesWsErrorMsg {
269                code,
270                msg,
271            }));
272        }
273
274        // Handle stream data
275        self.handle_stream_data(&json)
276    }
277
278    fn handle_subscription_response(&mut self, json: &serde_json::Value) {
279        if let Ok(response) =
280            serde_json::from_value::<BinanceFuturesWsSubscribeResponse>(json.clone())
281        {
282            if let Some(streams) = self.pending_requests.remove(&response.id) {
283                if response.result.is_none() {
284                    // Success - confirm subscriptions
285                    for stream in &streams {
286                        self.subscriptions_state.confirm_subscribe(stream);
287                    }
288                    log::debug!("Subscription confirmed: streams={streams:?}");
289                } else {
290                    // Failure - mark streams as failed
291                    for stream in &streams {
292                        self.subscriptions_state.mark_failure(stream);
293                    }
294                    log::warn!(
295                        "Subscription failed: streams={streams:?}, result={:?}",
296                        response.result
297                    );
298                }
299            }
300        } else if let Ok(error) =
301            serde_json::from_value::<BinanceFuturesWsErrorResponse>(json.clone())
302        {
303            if let Some(id) = error.id
304                && let Some(streams) = self.pending_requests.remove(&id)
305            {
306                for stream in &streams {
307                    self.subscriptions_state.mark_failure(stream);
308                }
309            }
310            log::warn!(
311                "WebSocket error response: code={}, msg={}",
312                error.code,
313                error.msg
314            );
315        }
316    }
317
318    fn handle_stream_data(&self, json: &serde_json::Value) -> Option<NautilusWsMessage> {
319        let ts_init = self.clock.get_time_ns();
320        let event_type = extract_event_type(json)?;
321
322        // Handle user data stream events first (they don't follow market data pattern)
323        if let Some(msg) = self.handle_user_data_event(&event_type, json) {
324            return Some(NautilusWsMessage::ExecRaw(msg));
325        }
326
327        // Skip user data events that weren't parsed (they use raw symbols, not Nautilus format)
328        if matches!(
329            event_type,
330            BinanceWsEventType::AccountUpdate
331                | BinanceWsEventType::OrderTradeUpdate
332                | BinanceWsEventType::MarginCall
333                | BinanceWsEventType::AccountConfigUpdate
334                | BinanceWsEventType::ListenKeyExpired
335                | BinanceWsEventType::Unknown
336        ) {
337            return None;
338        }
339
340        // Market data events require symbol and instrument lookup
341        let symbol = extract_symbol(json)?;
342        let Some(instrument) = self.instruments.get(&symbol) else {
343            log::warn!(
344                "No instrument in cache, dropping message: symbol={symbol}, event_type={event_type:?}"
345            );
346            return None;
347        };
348
349        match event_type {
350            BinanceWsEventType::AggTrade => {
351                if let Ok(msg) = serde_json::from_value::<BinanceFuturesAggTradeMsg>(json.clone()) {
352                    match parse_agg_trade(&msg, instrument, ts_init) {
353                        Ok(trade) => {
354                            return Some(NautilusWsMessage::Data(NautilusDataWsMessage::Data(
355                                vec![Data::Trade(trade)],
356                            )));
357                        }
358                        Err(e) => {
359                            log::warn!("Failed to parse aggregate trade: {e}");
360                        }
361                    }
362                }
363            }
364            BinanceWsEventType::Trade => {
365                if let Ok(msg) = serde_json::from_value::<BinanceFuturesTradeMsg>(json.clone()) {
366                    match parse_trade(&msg, instrument, ts_init) {
367                        Ok(trade) => {
368                            return Some(NautilusWsMessage::Data(NautilusDataWsMessage::Data(
369                                vec![Data::Trade(trade)],
370                            )));
371                        }
372                        Err(e) => {
373                            log::warn!("Failed to parse trade: {e}");
374                        }
375                    }
376                }
377            }
378            BinanceWsEventType::BookTicker => {
379                if let Ok(msg) = serde_json::from_value::<BinanceFuturesBookTickerMsg>(json.clone())
380                {
381                    match parse_book_ticker(&msg, instrument, ts_init) {
382                        Ok(quote) => {
383                            return Some(NautilusWsMessage::Data(NautilusDataWsMessage::Data(
384                                vec![Data::Quote(quote)],
385                            )));
386                        }
387                        Err(e) => {
388                            log::warn!("Failed to parse book ticker: {e}");
389                        }
390                    }
391                }
392            }
393            BinanceWsEventType::DepthUpdate => {
394                if let Ok(msg) =
395                    serde_json::from_value::<BinanceFuturesDepthUpdateMsg>(json.clone())
396                {
397                    match parse_depth_update(&msg, instrument, ts_init) {
398                        Ok(deltas) => {
399                            return Some(NautilusWsMessage::Data(
400                                NautilusDataWsMessage::DepthUpdate {
401                                    deltas,
402                                    first_update_id: msg.first_update_id,
403                                    prev_final_update_id: msg.prev_final_update_id,
404                                },
405                            ));
406                        }
407                        Err(e) => {
408                            log::warn!("Failed to parse depth update: {e}");
409                        }
410                    }
411                }
412            }
413            BinanceWsEventType::MarkPriceUpdate => {
414                if let Ok(msg) = serde_json::from_value::<BinanceFuturesMarkPriceMsg>(json.clone())
415                {
416                    match parse_mark_price(&msg, instrument, ts_init) {
417                        Ok((mark_update, index_update, _funding_update)) => {
418                            // Note: FundingRateUpdate is not a variant of Data enum
419                            // Funding rates need custom data handling (like Python adapter)
420                            return Some(NautilusWsMessage::Data(NautilusDataWsMessage::Data(
421                                vec![
422                                    Data::MarkPriceUpdate(mark_update),
423                                    Data::IndexPriceUpdate(index_update),
424                                ],
425                            )));
426                        }
427                        Err(e) => {
428                            log::warn!("Failed to parse mark price: {e}");
429                        }
430                    }
431                }
432            }
433            BinanceWsEventType::Kline => {
434                if let Ok(msg) = serde_json::from_value::<BinanceFuturesKlineMsg>(json.clone()) {
435                    match parse_kline(&msg, instrument, ts_init) {
436                        Ok(Some(bar)) => {
437                            return Some(NautilusWsMessage::Data(NautilusDataWsMessage::Data(
438                                vec![Data::Bar(bar)],
439                            )));
440                        }
441                        Ok(None) => {
442                            // Kline not closed yet, skip
443                        }
444                        Err(e) => {
445                            log::warn!("Failed to parse kline: {e}");
446                        }
447                    }
448                }
449            }
450            BinanceWsEventType::ForceOrder
451            | BinanceWsEventType::Ticker24Hr
452            | BinanceWsEventType::MiniTicker24Hr => {
453                // Pass through as raw JSON for now
454                return Some(NautilusWsMessage::Data(NautilusDataWsMessage::RawJson(
455                    json.clone(),
456                )));
457            }
458
459            // User data events and Unknown handled before instrument lookup
460            BinanceWsEventType::AccountUpdate
461            | BinanceWsEventType::OrderTradeUpdate
462            | BinanceWsEventType::AlgoUpdate
463            | BinanceWsEventType::MarginCall
464            | BinanceWsEventType::AccountConfigUpdate
465            | BinanceWsEventType::ListenKeyExpired
466            | BinanceWsEventType::Unknown => unreachable!(),
467        }
468
469        None
470    }
471
472    fn handle_user_data_event(
473        &self,
474        event_type: &BinanceWsEventType,
475        json: &serde_json::Value,
476    ) -> Option<BinanceFuturesExecWsMessage> {
477        match event_type {
478            BinanceWsEventType::AccountUpdate => {
479                match serde_json::from_value::<BinanceFuturesAccountUpdateMsg>(json.clone()) {
480                    Ok(msg) => {
481                        log::debug!(
482                            "Account update: reason={:?}, balances={}, positions={}",
483                            msg.account.reason,
484                            msg.account.balances.len(),
485                            msg.account.positions.len()
486                        );
487                        Some(BinanceFuturesExecWsMessage::AccountUpdate(msg))
488                    }
489                    Err(e) => {
490                        log::warn!("Failed to parse account update: {e}");
491                        None
492                    }
493                }
494            }
495            BinanceWsEventType::OrderTradeUpdate => {
496                match serde_json::from_value::<BinanceFuturesOrderUpdateMsg>(json.clone()) {
497                    Ok(msg) => {
498                        log::debug!(
499                            "Order update: symbol={}, order_id={}, exec={:?}, status={:?}",
500                            msg.order.symbol,
501                            msg.order.order_id,
502                            msg.order.execution_type,
503                            msg.order.order_status
504                        );
505                        Some(BinanceFuturesExecWsMessage::OrderUpdate(Box::new(msg)))
506                    }
507                    Err(e) => {
508                        log::warn!("Failed to parse order update: {e}");
509                        None
510                    }
511                }
512            }
513            BinanceWsEventType::AlgoUpdate => {
514                match serde_json::from_value::<BinanceFuturesAlgoUpdateMsg>(json.clone()) {
515                    Ok(msg) => {
516                        log::debug!(
517                            "Algo order update: symbol={}, algo_id={}, status={:?}",
518                            msg.algo_order.symbol,
519                            msg.algo_order.algo_id,
520                            msg.algo_order.algo_status
521                        );
522                        Some(BinanceFuturesExecWsMessage::AlgoUpdate(Box::new(msg)))
523                    }
524                    Err(e) => {
525                        log::warn!("Failed to parse algo order update: {e}");
526                        None
527                    }
528                }
529            }
530            BinanceWsEventType::MarginCall => {
531                match serde_json::from_value::<BinanceFuturesMarginCallMsg>(json.clone()) {
532                    Ok(msg) => {
533                        log::warn!(
534                            "Margin call: cross_wallet_balance={}, positions_at_risk={}",
535                            msg.cross_wallet_balance,
536                            msg.positions.len()
537                        );
538                        Some(BinanceFuturesExecWsMessage::MarginCall(msg))
539                    }
540                    Err(e) => {
541                        log::warn!("Failed to parse margin call: {e}");
542                        None
543                    }
544                }
545            }
546            BinanceWsEventType::AccountConfigUpdate => {
547                match serde_json::from_value::<BinanceFuturesAccountConfigMsg>(json.clone()) {
548                    Ok(msg) => {
549                        if let Some(ref lc) = msg.leverage_config {
550                            log::debug!(
551                                "Account config update: symbol={}, leverage={}",
552                                lc.symbol,
553                                lc.leverage
554                            );
555                        }
556                        Some(BinanceFuturesExecWsMessage::AccountConfigUpdate(msg))
557                    }
558                    Err(e) => {
559                        log::warn!("Failed to parse account config update: {e}");
560                        None
561                    }
562                }
563            }
564            BinanceWsEventType::ListenKeyExpired => {
565                match serde_json::from_value::<BinanceFuturesListenKeyExpiredMsg>(json.clone()) {
566                    Ok(msg) => {
567                        log::warn!("Listen key expired at {}", msg.event_time);
568                        Some(BinanceFuturesExecWsMessage::ListenKeyExpired)
569                    }
570                    Err(e) => {
571                        log::warn!("Failed to parse listen key expired: {e}");
572                        None
573                    }
574                }
575            }
576            _ => None,
577        }
578    }
579}