nautilus_binance/spot/websocket/streams/
handler.rs1use std::{
29 collections::VecDeque,
30 sync::{
31 Arc,
32 atomic::{AtomicBool, AtomicU64, Ordering},
33 },
34};
35
36use ahash::AHashMap;
37use nautilus_model::{
38 data::Data,
39 instruments::{Instrument, InstrumentAny},
40};
41use nautilus_network::{
42 RECONNECTED,
43 websocket::{SubscriptionState, WebSocketClient},
44};
45use tokio_tungstenite::tungstenite::Message;
46use ustr::Ustr;
47
48pub use super::parse::{MarketDataMessage, decode_market_data};
50use super::{
51 messages::{
52 BinanceSpotWsMessage, BinanceWsErrorMsg, BinanceWsErrorResponse, BinanceWsResponse,
53 BinanceWsSubscription, HandlerCommand, NautilusSpotDataWsMessage,
54 },
55 parse::{
56 decode_market_data as decode_sbe, parse_bbo_event, parse_depth_diff, parse_depth_snapshot,
57 parse_trades_event,
58 },
59};
60use crate::common::consts::BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION;
61
62pub(super) struct BinanceSpotWsFeedHandler {
67 #[allow(dead_code)] signal: Arc<AtomicBool>,
69 inner: Option<WebSocketClient>,
70 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
71 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
72 #[allow(dead_code)] out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsMessage>,
74 subscriptions: SubscriptionState,
75 instruments_cache: AHashMap<Ustr, InstrumentAny>,
76 request_id_counter: Arc<AtomicU64>,
77 pending_messages: VecDeque<BinanceSpotWsMessage>,
78 pending_requests: AHashMap<u64, Vec<String>>,
79}
80
81impl BinanceSpotWsFeedHandler {
82 pub(super) fn new(
84 signal: Arc<AtomicBool>,
85 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
86 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
87 out_tx: tokio::sync::mpsc::UnboundedSender<BinanceSpotWsMessage>,
88 subscriptions: SubscriptionState,
89 request_id_counter: Arc<AtomicU64>,
90 ) -> Self {
91 Self {
92 signal,
93 inner: None,
94 cmd_rx,
95 raw_rx,
96 out_tx,
97 subscriptions,
98 instruments_cache: AHashMap::new(),
99 request_id_counter,
100 pending_messages: VecDeque::new(),
101 pending_requests: AHashMap::new(),
102 }
103 }
104
105 pub(super) async fn next(&mut self) -> Option<BinanceSpotWsMessage> {
109 if let Some(message) = self.pending_messages.pop_front() {
111 return Some(message);
112 }
113
114 loop {
115 tokio::select! {
116 Some(cmd) = self.cmd_rx.recv() => {
117 match cmd {
118 HandlerCommand::SetClient(client) => {
119 log::debug!("Handler received WebSocket client");
120 self.inner = Some(client);
121 }
122 HandlerCommand::Disconnect => {
123 log::debug!("Handler disconnecting WebSocket client");
124 self.inner = None;
125 return None;
126 }
127 HandlerCommand::InitializeInstruments(instruments) => {
128 for inst in instruments {
129 self.instruments_cache.insert(inst.symbol().inner(), inst);
130 }
131 }
132 HandlerCommand::UpdateInstrument(inst) => {
133 self.instruments_cache.insert(inst.symbol().inner(), inst);
134 }
135 HandlerCommand::Subscribe { streams } => {
136 if let Err(e) = self.handle_subscribe(streams).await {
137 log::error!("Failed to handle subscribe command: {e}");
138 }
139 }
140 HandlerCommand::Unsubscribe { streams } => {
141 if let Err(e) = self.handle_unsubscribe(streams).await {
142 log::error!("Failed to handle unsubscribe command: {e}");
143 }
144 }
145 }
146 }
147 Some(msg) = self.raw_rx.recv() => {
148 if let Message::Text(ref text) = msg
149 && text.as_str() == RECONNECTED
150 {
151 log::info!("Handler received reconnection signal");
152 return Some(BinanceSpotWsMessage::Reconnected);
153 }
154
155 let messages = self.handle_message(msg);
156 if !messages.is_empty() {
157 let mut iter = messages.into_iter();
158 let first = iter.next();
159 self.pending_messages.extend(iter);
160 if let Some(msg) = first {
161 return Some(msg);
162 }
163 }
164 }
165 else => {
166 return None;
167 }
168 }
169 }
170 }
171
172 fn handle_message(&mut self, msg: Message) -> Vec<BinanceSpotWsMessage> {
174 match msg {
175 Message::Binary(data) => self.handle_binary_frame(&data),
176 Message::Text(text) => self.handle_text_frame(&text),
177 Message::Close(_) => {
178 log::debug!("Received close frame");
179 vec![]
180 }
181 Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => vec![],
182 }
183 }
184
185 fn handle_binary_frame(&mut self, data: &[u8]) -> Vec<BinanceSpotWsMessage> {
187 match decode_sbe(data) {
188 Ok(MarketDataMessage::Trades(event)) => self.handle_trades_event(&event),
189 Ok(MarketDataMessage::BestBidAsk(event)) => self.handle_bbo_event(&event),
190 Ok(MarketDataMessage::DepthSnapshot(event)) => self.handle_depth_snapshot(&event),
191 Ok(MarketDataMessage::DepthDiff(event)) => self.handle_depth_diff(&event),
192 Err(e) => {
193 log::error!("SBE decode error: {e}");
194 vec![BinanceSpotWsMessage::Data(
195 NautilusSpotDataWsMessage::RawBinary(data.to_vec()),
196 )]
197 }
198 }
199 }
200
201 fn handle_text_frame(&mut self, text: &str) -> Vec<BinanceSpotWsMessage> {
203 if let Ok(response) = serde_json::from_str::<BinanceWsResponse>(text) {
204 self.handle_subscription_response(response);
205 return vec![];
206 }
207
208 if let Ok(error) = serde_json::from_str::<BinanceWsErrorResponse>(text) {
210 if let Some(id) = error.id
211 && let Some(streams) = self.pending_requests.remove(&id)
212 {
213 for stream in &streams {
214 self.subscriptions.mark_failure(stream);
215 }
216 log::warn!(
217 "Subscription request failed: id={id}, streams={streams:?}, code={}, msg={}",
218 error.code,
219 error.msg
220 );
221 }
222 return vec![BinanceSpotWsMessage::Error(BinanceWsErrorMsg {
223 code: error.code,
224 msg: error.msg,
225 })];
226 }
227
228 if let Ok(value) = serde_json::from_str(text) {
229 vec![BinanceSpotWsMessage::Data(
230 NautilusSpotDataWsMessage::RawJson(value),
231 )]
232 } else {
233 log::warn!("Failed to parse JSON message: {text}");
234 vec![]
235 }
236 }
237
238 fn handle_subscription_response(&mut self, response: BinanceWsResponse) {
240 if let Some(streams) = self.pending_requests.remove(&response.id) {
241 if response.result.is_none() {
242 for stream in &streams {
244 self.subscriptions.confirm_subscribe(stream);
245 }
246 log::debug!("Subscription confirmed: streams={streams:?}");
247 } else {
248 for stream in &streams {
250 self.subscriptions.mark_failure(stream);
251 }
252 log::warn!(
253 "Subscription failed: streams={streams:?}, result={:?}",
254 response.result
255 );
256 }
257 } else {
258 log::debug!("Received response for unknown request: id={}", response.id);
259 }
260 }
261
262 fn handle_trades_event(
264 &self,
265 event: &crate::common::sbe::stream::TradesStreamEvent,
266 ) -> Vec<BinanceSpotWsMessage> {
267 let symbol = Ustr::from(&event.symbol);
268
269 let Some(instrument) = self.instruments_cache.get(&symbol) else {
270 log::warn!("No instrument in cache for trades: symbol={}", event.symbol);
271 return vec![];
272 };
273
274 let trades = parse_trades_event(event, instrument);
275 if trades.is_empty() {
276 vec![]
277 } else {
278 vec![BinanceSpotWsMessage::Data(NautilusSpotDataWsMessage::Data(
279 trades,
280 ))]
281 }
282 }
283
284 fn handle_bbo_event(
286 &self,
287 event: &crate::common::sbe::stream::BestBidAskStreamEvent,
288 ) -> Vec<BinanceSpotWsMessage> {
289 let symbol = Ustr::from(&event.symbol);
290
291 let Some(instrument) = self.instruments_cache.get(&symbol) else {
292 log::warn!("No instrument in cache for BBO: symbol={}", event.symbol);
293 return vec![];
294 };
295
296 let quote = parse_bbo_event(event, instrument);
297 vec![BinanceSpotWsMessage::Data(NautilusSpotDataWsMessage::Data(
298 vec![Data::from(quote)],
299 ))]
300 }
301
302 fn handle_depth_snapshot(
304 &self,
305 event: &crate::common::sbe::stream::DepthSnapshotStreamEvent,
306 ) -> Vec<BinanceSpotWsMessage> {
307 let symbol = Ustr::from(&event.symbol);
308
309 let Some(instrument) = self.instruments_cache.get(&symbol) else {
310 log::warn!(
311 "No instrument in cache for depth snapshot: symbol={}",
312 event.symbol
313 );
314 return vec![];
315 };
316
317 match parse_depth_snapshot(event, instrument) {
318 Some(deltas) => vec![BinanceSpotWsMessage::Data(
319 NautilusSpotDataWsMessage::Deltas(deltas),
320 )],
321 None => vec![],
322 }
323 }
324
325 fn handle_depth_diff(
327 &self,
328 event: &crate::common::sbe::stream::DepthDiffStreamEvent,
329 ) -> Vec<BinanceSpotWsMessage> {
330 let symbol = Ustr::from(&event.symbol);
331
332 let Some(instrument) = self.instruments_cache.get(&symbol) else {
333 log::warn!(
334 "No instrument in cache for depth diff: symbol={}",
335 event.symbol
336 );
337 return vec![];
338 };
339
340 match parse_depth_diff(event, instrument) {
341 Some(deltas) => vec![BinanceSpotWsMessage::Data(
342 NautilusSpotDataWsMessage::Deltas(deltas),
343 )],
344 None => vec![],
345 }
346 }
347
348 async fn handle_subscribe(&mut self, streams: Vec<String>) -> anyhow::Result<()> {
350 let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
351 let request = BinanceWsSubscription::subscribe(streams.clone(), request_id);
352 let payload = serde_json::to_string(&request)?;
353
354 self.pending_requests.insert(request_id, streams.clone());
356
357 for stream in &streams {
359 self.subscriptions.mark_subscribe(stream);
360 }
361
362 self.send_text(
363 payload,
364 Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()),
365 )
366 .await?;
367 Ok(())
368 }
369
370 async fn handle_unsubscribe(&mut self, streams: Vec<String>) -> anyhow::Result<()> {
372 let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
373 let request = BinanceWsSubscription::unsubscribe(streams.clone(), request_id);
374 let payload = serde_json::to_string(&request)?;
375
376 self.send_text(
377 payload,
378 Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()),
379 )
380 .await?;
381
382 for stream in &streams {
385 self.subscriptions.mark_unsubscribe(stream);
386 self.subscriptions.confirm_unsubscribe(stream);
387 }
388
389 Ok(())
390 }
391
392 async fn send_text(
394 &self,
395 payload: String,
396 rate_limit_keys: Option<&[Ustr]>,
397 ) -> anyhow::Result<()> {
398 let Some(client) = &self.inner else {
399 anyhow::bail!("No active WebSocket client");
400 };
401 client
402 .send_text(payload, rate_limit_keys)
403 .await
404 .map_err(|e| anyhow::anyhow!("Failed to send message: {e}"))?;
405 Ok(())
406 }
407}