nautilus_binance/spot/websocket/streams/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot WebSocket message handler.
17//!
18//! The handler runs in a dedicated Tokio task as the I/O boundary between the client
19//! orchestrator and the network layer. It exclusively owns the `WebSocketClient` and
20//! processes commands from the client via an unbounded channel.
21//!
22//! Key responsibilities:
23//! - Command processing: Receives `HandlerCommand` from client, executes WebSocket operations.
24//! - SBE binary decoding: Routes binary frames to appropriate SBE decoders.
25//! - Message transformation: Parses raw venue messages into Nautilus domain events.
26//! - Subscription tracking: Manages pending subscription state.
27
28use std::{
29    collections::VecDeque,
30    sync::{
31        Arc,
32        atomic::{AtomicBool, AtomicU64, Ordering},
33    },
34};
35
36use ahash::AHashMap;
37use nautilus_core::nanos::UnixNanos;
38use nautilus_model::{
39    data::{BookOrder, Data, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
40    enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
41    identifiers::TradeId,
42    instruments::{Instrument, InstrumentAny},
43};
44use nautilus_network::{
45    RECONNECTED,
46    websocket::{SubscriptionState, WebSocketClient},
47};
48use tokio_tungstenite::tungstenite::Message;
49use ustr::Ustr;
50
51use super::messages::{
52    BinanceWsErrorMsg, BinanceWsErrorResponse, BinanceWsResponse, BinanceWsSubscription,
53    HandlerCommand, NautilusWsMessage,
54};
55use crate::common::{
56    fixed::{mantissa_to_price, mantissa_to_quantity},
57    sbe::stream::{
58        BestBidAskStreamEvent, DepthDiffStreamEvent, DepthSnapshotStreamEvent, MessageHeader,
59        StreamDecodeError, TradesStreamEvent, template_id,
60    },
61};
62
63/// Decoded market data message.
64#[derive(Debug)]
65pub enum MarketDataMessage {
66    /// Trade event.
67    Trades(TradesStreamEvent),
68    /// Best bid/ask update.
69    BestBidAsk(BestBidAskStreamEvent),
70    /// Order book snapshot.
71    DepthSnapshot(DepthSnapshotStreamEvent),
72    /// Order book diff update.
73    DepthDiff(DepthDiffStreamEvent),
74}
75
76/// Decode an SBE binary frame into a market data message.
77///
78/// Validates the message header (including schema ID) and routes to the
79/// appropriate decoder based on template ID.
80pub fn decode_market_data(buf: &[u8]) -> Result<MarketDataMessage, StreamDecodeError> {
81    let header = MessageHeader::decode(buf)?;
82    header.validate_schema()?;
83
84    match header.template_id {
85        template_id::TRADES_STREAM_EVENT => {
86            Ok(MarketDataMessage::Trades(TradesStreamEvent::decode(buf)?))
87        }
88        template_id::BEST_BID_ASK_STREAM_EVENT => Ok(MarketDataMessage::BestBidAsk(
89            BestBidAskStreamEvent::decode(buf)?,
90        )),
91        template_id::DEPTH_SNAPSHOT_STREAM_EVENT => Ok(MarketDataMessage::DepthSnapshot(
92            DepthSnapshotStreamEvent::decode(buf)?,
93        )),
94        template_id::DEPTH_DIFF_STREAM_EVENT => Ok(MarketDataMessage::DepthDiff(
95            DepthDiffStreamEvent::decode(buf)?,
96        )),
97        _ => Err(StreamDecodeError::UnknownTemplateId(header.template_id)),
98    }
99}
100
101/// Binance Spot WebSocket feed handler.
102///
103/// Runs in a dedicated Tokio task, processing commands from the client
104/// and transforming raw WebSocket messages into Nautilus domain events.
105pub(super) struct BinanceSpotWsFeedHandler {
106    #[allow(dead_code)] // Reserved for shutdown signal handling
107    signal: Arc<AtomicBool>,
108    inner: Option<WebSocketClient>,
109    cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
110    raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
111    #[allow(dead_code)] // Reserved for async message emission
112    out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
113    subscriptions: SubscriptionState,
114    instruments_cache: AHashMap<Ustr, InstrumentAny>,
115    request_id_counter: Arc<AtomicU64>,
116    pending_messages: VecDeque<NautilusWsMessage>,
117    pending_requests: AHashMap<u64, Vec<String>>,
118}
119
120impl BinanceSpotWsFeedHandler {
121    /// Creates a new handler instance.
122    pub(super) fn new(
123        signal: Arc<AtomicBool>,
124        cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
125        raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
126        out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
127        subscriptions: SubscriptionState,
128        request_id_counter: Arc<AtomicU64>,
129    ) -> Self {
130        Self {
131            signal,
132            inner: None,
133            cmd_rx,
134            raw_rx,
135            out_tx,
136            subscriptions,
137            instruments_cache: AHashMap::new(),
138            request_id_counter,
139            pending_messages: VecDeque::new(),
140            pending_requests: AHashMap::new(),
141        }
142    }
143
144    /// Main event loop - processes commands and raw messages.
145    ///
146    /// Returns `Some(message)` when there's output to emit, `None` when disconnected.
147    pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
148        // Return any pending messages first
149        if let Some(message) = self.pending_messages.pop_front() {
150            return Some(message);
151        }
152
153        loop {
154            tokio::select! {
155                Some(cmd) = self.cmd_rx.recv() => {
156                    match cmd {
157                        HandlerCommand::SetClient(client) => {
158                            log::debug!("Handler received WebSocket client");
159                            self.inner = Some(client);
160                        }
161                        HandlerCommand::Disconnect => {
162                            log::debug!("Handler disconnecting WebSocket client");
163                            self.inner = None;
164                            return None;
165                        }
166                        HandlerCommand::InitializeInstruments(instruments) => {
167                            for inst in instruments {
168                                self.instruments_cache.insert(inst.symbol().inner(), inst);
169                            }
170                        }
171                        HandlerCommand::UpdateInstrument(inst) => {
172                            self.instruments_cache.insert(inst.symbol().inner(), inst);
173                        }
174                        HandlerCommand::Subscribe { streams } => {
175                            if let Err(e) = self.handle_subscribe(streams).await {
176                                log::error!("Failed to handle subscribe command: {e}");
177                            }
178                        }
179                        HandlerCommand::Unsubscribe { streams } => {
180                            if let Err(e) = self.handle_unsubscribe(streams).await {
181                                log::error!("Failed to handle unsubscribe command: {e}");
182                            }
183                        }
184                    }
185                }
186                Some(msg) = self.raw_rx.recv() => {
187                    if let Message::Text(ref text) = msg
188                        && text.as_str() == RECONNECTED
189                    {
190                        log::info!("Handler received reconnection signal");
191                        return Some(NautilusWsMessage::Reconnected);
192                    }
193
194                    let messages = self.handle_message(msg);
195                    if !messages.is_empty() {
196                        let mut iter = messages.into_iter();
197                        let first = iter.next();
198                        self.pending_messages.extend(iter);
199                        if let Some(msg) = first {
200                            return Some(msg);
201                        }
202                    }
203                }
204                else => {
205                    return None;
206                }
207            }
208        }
209    }
210
211    /// Handle incoming WebSocket message.
212    fn handle_message(&mut self, msg: Message) -> Vec<NautilusWsMessage> {
213        match msg {
214            Message::Binary(data) => self.handle_binary_frame(&data),
215            Message::Text(text) => self.handle_text_frame(&text),
216            Message::Close(_) => {
217                log::debug!("Received close frame");
218                vec![]
219            }
220            Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => vec![],
221        }
222    }
223
224    /// Handle binary SBE frame.
225    fn handle_binary_frame(&mut self, data: &[u8]) -> Vec<NautilusWsMessage> {
226        match decode_market_data(data) {
227            Ok(MarketDataMessage::Trades(event)) => self.parse_trades_event(event),
228            Ok(MarketDataMessage::BestBidAsk(event)) => self.parse_bbo_event(event),
229            Ok(MarketDataMessage::DepthSnapshot(event)) => self.parse_depth_snapshot(event),
230            Ok(MarketDataMessage::DepthDiff(event)) => self.parse_depth_diff(event),
231            Err(e) => {
232                log::error!("SBE decode error: {e}");
233                vec![NautilusWsMessage::RawBinary(data.to_vec())]
234            }
235        }
236    }
237
238    /// Handle text JSON frame.
239    fn handle_text_frame(&mut self, text: &str) -> Vec<NautilusWsMessage> {
240        if let Ok(response) = serde_json::from_str::<BinanceWsResponse>(text) {
241            self.handle_subscription_response(response);
242            return vec![];
243        }
244
245        // Error response includes id for request correlation
246        if let Ok(error) = serde_json::from_str::<BinanceWsErrorResponse>(text) {
247            if let Some(id) = error.id
248                && let Some(streams) = self.pending_requests.remove(&id)
249            {
250                for stream in &streams {
251                    self.subscriptions.mark_failure(stream);
252                }
253                log::warn!(
254                    "Subscription request failed: id={id}, streams={streams:?}, code={}, msg={}",
255                    error.code,
256                    error.msg
257                );
258            }
259            return vec![NautilusWsMessage::Error(BinanceWsErrorMsg {
260                code: error.code,
261                msg: error.msg,
262            })];
263        }
264
265        if let Ok(value) = serde_json::from_str(text) {
266            vec![NautilusWsMessage::RawJson(value)]
267        } else {
268            log::warn!("Failed to parse JSON message: {text}");
269            vec![]
270        }
271    }
272
273    /// Handle subscription response.
274    fn handle_subscription_response(&mut self, response: BinanceWsResponse) {
275        if let Some(streams) = self.pending_requests.remove(&response.id) {
276            if response.result.is_none() {
277                // Success - confirm subscriptions
278                for stream in &streams {
279                    self.subscriptions.confirm_subscribe(stream);
280                }
281                log::debug!("Subscription confirmed: streams={streams:?}");
282            } else {
283                // Failure - mark streams as failed
284                for stream in &streams {
285                    self.subscriptions.mark_failure(stream);
286                }
287                log::warn!(
288                    "Subscription failed: streams={streams:?}, result={:?}",
289                    response.result
290                );
291            }
292        } else {
293            log::debug!("Received response for unknown request: id={}", response.id);
294        }
295    }
296
297    /// Parse trades stream event into Nautilus TradeTicks.
298    fn parse_trades_event(&self, event: TradesStreamEvent) -> Vec<NautilusWsMessage> {
299        let symbol = Ustr::from(&event.symbol);
300
301        let Some(instrument) = self.instruments_cache.get(&symbol) else {
302            log::warn!("No instrument in cache for trades: symbol={}", event.symbol);
303            return vec![];
304        };
305
306        let instrument_id = instrument.id();
307        let price_precision = instrument.price_precision();
308        let size_precision = instrument.size_precision();
309
310        let trades: Vec<Data> = event
311            .trades
312            .iter()
313            .map(|t| {
314                let price =
315                    mantissa_to_price(t.price_mantissa, event.price_exponent, price_precision);
316                let size = mantissa_to_quantity(t.qty_mantissa, event.qty_exponent, size_precision);
317                let ts_event = UnixNanos::from(event.transact_time_us as u64 * 1000); // us to ns
318
319                let trade = TradeTick::new(
320                    instrument_id,
321                    price,
322                    size,
323                    if t.is_buyer_maker {
324                        AggressorSide::Seller
325                    } else {
326                        AggressorSide::Buyer
327                    },
328                    TradeId::new(t.id.to_string()),
329                    ts_event,
330                    ts_event, // ts_init same as ts_event
331                );
332                Data::from(trade)
333            })
334            .collect();
335
336        if trades.is_empty() {
337            vec![]
338        } else {
339            vec![NautilusWsMessage::Data(trades)]
340        }
341    }
342
343    /// Parse best bid/ask event into Nautilus QuoteTick.
344    fn parse_bbo_event(&self, event: BestBidAskStreamEvent) -> Vec<NautilusWsMessage> {
345        let symbol = Ustr::from(&event.symbol);
346
347        let Some(instrument) = self.instruments_cache.get(&symbol) else {
348            log::warn!("No instrument in cache for BBO: symbol={}", event.symbol);
349            return vec![];
350        };
351
352        let instrument_id = instrument.id();
353        let price_precision = instrument.price_precision();
354        let size_precision = instrument.size_precision();
355
356        let bid_price = mantissa_to_price(
357            event.bid_price_mantissa,
358            event.price_exponent,
359            price_precision,
360        );
361        let bid_size =
362            mantissa_to_quantity(event.bid_qty_mantissa, event.qty_exponent, size_precision);
363        let ask_price = mantissa_to_price(
364            event.ask_price_mantissa,
365            event.price_exponent,
366            price_precision,
367        );
368        let ask_size =
369            mantissa_to_quantity(event.ask_qty_mantissa, event.qty_exponent, size_precision);
370        let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000); // us to ns
371
372        let quote = QuoteTick::new(
373            instrument_id,
374            bid_price,
375            ask_price,
376            bid_size,
377            ask_size,
378            ts_event,
379            ts_event,
380        );
381
382        vec![NautilusWsMessage::Data(vec![Data::from(quote)])]
383    }
384
385    /// Parse depth snapshot event into Nautilus OrderBookDeltas.
386    fn parse_depth_snapshot(&self, event: DepthSnapshotStreamEvent) -> Vec<NautilusWsMessage> {
387        let symbol = Ustr::from(&event.symbol);
388
389        let Some(instrument) = self.instruments_cache.get(&symbol) else {
390            log::warn!(
391                "No instrument in cache for depth snapshot: symbol={}",
392                event.symbol
393            );
394            return vec![];
395        };
396
397        let instrument_id = instrument.id();
398        let price_precision = instrument.price_precision();
399        let size_precision = instrument.size_precision();
400        let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000);
401
402        let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len() + 1);
403
404        // Add clear delta first
405        deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_event));
406
407        // Add bid levels
408        for (i, level) in event.bids.iter().enumerate() {
409            let price =
410                mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
411            let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
412            let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
413                RecordFlag::F_LAST as u8
414            } else {
415                0
416            };
417
418            let order = BookOrder::new(
419                OrderSide::Buy,
420                price,
421                size,
422                0, // order_id
423            );
424
425            deltas.push(OrderBookDelta::new(
426                instrument_id,
427                BookAction::Add,
428                order,
429                flags,
430                0, // sequence
431                ts_event,
432                ts_event,
433            ));
434        }
435
436        // Add ask levels
437        for (i, level) in event.asks.iter().enumerate() {
438            let price =
439                mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
440            let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
441            let flags = if i == event.asks.len() - 1 {
442                RecordFlag::F_LAST as u8
443            } else {
444                0
445            };
446
447            let order = BookOrder::new(
448                OrderSide::Sell,
449                price,
450                size,
451                0, // order_id
452            );
453
454            deltas.push(OrderBookDelta::new(
455                instrument_id,
456                BookAction::Add,
457                order,
458                flags,
459                0, // sequence
460                ts_event,
461                ts_event,
462            ));
463        }
464
465        if deltas.len() <= 1 {
466            return vec![];
467        }
468
469        vec![NautilusWsMessage::Deltas(OrderBookDeltas::new(
470            instrument_id,
471            deltas,
472        ))]
473    }
474
475    /// Parse depth diff event into Nautilus OrderBookDeltas.
476    fn parse_depth_diff(&self, event: DepthDiffStreamEvent) -> Vec<NautilusWsMessage> {
477        let symbol = Ustr::from(&event.symbol);
478
479        let Some(instrument) = self.instruments_cache.get(&symbol) else {
480            log::warn!(
481                "No instrument in cache for depth diff: symbol={}",
482                event.symbol
483            );
484            return vec![];
485        };
486
487        let instrument_id = instrument.id();
488        let price_precision = instrument.price_precision();
489        let size_precision = instrument.size_precision();
490        let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000);
491
492        let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len());
493
494        // Add bid updates
495        for (i, level) in event.bids.iter().enumerate() {
496            let price =
497                mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
498            let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
499
500            // Zero size means delete, otherwise update
501            let action = if level.qty_mantissa == 0 {
502                BookAction::Delete
503            } else {
504                BookAction::Update
505            };
506
507            let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
508                RecordFlag::F_LAST as u8
509            } else {
510                0
511            };
512
513            let order = BookOrder::new(
514                OrderSide::Buy,
515                price,
516                size,
517                0, // order_id
518            );
519
520            deltas.push(OrderBookDelta::new(
521                instrument_id,
522                action,
523                order,
524                flags,
525                0, // sequence
526                ts_event,
527                ts_event,
528            ));
529        }
530
531        // Add ask updates
532        for (i, level) in event.asks.iter().enumerate() {
533            let price =
534                mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
535            let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
536
537            let action = if level.qty_mantissa == 0 {
538                BookAction::Delete
539            } else {
540                BookAction::Update
541            };
542
543            let flags = if i == event.asks.len() - 1 {
544                RecordFlag::F_LAST as u8
545            } else {
546                0
547            };
548
549            let order = BookOrder::new(
550                OrderSide::Sell,
551                price,
552                size,
553                0, // order_id
554            );
555
556            deltas.push(OrderBookDelta::new(
557                instrument_id,
558                action,
559                order,
560                flags,
561                0, // sequence
562                ts_event,
563                ts_event,
564            ));
565        }
566
567        if deltas.is_empty() {
568            return vec![];
569        }
570
571        vec![NautilusWsMessage::Deltas(OrderBookDeltas::new(
572            instrument_id,
573            deltas,
574        ))]
575    }
576
577    /// Handle subscribe command.
578    async fn handle_subscribe(&mut self, streams: Vec<String>) -> anyhow::Result<()> {
579        let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
580        let request = BinanceWsSubscription::subscribe(streams.clone(), request_id);
581        let payload = serde_json::to_string(&request)?;
582
583        // Track pending request for confirmation
584        self.pending_requests.insert(request_id, streams.clone());
585
586        // Mark streams as pending
587        for stream in &streams {
588            self.subscriptions.mark_subscribe(stream);
589        }
590
591        self.send_text(payload).await?;
592        Ok(())
593    }
594
595    /// Handle unsubscribe command.
596    async fn handle_unsubscribe(&mut self, streams: Vec<String>) -> anyhow::Result<()> {
597        let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
598        let request = BinanceWsSubscription::unsubscribe(streams.clone(), request_id);
599        let payload = serde_json::to_string(&request)?;
600
601        self.send_text(payload).await?;
602
603        // Immediately confirm unsubscribe (don't wait for response)
604        // We don't track unsubscribe failures - the stream will simply stop
605        for stream in &streams {
606            self.subscriptions.mark_unsubscribe(stream);
607            self.subscriptions.confirm_unsubscribe(stream);
608        }
609
610        Ok(())
611    }
612
613    /// Send text message via WebSocket.
614    async fn send_text(&self, payload: String) -> anyhow::Result<()> {
615        let Some(client) = &self.inner else {
616            anyhow::bail!("No active WebSocket client");
617        };
618        client
619            .send_text(payload, None)
620            .await
621            .map_err(|e| anyhow::anyhow!("Failed to send message: {e}"))?;
622        Ok(())
623    }
624}
625
626#[cfg(test)]
627mod tests {
628    use rstest::rstest;
629
630    use super::*;
631    use crate::common::sbe::stream::STREAM_SCHEMA_ID;
632
633    #[rstest]
634    fn test_decode_empty_buffer() {
635        let err = decode_market_data(&[]).unwrap_err();
636        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
637    }
638
639    #[rstest]
640    fn test_decode_short_buffer() {
641        let buf = [0u8; 5];
642        let err = decode_market_data(&buf).unwrap_err();
643        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
644    }
645
646    #[rstest]
647    fn test_decode_wrong_schema() {
648        let mut buf = [0u8; 100];
649        buf[0..2].copy_from_slice(&50u16.to_le_bytes()); // block_length
650        buf[2..4].copy_from_slice(&template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes());
651        buf[4..6].copy_from_slice(&99u16.to_le_bytes()); // Wrong schema
652        buf[6..8].copy_from_slice(&0u16.to_le_bytes()); // version
653
654        let err = decode_market_data(&buf).unwrap_err();
655        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
656    }
657
658    #[rstest]
659    fn test_decode_unknown_template() {
660        let mut buf = [0u8; 100];
661        buf[0..2].copy_from_slice(&50u16.to_le_bytes()); // block_length
662        buf[2..4].copy_from_slice(&9999u16.to_le_bytes()); // Unknown template
663        buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
664        buf[6..8].copy_from_slice(&0u16.to_le_bytes()); // version
665
666        let err = decode_market_data(&buf).unwrap_err();
667        assert!(matches!(err, StreamDecodeError::UnknownTemplateId(9999)));
668    }
669}