1use std::{
29 collections::VecDeque,
30 sync::{
31 Arc,
32 atomic::{AtomicBool, AtomicU64, Ordering},
33 },
34};
35
36use ahash::AHashMap;
37use nautilus_core::nanos::UnixNanos;
38use nautilus_model::{
39 data::{BookOrder, Data, OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick},
40 enums::{AggressorSide, BookAction, OrderSide, RecordFlag},
41 identifiers::TradeId,
42 instruments::{Instrument, InstrumentAny},
43};
44use nautilus_network::{
45 RECONNECTED,
46 websocket::{SubscriptionState, WebSocketClient},
47};
48use tokio_tungstenite::tungstenite::Message;
49use ustr::Ustr;
50
51use super::messages::{
52 BinanceWsErrorMsg, BinanceWsErrorResponse, BinanceWsResponse, BinanceWsSubscription,
53 HandlerCommand, NautilusWsMessage,
54};
55use crate::common::{
56 fixed::{mantissa_to_price, mantissa_to_quantity},
57 sbe::stream::{
58 BestBidAskStreamEvent, DepthDiffStreamEvent, DepthSnapshotStreamEvent, MessageHeader,
59 StreamDecodeError, TradesStreamEvent, template_id,
60 },
61};
62
63#[derive(Debug)]
65pub enum MarketDataMessage {
66 Trades(TradesStreamEvent),
68 BestBidAsk(BestBidAskStreamEvent),
70 DepthSnapshot(DepthSnapshotStreamEvent),
72 DepthDiff(DepthDiffStreamEvent),
74}
75
76pub fn decode_market_data(buf: &[u8]) -> Result<MarketDataMessage, StreamDecodeError> {
81 let header = MessageHeader::decode(buf)?;
82 header.validate_schema()?;
83
84 match header.template_id {
85 template_id::TRADES_STREAM_EVENT => {
86 Ok(MarketDataMessage::Trades(TradesStreamEvent::decode(buf)?))
87 }
88 template_id::BEST_BID_ASK_STREAM_EVENT => Ok(MarketDataMessage::BestBidAsk(
89 BestBidAskStreamEvent::decode(buf)?,
90 )),
91 template_id::DEPTH_SNAPSHOT_STREAM_EVENT => Ok(MarketDataMessage::DepthSnapshot(
92 DepthSnapshotStreamEvent::decode(buf)?,
93 )),
94 template_id::DEPTH_DIFF_STREAM_EVENT => Ok(MarketDataMessage::DepthDiff(
95 DepthDiffStreamEvent::decode(buf)?,
96 )),
97 _ => Err(StreamDecodeError::UnknownTemplateId(header.template_id)),
98 }
99}
100
101pub(super) struct BinanceSpotWsFeedHandler {
106 #[allow(dead_code)] signal: Arc<AtomicBool>,
108 inner: Option<WebSocketClient>,
109 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
110 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
111 #[allow(dead_code)] out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
113 subscriptions: SubscriptionState,
114 instruments_cache: AHashMap<Ustr, InstrumentAny>,
115 request_id_counter: Arc<AtomicU64>,
116 pending_messages: VecDeque<NautilusWsMessage>,
117 pending_requests: AHashMap<u64, Vec<String>>,
118}
119
120impl BinanceSpotWsFeedHandler {
121 pub(super) fn new(
123 signal: Arc<AtomicBool>,
124 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<HandlerCommand>,
125 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Message>,
126 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
127 subscriptions: SubscriptionState,
128 request_id_counter: Arc<AtomicU64>,
129 ) -> Self {
130 Self {
131 signal,
132 inner: None,
133 cmd_rx,
134 raw_rx,
135 out_tx,
136 subscriptions,
137 instruments_cache: AHashMap::new(),
138 request_id_counter,
139 pending_messages: VecDeque::new(),
140 pending_requests: AHashMap::new(),
141 }
142 }
143
144 pub(super) async fn next(&mut self) -> Option<NautilusWsMessage> {
148 if let Some(message) = self.pending_messages.pop_front() {
150 return Some(message);
151 }
152
153 loop {
154 tokio::select! {
155 Some(cmd) = self.cmd_rx.recv() => {
156 match cmd {
157 HandlerCommand::SetClient(client) => {
158 log::debug!("Handler received WebSocket client");
159 self.inner = Some(client);
160 }
161 HandlerCommand::Disconnect => {
162 log::debug!("Handler disconnecting WebSocket client");
163 self.inner = None;
164 return None;
165 }
166 HandlerCommand::InitializeInstruments(instruments) => {
167 for inst in instruments {
168 self.instruments_cache.insert(inst.symbol().inner(), inst);
169 }
170 }
171 HandlerCommand::UpdateInstrument(inst) => {
172 self.instruments_cache.insert(inst.symbol().inner(), inst);
173 }
174 HandlerCommand::Subscribe { streams } => {
175 if let Err(e) = self.handle_subscribe(streams).await {
176 log::error!("Failed to handle subscribe command: {e}");
177 }
178 }
179 HandlerCommand::Unsubscribe { streams } => {
180 if let Err(e) = self.handle_unsubscribe(streams).await {
181 log::error!("Failed to handle unsubscribe command: {e}");
182 }
183 }
184 }
185 }
186 Some(msg) = self.raw_rx.recv() => {
187 if let Message::Text(ref text) = msg
188 && text.as_str() == RECONNECTED
189 {
190 log::info!("Handler received reconnection signal");
191 return Some(NautilusWsMessage::Reconnected);
192 }
193
194 let messages = self.handle_message(msg);
195 if !messages.is_empty() {
196 let mut iter = messages.into_iter();
197 let first = iter.next();
198 self.pending_messages.extend(iter);
199 if let Some(msg) = first {
200 return Some(msg);
201 }
202 }
203 }
204 else => {
205 return None;
206 }
207 }
208 }
209 }
210
211 fn handle_message(&mut self, msg: Message) -> Vec<NautilusWsMessage> {
213 match msg {
214 Message::Binary(data) => self.handle_binary_frame(&data),
215 Message::Text(text) => self.handle_text_frame(&text),
216 Message::Close(_) => {
217 log::debug!("Received close frame");
218 vec![]
219 }
220 Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => vec![],
221 }
222 }
223
224 fn handle_binary_frame(&mut self, data: &[u8]) -> Vec<NautilusWsMessage> {
226 match decode_market_data(data) {
227 Ok(MarketDataMessage::Trades(event)) => self.parse_trades_event(event),
228 Ok(MarketDataMessage::BestBidAsk(event)) => self.parse_bbo_event(event),
229 Ok(MarketDataMessage::DepthSnapshot(event)) => self.parse_depth_snapshot(event),
230 Ok(MarketDataMessage::DepthDiff(event)) => self.parse_depth_diff(event),
231 Err(e) => {
232 log::error!("SBE decode error: {e}");
233 vec![NautilusWsMessage::RawBinary(data.to_vec())]
234 }
235 }
236 }
237
238 fn handle_text_frame(&mut self, text: &str) -> Vec<NautilusWsMessage> {
240 if let Ok(response) = serde_json::from_str::<BinanceWsResponse>(text) {
241 self.handle_subscription_response(response);
242 return vec![];
243 }
244
245 if let Ok(error) = serde_json::from_str::<BinanceWsErrorResponse>(text) {
247 if let Some(id) = error.id
248 && let Some(streams) = self.pending_requests.remove(&id)
249 {
250 for stream in &streams {
251 self.subscriptions.mark_failure(stream);
252 }
253 log::warn!(
254 "Subscription request failed: id={id}, streams={streams:?}, code={}, msg={}",
255 error.code,
256 error.msg
257 );
258 }
259 return vec![NautilusWsMessage::Error(BinanceWsErrorMsg {
260 code: error.code,
261 msg: error.msg,
262 })];
263 }
264
265 if let Ok(value) = serde_json::from_str(text) {
266 vec![NautilusWsMessage::RawJson(value)]
267 } else {
268 log::warn!("Failed to parse JSON message: {text}");
269 vec![]
270 }
271 }
272
273 fn handle_subscription_response(&mut self, response: BinanceWsResponse) {
275 if let Some(streams) = self.pending_requests.remove(&response.id) {
276 if response.result.is_none() {
277 for stream in &streams {
279 self.subscriptions.confirm_subscribe(stream);
280 }
281 log::debug!("Subscription confirmed: streams={streams:?}");
282 } else {
283 for stream in &streams {
285 self.subscriptions.mark_failure(stream);
286 }
287 log::warn!(
288 "Subscription failed: streams={streams:?}, result={:?}",
289 response.result
290 );
291 }
292 } else {
293 log::debug!("Received response for unknown request: id={}", response.id);
294 }
295 }
296
297 fn parse_trades_event(&self, event: TradesStreamEvent) -> Vec<NautilusWsMessage> {
299 let symbol = Ustr::from(&event.symbol);
300
301 let Some(instrument) = self.instruments_cache.get(&symbol) else {
302 log::warn!("No instrument in cache for trades: symbol={}", event.symbol);
303 return vec![];
304 };
305
306 let instrument_id = instrument.id();
307 let price_precision = instrument.price_precision();
308 let size_precision = instrument.size_precision();
309
310 let trades: Vec<Data> = event
311 .trades
312 .iter()
313 .map(|t| {
314 let price =
315 mantissa_to_price(t.price_mantissa, event.price_exponent, price_precision);
316 let size = mantissa_to_quantity(t.qty_mantissa, event.qty_exponent, size_precision);
317 let ts_event = UnixNanos::from(event.transact_time_us as u64 * 1000); let trade = TradeTick::new(
320 instrument_id,
321 price,
322 size,
323 if t.is_buyer_maker {
324 AggressorSide::Seller
325 } else {
326 AggressorSide::Buyer
327 },
328 TradeId::new(t.id.to_string()),
329 ts_event,
330 ts_event, );
332 Data::from(trade)
333 })
334 .collect();
335
336 if trades.is_empty() {
337 vec![]
338 } else {
339 vec![NautilusWsMessage::Data(trades)]
340 }
341 }
342
343 fn parse_bbo_event(&self, event: BestBidAskStreamEvent) -> Vec<NautilusWsMessage> {
345 let symbol = Ustr::from(&event.symbol);
346
347 let Some(instrument) = self.instruments_cache.get(&symbol) else {
348 log::warn!("No instrument in cache for BBO: symbol={}", event.symbol);
349 return vec![];
350 };
351
352 let instrument_id = instrument.id();
353 let price_precision = instrument.price_precision();
354 let size_precision = instrument.size_precision();
355
356 let bid_price = mantissa_to_price(
357 event.bid_price_mantissa,
358 event.price_exponent,
359 price_precision,
360 );
361 let bid_size =
362 mantissa_to_quantity(event.bid_qty_mantissa, event.qty_exponent, size_precision);
363 let ask_price = mantissa_to_price(
364 event.ask_price_mantissa,
365 event.price_exponent,
366 price_precision,
367 );
368 let ask_size =
369 mantissa_to_quantity(event.ask_qty_mantissa, event.qty_exponent, size_precision);
370 let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000); let quote = QuoteTick::new(
373 instrument_id,
374 bid_price,
375 ask_price,
376 bid_size,
377 ask_size,
378 ts_event,
379 ts_event,
380 );
381
382 vec![NautilusWsMessage::Data(vec![Data::from(quote)])]
383 }
384
385 fn parse_depth_snapshot(&self, event: DepthSnapshotStreamEvent) -> Vec<NautilusWsMessage> {
387 let symbol = Ustr::from(&event.symbol);
388
389 let Some(instrument) = self.instruments_cache.get(&symbol) else {
390 log::warn!(
391 "No instrument in cache for depth snapshot: symbol={}",
392 event.symbol
393 );
394 return vec![];
395 };
396
397 let instrument_id = instrument.id();
398 let price_precision = instrument.price_precision();
399 let size_precision = instrument.size_precision();
400 let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000);
401
402 let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len() + 1);
403
404 deltas.push(OrderBookDelta::clear(instrument_id, 0, ts_event, ts_event));
406
407 for (i, level) in event.bids.iter().enumerate() {
409 let price =
410 mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
411 let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
412 let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
413 RecordFlag::F_LAST as u8
414 } else {
415 0
416 };
417
418 let order = BookOrder::new(
419 OrderSide::Buy,
420 price,
421 size,
422 0, );
424
425 deltas.push(OrderBookDelta::new(
426 instrument_id,
427 BookAction::Add,
428 order,
429 flags,
430 0, ts_event,
432 ts_event,
433 ));
434 }
435
436 for (i, level) in event.asks.iter().enumerate() {
438 let price =
439 mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
440 let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
441 let flags = if i == event.asks.len() - 1 {
442 RecordFlag::F_LAST as u8
443 } else {
444 0
445 };
446
447 let order = BookOrder::new(
448 OrderSide::Sell,
449 price,
450 size,
451 0, );
453
454 deltas.push(OrderBookDelta::new(
455 instrument_id,
456 BookAction::Add,
457 order,
458 flags,
459 0, ts_event,
461 ts_event,
462 ));
463 }
464
465 if deltas.len() <= 1 {
466 return vec![];
467 }
468
469 vec![NautilusWsMessage::Deltas(OrderBookDeltas::new(
470 instrument_id,
471 deltas,
472 ))]
473 }
474
475 fn parse_depth_diff(&self, event: DepthDiffStreamEvent) -> Vec<NautilusWsMessage> {
477 let symbol = Ustr::from(&event.symbol);
478
479 let Some(instrument) = self.instruments_cache.get(&symbol) else {
480 log::warn!(
481 "No instrument in cache for depth diff: symbol={}",
482 event.symbol
483 );
484 return vec![];
485 };
486
487 let instrument_id = instrument.id();
488 let price_precision = instrument.price_precision();
489 let size_precision = instrument.size_precision();
490 let ts_event = UnixNanos::from(event.event_time_us as u64 * 1000);
491
492 let mut deltas = Vec::with_capacity(event.bids.len() + event.asks.len());
493
494 for (i, level) in event.bids.iter().enumerate() {
496 let price =
497 mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
498 let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
499
500 let action = if level.qty_mantissa == 0 {
502 BookAction::Delete
503 } else {
504 BookAction::Update
505 };
506
507 let flags = if i == event.bids.len() - 1 && event.asks.is_empty() {
508 RecordFlag::F_LAST as u8
509 } else {
510 0
511 };
512
513 let order = BookOrder::new(
514 OrderSide::Buy,
515 price,
516 size,
517 0, );
519
520 deltas.push(OrderBookDelta::new(
521 instrument_id,
522 action,
523 order,
524 flags,
525 0, ts_event,
527 ts_event,
528 ));
529 }
530
531 for (i, level) in event.asks.iter().enumerate() {
533 let price =
534 mantissa_to_price(level.price_mantissa, event.price_exponent, price_precision);
535 let size = mantissa_to_quantity(level.qty_mantissa, event.qty_exponent, size_precision);
536
537 let action = if level.qty_mantissa == 0 {
538 BookAction::Delete
539 } else {
540 BookAction::Update
541 };
542
543 let flags = if i == event.asks.len() - 1 {
544 RecordFlag::F_LAST as u8
545 } else {
546 0
547 };
548
549 let order = BookOrder::new(
550 OrderSide::Sell,
551 price,
552 size,
553 0, );
555
556 deltas.push(OrderBookDelta::new(
557 instrument_id,
558 action,
559 order,
560 flags,
561 0, ts_event,
563 ts_event,
564 ));
565 }
566
567 if deltas.is_empty() {
568 return vec![];
569 }
570
571 vec![NautilusWsMessage::Deltas(OrderBookDeltas::new(
572 instrument_id,
573 deltas,
574 ))]
575 }
576
577 async fn handle_subscribe(&mut self, streams: Vec<String>) -> anyhow::Result<()> {
579 let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
580 let request = BinanceWsSubscription::subscribe(streams.clone(), request_id);
581 let payload = serde_json::to_string(&request)?;
582
583 self.pending_requests.insert(request_id, streams.clone());
585
586 for stream in &streams {
588 self.subscriptions.mark_subscribe(stream);
589 }
590
591 self.send_text(payload).await?;
592 Ok(())
593 }
594
595 async fn handle_unsubscribe(&mut self, streams: Vec<String>) -> anyhow::Result<()> {
597 let request_id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
598 let request = BinanceWsSubscription::unsubscribe(streams.clone(), request_id);
599 let payload = serde_json::to_string(&request)?;
600
601 self.send_text(payload).await?;
602
603 for stream in &streams {
606 self.subscriptions.mark_unsubscribe(stream);
607 self.subscriptions.confirm_unsubscribe(stream);
608 }
609
610 Ok(())
611 }
612
613 async fn send_text(&self, payload: String) -> anyhow::Result<()> {
615 let Some(client) = &self.inner else {
616 anyhow::bail!("No active WebSocket client");
617 };
618 client
619 .send_text(payload, None)
620 .await
621 .map_err(|e| anyhow::anyhow!("Failed to send message: {e}"))?;
622 Ok(())
623 }
624}
625
626#[cfg(test)]
627mod tests {
628 use rstest::rstest;
629
630 use super::*;
631 use crate::common::sbe::stream::STREAM_SCHEMA_ID;
632
633 #[rstest]
634 fn test_decode_empty_buffer() {
635 let err = decode_market_data(&[]).unwrap_err();
636 assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
637 }
638
639 #[rstest]
640 fn test_decode_short_buffer() {
641 let buf = [0u8; 5];
642 let err = decode_market_data(&buf).unwrap_err();
643 assert!(matches!(err, StreamDecodeError::BufferTooShort { .. }));
644 }
645
646 #[rstest]
647 fn test_decode_wrong_schema() {
648 let mut buf = [0u8; 100];
649 buf[0..2].copy_from_slice(&50u16.to_le_bytes()); buf[2..4].copy_from_slice(&template_id::BEST_BID_ASK_STREAM_EVENT.to_le_bytes());
651 buf[4..6].copy_from_slice(&99u16.to_le_bytes()); buf[6..8].copy_from_slice(&0u16.to_le_bytes()); let err = decode_market_data(&buf).unwrap_err();
655 assert!(matches!(err, StreamDecodeError::SchemaMismatch { .. }));
656 }
657
658 #[rstest]
659 fn test_decode_unknown_template() {
660 let mut buf = [0u8; 100];
661 buf[0..2].copy_from_slice(&50u16.to_le_bytes()); buf[2..4].copy_from_slice(&9999u16.to_le_bytes()); buf[4..6].copy_from_slice(&STREAM_SCHEMA_ID.to_le_bytes());
664 buf[6..8].copy_from_slice(&0u16.to_le_bytes()); let err = decode_market_data(&buf).unwrap_err();
667 assert!(matches!(err, StreamDecodeError::UnknownTemplateId(9999)));
668 }
669}