nautilus_binance/spot/websocket/
handler.rs1use crate::common::sbe::stream::{
23 BestBidAskStreamEvent, DepthDiffStreamEvent, DepthSnapshotStreamEvent, MessageHeader,
24 StreamDecodeError, TradesStreamEvent, template_id,
25};
26
/// A decoded market data stream message.
///
/// Each variant wraps the event type that [`decode_market_data`] produces for
/// the matching `template_id` constant found in the message header.
#[derive(Debug)]
pub enum MarketDataMessage {
    /// Payload decoded for `template_id::TRADES_STREAM_EVENT`.
    Trades(TradesStreamEvent),
    /// Payload decoded for `template_id::BEST_BID_ASK_STREAM_EVENT`.
    BestBidAsk(BestBidAskStreamEvent),
    /// Payload decoded for `template_id::DEPTH_SNAPSHOT_STREAM_EVENT`.
    DepthSnapshot(DepthSnapshotStreamEvent),
    /// Payload decoded for `template_id::DEPTH_DIFF_STREAM_EVENT`.
    DepthDiff(DepthDiffStreamEvent),
}
39
40pub fn decode_market_data(buf: &[u8]) -> Result<MarketDataMessage, StreamDecodeError> {
55 let header = MessageHeader::decode(buf)?;
56 header.validate_schema()?;
57
58 match header.template_id {
61 template_id::TRADES_STREAM_EVENT => {
62 Ok(MarketDataMessage::Trades(TradesStreamEvent::decode(buf)?))
63 }
64 template_id::BEST_BID_ASK_STREAM_EVENT => Ok(MarketDataMessage::BestBidAsk(
65 BestBidAskStreamEvent::decode(buf)?,
66 )),
67 template_id::DEPTH_SNAPSHOT_STREAM_EVENT => Ok(MarketDataMessage::DepthSnapshot(
68 DepthSnapshotStreamEvent::decode(buf)?,
69 )),
70 template_id::DEPTH_DIFF_STREAM_EVENT => Ok(MarketDataMessage::DepthDiff(
71 DepthDiffStreamEvent::decode(buf)?,
72 )),
73 _ => Err(StreamDecodeError::UnknownTemplateId(header.template_id)),
74 }
75}
76
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::common::sbe::stream::STREAM_SCHEMA_ID;

    /// Builds a zero-filled 100-byte buffer with an 8-byte header prefix.
    ///
    /// Bytes 2..4 receive the template ID and bytes 4..6 the schema ID, both as
    /// already-encoded little-endian pairs. Bytes 0..2 are fixed to `50` and
    /// bytes 6..8 to `0`.
    // NOTE(review): bytes 0..2 and 6..8 are presumably the SBE block length and
    // schema version; confirm against `MessageHeader::decode`.
    fn buf_with_header(template_id_bytes: [u8; 2], schema_id_bytes: [u8; 2]) -> [u8; 100] {
        let mut buf = [0u8; 100];
        buf[0..2].copy_from_slice(&50u16.to_le_bytes());
        buf[2..4].copy_from_slice(&template_id_bytes);
        buf[4..6].copy_from_slice(&schema_id_bytes);
        buf[6..8].copy_from_slice(&0u16.to_le_bytes());
        buf
    }

    /// An empty slice is rejected as too short.
    #[rstest]
    fn test_decode_empty_buffer() {
        let err = decode_market_data(&[]).unwrap_err();

        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    /// A five-byte buffer is rejected as too short.
    #[rstest]
    fn test_decode_short_buffer() {
        let truncated = [0u8; 5];

        let err = decode_market_data(&truncated).unwrap_err();

        assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
    }

    /// A known template ID paired with schema ID 99 is reported as a schema mismatch.
    #[rstest]
    fn test_decode_wrong_schema() {
        let buf = buf_with_header(
            template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes(),
            99u16.to_le_bytes(),
        );

        let err = decode_market_data(&buf).unwrap_err();

        assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
    }

    /// With the expected schema ID, an unhandled template ID is echoed back in the error.
    #[rstest]
    fn test_decode_unknown_template() {
        let buf = buf_with_header(9999u16.to_le_bytes(), STREAM_SCHEMA_ID.to_le_bytes());

        let err = decode_market_data(&buf).unwrap_err();

        assert!(matches!(err, StreamDecodeError::UnknownTemplateId(9999)));
    }
}