Skip to main content

nautilus_binance/spot/websocket/streams/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot WebSocket message handler.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21//!
22//! Key responsibilities:
23//! - Command processing: Receives `HandlerCommand` from client, executes WebSocket operations.
24//! - SBE binary decoding: Routes binary frames to appropriate SBE decoders.
25//! - Message transformation: Parses raw venue messages into Nautilus domain events.
26//! - Subscription tracking: Manages pending subscription state.
27
28use std::{
29    collections::VecDeque,
30    sync::{
31        Arc,
32        atomic::{AtomicBool, AtomicU64, Ordering},
33    },
34};
35
36use ahash::AHashMap;
37use nautilus_model::{
38    data::Data,
39    instruments::{Instrument, InstrumentAny},
40};
41use nautilus_network::{
42    RECONNECTED,
43    websocket::{SubscriptionState, WebSocketClient},
44};
45use tokio_tungstenite::tungstenite::Message;
46use ustr::Ustr;
47
48// Re-export for backwards compatibility
49pub use super::parse::{MarketDataMessage, decode_market_data};
50use super::{
51    messages::{
52        BinanceSpotWsMessage, BinanceWsErrorMsg, BinanceWsErrorResponse, BinanceWsResponse,
53        BinanceWsSubscription, HandlerCommand, NautilusSpotDataWsMessage,
54    },
55    parse::{
56        decode_market_data as decode_sbe, parse_bbo_event, parse_depth_diff, parse_depth_snapshot,
57        parse_trades_event,
58    },
59};
60use crate::common::consts::BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION;
61
/// Binance Spot WebSocket feed handler.
///
/// Runs in a dedicated Tokio task, processing commands from the client
/// and transforming raw WebSocket messages into Nautilus domain events.
pub(super) struct BinanceSpotWsFeedHandler {
    /// Flag handed in at construction; currently only stored, never read by the handler.
    #[allow(dead_code)] // Reserved for shutdown signal handling
    signal: Arc<AtomicBool>,
    /// Active WebSocket client. `None` until a `HandlerCommand::SetClient` arrives and again
    /// after `HandlerCommand::Disconnect`; sending fails while it is `None`.
    inner: Option<WebSocketClient>,
    /// Receiving end of the command channel (`HandlerCommand`s from the client).
    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
    /// Receiving end of the raw WebSocket message channel, including the `RECONNECTED`
    /// sentinel text message.
    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
    /// Output channel; currently only stored, messages are returned from `next` instead.
    #[allow(dead_code)] // Reserved for async message emission
    out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsMessage>,
    /// Subscription state updated as subscribe/unsubscribe requests are sent, confirmed
    /// or rejected.
    subscriptions: SubscriptionState,
    /// Instruments keyed by `symbol().inner()`, used to resolve the symbol carried by
    /// decoded SBE events.
    instruments_cache: AHashMap<Ustr, InstrumentAny>,
    /// Shared counter from which subscribe/unsubscribe request ids are drawn.
    request_id_counter: Arc<AtomicU64>,
    /// Output messages produced by a frame beyond the first one, returned by later
    /// calls to `next`.
    pending_messages: VecDeque<BinanceSpotWsMessage>,
    /// Streams of each subscribe request still awaiting a response, keyed by request id.
    pending_requests: AHashMap<u64, Vec<String>>,
}
80
81impl BinanceSpotWsFeedHandler {
82    /// Creates a new handler instance.
83    pub(super) fn new(
84        signal: Arc<AtomicBool>,
85        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
86        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
87        out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsMessage>,
88        subscriptions: SubscriptionState,
89        request_id_counter: Arc<AtomicU64>,
90    ) -> Self {
91        Self {
92            signal,
93            inner: None,
94            cmd_rx,
95            raw_rx,
96            out_tx,
97            subscriptions,
98            instruments_cache: AHashMap::new(),
99            request_id_counter,
100            pending_messages: VecDeque::new(),
101            pending_requests: AHashMap::new(),
102        }
103    }
104
105    /// Main event loop - processes commands and raw messages.
106    ///
107    /// Returns `Some(message)` when there's output to emit, `None` when disconnected.
108    pub(super) async fn next(&mut self) -> Option<BinanceSpotWsMessage> {
109        // Return any pending messages first
110        if let Some(message) = self.pending_messages.pop_front() {
111            return Some(message);
112        }
113
114        loop {
115            tokio::select! {
116                Some(cmd) = self.cmd_rx.recv() => {
117                    match cmd {
118                        HandlerCommand::SetClient(client) => {
119                            log::debug!("Handler received WebSocket client");
120                            self.inner = Some(client);
121                        }
122                        HandlerCommand::Disconnect => {
123                            log::debug!("Handler disconnecting WebSocket client");
124                            self.inner = None;
125                            return None;
126                        }
127                        HandlerCommand::InitializeInstruments(instruments) => {
128                            for inst in instruments {
129                                self.instruments_cache.insert(inst.symbol().inner(), inst);
130                            }
131                        }
132                        HandlerCommand::UpdateInstrument(inst) => {
133                            self.instruments_cache.insert(inst.symbol().inner(), inst);
134                        }
135                        HandlerCommand::Subscribe { streams } => {
136                            if let Err(e) = self.handle_subscribe(streams).await {
137                                log::error!("Failed to handle subscribe command: {e}");
138                            }
139                        }
140                        HandlerCommand::Unsubscribe { streams } => {
141                            if let Err(e) = self.handle_unsubscribe(streams).await {
142                                log::error!("Failed to handle unsubscribe command: {e}");
143                            }
144                        }
145                    }
146                }
147                Some(msg) = self.raw_rx.recv() => {
148                    if let Message::Text(ref text) = msg
149                        && text.as_str() == RECONNECTED
150                    {
151                        log::info!("Handler received reconnection signal");
152                        return Some(BinanceSpotWsMessage::Reconnected);
153                    }
154
155                    let messages = self.handle_message(msg);
156                    if !messages.is_empty() {
157                        let mut iter = messages.into_iter();
158                        let first = iter.next();
159                        self.pending_messages.extend(iter);
160                        if let Some(msg) = first {
161                            return Some(msg);
162                        }
163                    }
164                }
165                else => {
166                    return None;
167                }
168            }
169        }
170    }
171
172    /// Handle incoming WebSocket message.
173    fn handle_message(&mut self, msg: Message) -> Vec<BinanceSpotWsMessage> {
174        match msg {
175            Message::Binary(data) => self.handle_binary_frame(&data),
176            Message::Text(text) => self.handle_text_frame(&text),
177            Message::Close(_) => {
178                log::debug!("Received close frame");
179                vec![]
180            }
181            Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => vec![],
182        }
183    }
184
185    /// Handle binary SBE frame.
186    fn handle_binary_frame(&mut self, data: &[u8]) -> Vec<BinanceSpotWsMessage> {
187        match decode_sbe(data) {
188            Ok(MarketDataMessage::Trades(event)) => self.handle_trades_event(&event),
189            Ok(MarketDataMessage::BestBidAsk(event)) => self.handle_bbo_event(&event),
190            Ok(MarketDataMessage::DepthSnapshot(event)) => self.handle_depth_snapshot(&event),
191            Ok(MarketDataMessage::DepthDiff(event)) => self.handle_depth_diff(&event),
192            Err(e) => {
193                log::error!("SBE decode error: {e}");
194                vec![BinanceSpotWsMessage::Data(
195                    NautilusSpotDataWsMessage::RawBinary(data.to_vec()),
196                )]
197            }
198        }
199    }
200
201    /// Handle text JSON frame.
202    fn handle_text_frame(&mut self, text: &str) -> Vec<BinanceSpotWsMessage> {
203        if let Ok(response) = serde_json::from_str::<BinanceWsResponse>(text) {
204            self.handle_subscription_response(response);
205            return vec![];
206        }
207
208        // Error response includes id for request correlation
209        if let Ok(error) = serde_json::from_str::<BinanceWsErrorResponse>(text) {
210            if let Some(id) = error.id
211                && let Some(streams) = self.pending_requests.remove(&id)
212            {
213                for stream in &streams {
214                    self.subscriptions.mark_failure(stream);
215                }
216                log::warn!(
217                    "Subscription request failed: id={id}, streams={streams:?}, code={}, msg={}",
218                    error.code,
219                    error.msg
220                );
221            }
222            return vec![BinanceSpotWsMessage::Error(BinanceWsErrorMsg {
223                code: error.code,
224                msg: error.msg,
225            })];
226        }
227
228        if let Ok(value) = serde_json::from_str(text) {
229            vec![BinanceSpotWsMessage::Data(
230                NautilusSpotDataWsMessage::RawJson(value),
231            )]
232        } else {
233            log::warn!("Failed to parse JSON message: {text}");
234            vec![]
235        }
236    }
237
238    /// Handle subscription response.
239    fn handle_subscription_response(&mut self, response: BinanceWsResponse) {
240        if let Some(streams) = self.pending_requests.remove(&response.id) {
241            if response.result.is_none() {
242                // Success - confirm subscriptions
243                for stream in &streams {
244                    self.subscriptions.confirm_subscribe(stream);
245                }
246                log::debug!("Subscription confirmed: streams={streams:?}");
247            } else {
248                // Failure - mark streams as failed
249                for stream in &streams {
250                    self.subscriptions.mark_failure(stream);
251                }
252                log::warn!(
253                    "Subscription failed: streams={streams:?}, result={:?}",
254                    response.result
255                );
256            }
257        } else {
258            log::debug!("Received response for unknown request: id={}", response.id);
259        }
260    }
261
262    /// Handle trades stream event.
263    fn handle_trades_event(
264        &self,
265        event: &crate::common::sbe::stream::TradesStreamEvent,
266    ) -> Vec<BinanceSpotWsMessage> {
267        let symbol = Ustr::from(&event.symbol);
268
269        let Some(instrument) = self.instruments_cache.get(&symbol) else {
270            log::warn!("No instrument in cache for trades: symbol={}", event.symbol);
271            return vec![];
272        };
273
274        let trades = parse_trades_event(event, instrument);
275        if trades.is_empty() {
276            vec![]
277        } else {
278            vec![BinanceSpotWsMessage::Data(NautilusSpotDataWsMessage::Data(
279                trades,
280            ))]
281        }
282    }
283
284    /// Handle best bid/ask event.
285    fn handle_bbo_event(
286        &self,
287        event: &crate::common::sbe::stream::BestBidAskStreamEvent,
288    ) -> Vec<BinanceSpotWsMessage> {
289        let symbol = Ustr::from(&event.symbol);
290
291        let Some(instrument) = self.instruments_cache.get(&symbol) else {
292            log::warn!("No instrument in cache for BBO: symbol={}", event.symbol);
293            return vec![];
294        };
295
296        let quote = parse_bbo_event(event, instrument);
297        vec![BinanceSpotWsMessage::Data(NautilusSpotDataWsMessage::Data(
298            vec![Data::from(quote)],
299        ))]
300    }
301
302    /// Handle depth snapshot event.
303    fn handle_depth_snapshot(
304        &self,
305        event: &crate::common::sbe::stream::DepthSnapshotStreamEvent,
306    ) -> Vec<BinanceSpotWsMessage> {
307        let symbol = Ustr::from(&event.symbol);
308
309        let Some(instrument) = self.instruments_cache.get(&symbol) else {
310            log::warn!(
311                "No instrument in cache for depth snapshot: symbol={}",
312                event.symbol
313            );
314            return vec![];
315        };
316
317        match parse_depth_snapshot(event, instrument) {
318            Some(deltas) => vec![BinanceSpotWsMessage::Data(
319                NautilusSpotDataWsMessage::Deltas(deltas),
320            )],
321            None => vec![],
322        }
323    }
324
325    /// Handle depth diff event.
326    fn handle_depth_diff(
327        &self,
328        event: &crate::common::sbe::stream::DepthDiffStreamEvent,
329    ) -> Vec<BinanceSpotWsMessage> {
330        let symbol = Ustr::from(&event.symbol);
331
332        let Some(instrument) = self.instruments_cache.get(&symbol) else {
333            log::warn!(
334                "No instrument in cache for depth diff: symbol={}",
335                event.symbol
336            );
337            return vec![];
338        };
339
340        match parse_depth_diff(event, instrument) {
341            Some(deltas) => vec![BinanceSpotWsMessage::Data(
342                NautilusSpotDataWsMessage::Deltas(deltas),
343            )],
344            None => vec![],
345        }
346    }
347
348    /// Handle subscribe command.
349    async fn handle_subscribe(&mut self, streams: Vec<String>) -> anyhow::Result<()> {
350        let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
351        let request = BinanceWsSubscription::subscribe(streams.clone(), request_id);
352        let payload = serde_json::to_string(&request)?;
353
354        // Track pending request for confirmation
355        self.pending_requests.insert(request_id, streams.clone());
356
357        // Mark streams as pending
358        for stream in &streams {
359            self.subscriptions.mark_subscribe(stream);
360        }
361
362        self.send_text(
363            payload,
364            Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()),
365        )
366        .await?;
367        Ok(())
368    }
369
370    /// Handle unsubscribe command.
371    async fn handle_unsubscribe(&mut self, streams: Vec<String>) -> anyhow::Result<()> {
372        let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
373        let request = BinanceWsSubscription::unsubscribe(streams.clone(), request_id);
374        let payload = serde_json::to_string(&request)?;
375
376        self.send_text(
377            payload,
378            Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()),
379        )
380        .await?;
381
382        // Immediately confirm unsubscribe (don't wait for response)
383        // We don't track unsubscribe failures - the stream will simply stop
384        for stream in &streams {
385            self.subscriptions.mark_unsubscribe(stream);
386            self.subscriptions.confirm_unsubscribe(stream);
387        }
388
389        Ok(())
390    }
391
392    /// Send text message via WebSocket.
393    async fn send_text(
394        &self,
395        payload: String,
396        rate_limit_keys: Option<&[Ustr]>,
397    ) -> anyhow::Result<()> {
398        let Some(client) = &self.inner else {
399            anyhow::bail!("No active WebSocket client");
400        };
401        client
402            .send_text(payload, rate_limit_keys)
403            .await
404            .map_err(|e| anyhow::anyhow!("Failed to send message: {e}"))?;
405        Ok(())
406    }
407}