nautilus_binance/spot/websocket/
handler.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot WebSocket message handler.
17//!
18//! This handler processes incoming SBE binary frames and routes them to the
19//! appropriate decoder based on the message template ID. All decoders validate
20//! schema ID and return errors for malformed/truncated data instead of panicking.
21
22use crate::common::sbe::stream::{
23    BestBidAskStreamEvent, DepthDiffStreamEvent, DepthSnapshotStreamEvent, MessageHeader,
24    StreamDecodeError, TradesStreamEvent, template_id,
25};
26
27/// Decoded market data message.
28#[derive(Debug)]
29pub enum MarketDataMessage {
30    /// Trade event.
31    Trades(TradesStreamEvent),
32    /// Best bid/ask update.
33    BestBidAsk(BestBidAskStreamEvent),
34    /// Order book snapshot.
35    DepthSnapshot(DepthSnapshotStreamEvent),
36    /// Order book diff update.
37    DepthDiff(DepthDiffStreamEvent),
38}
39
40/// Decode an SBE binary frame into a market data message.
41///
42/// Validates the message header (including schema ID) and routes to the
43/// appropriate decoder based on template ID. All decode operations are
44/// bounds-checked and will return errors for malformed/truncated data.
45///
46/// # Errors
47///
48/// Returns an error if:
49/// - Buffer is too short to contain message header
50/// - Schema ID doesn't match expected stream schema
51/// - Template ID is unknown
52/// - Message body is malformed or truncated
53/// - Group counts exceed safety limits
54pub fn decode_market_data(buf: &[u8]) -> Result<MarketDataMessage, StreamDecodeError> {
55    let header = MessageHeader::decode(buf)?;
56    header.validate_schema()?;
57
58    // Each decoder also validates schema internally, but we check here first
59    // to give a better error for unknown templates
60    match header.template_id {
61        template_id::TRADES_STREAM_EVENT => {
62            Ok(MarketDataMessage::Trades(TradesStreamEvent::decode(buf)?))
63        }
64        template_id::BEST_BID_ASK_STREAM_EVENT => Ok(MarketDataMessage::BestBidAsk(
65            BestBidAskStreamEvent::decode(buf)?,
66        )),
67        template_id::DEPTH_SNAPSHOT_STREAM_EVENT => Ok(MarketDataMessage::DepthSnapshot(
68            DepthSnapshotStreamEvent::decode(buf)?,
69        )),
70        template_id::DEPTH_DIFF_STREAM_EVENT => Ok(MarketDataMessage::DepthDiff(
71            DepthDiffStreamEvent::decode(buf)?,
72        )),
73        _ => Err(StreamDecodeError::UnknownTemplateId(header.template_id)),
74    }
75}
76
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::sbe::stream::STREAM_SCHEMA_ID;

    /// Builds a zero-filled 100-byte frame whose first eight bytes hold a
    /// little-endian header: block length 50, the given template ID bytes,
    /// the given schema ID bytes, and version 0.
    fn frame_with_header(template: [u8; 2], schema: [u8; 2]) -> [u8; 100] {
        let mut frame = [0u8; 100];
        frame[0..2].copy_from_slice(&50u16.to_le_bytes()); // block_length
        frame[2..4].copy_from_slice(&template); // template_id
        frame[4..6].copy_from_slice(&schema); // schema_id
        frame[6..8].copy_from_slice(&0u16.to_le_bytes()); // version
        frame
    }

    #[rstest]
    fn test_decode_empty_buffer() {
        let result = decode_market_data(&[]);
        assert!(matches!(
            result,
            Err(StreamDecodeError::BufferTooShort { .. })
        ));
    }

    #[rstest]
    fn test_decode_short_buffer() {
        // Five zero bytes, as in the empty case, must be reported as too short.
        let short = [0u8; 5];
        let result = decode_market_data(&short);
        assert!(matches!(
            result,
            Err(StreamDecodeError::BufferTooShort { .. })
        ));
    }

    #[rstest]
    fn test_decode_wrong_schema() {
        // Valid template, but a schema ID (99) that is not the stream schema.
        let frame = frame_with_header(
            template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes(),
            99u16.to_le_bytes(),
        );

        let result = decode_market_data(&frame);
        assert!(matches!(
            result,
            Err(StreamDecodeError::SchemaMismatch { .. })
        ));
    }

    #[rstest]
    fn test_decode_unknown_template() {
        // Correct schema, but a template ID (9999) with no matching decoder.
        let frame = frame_with_header(9999u16.to_le_bytes(), STREAM_SCHEMA_ID.to_le_bytes());

        let result = decode_market_data(&frame);
        assert!(matches!(
            result,
            Err(StreamDecodeError::UnknownTemplateId(9999))
        ));
    }
}
120}