1use std::{
19 fmt::Debug,
20 sync::{
21 Arc,
22 atomic::{AtomicBool, AtomicU64, Ordering},
23 },
24};
25
26use ahash::AHashMap;
27use nautilus_core::time::AtomicTime;
28use nautilus_model::{
29 data::Data,
30 instruments::{Instrument, InstrumentAny},
31};
32use nautilus_network::{
33 RECONNECTED,
34 websocket::{SubscriptionState, WebSocketClient},
35};
36use ustr::Ustr;
37
38use super::{
39 messages::{
40 BinanceFuturesAccountConfigMsg, BinanceFuturesAccountUpdateMsg, BinanceFuturesAggTradeMsg,
41 BinanceFuturesAlgoUpdateMsg, BinanceFuturesBookTickerMsg, BinanceFuturesDepthUpdateMsg,
42 BinanceFuturesExecWsMessage, BinanceFuturesKlineMsg, BinanceFuturesListenKeyExpiredMsg,
43 BinanceFuturesMarginCallMsg, BinanceFuturesMarkPriceMsg, BinanceFuturesOrderUpdateMsg,
44 BinanceFuturesTradeMsg, BinanceFuturesWsErrorMsg, BinanceFuturesWsErrorResponse,
45 BinanceFuturesWsSubscribeRequest, BinanceFuturesWsSubscribeResponse, DataHandlerCommand,
46 NautilusDataWsMessage, NautilusWsMessage,
47 },
48 parse::{
49 extract_event_type, extract_symbol, parse_agg_trade, parse_book_ticker, parse_depth_update,
50 parse_kline, parse_mark_price, parse_trade,
51 },
52};
53use crate::common::{
54 consts::BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION,
55 enums::{BinanceWsEventType, BinanceWsMethod},
56};
57
58pub struct BinanceFuturesDataWsFeedHandler {
60 clock: &'static AtomicTime,
61 #[allow(dead_code)] signal: Arc<AtomicBool>,
63 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataHandlerCommand>,
64 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
65 #[allow(dead_code)] out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
67 client: Option<WebSocketClient>,
68 instruments: AHashMap<Ustr, InstrumentAny>,
69 subscriptions_state: SubscriptionState,
70 request_id_counter: Arc<AtomicU64>,
71 pending_requests: AHashMap<u64, Vec<String>>,
72}
73
74impl Debug for BinanceFuturesDataWsFeedHandler {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct(stringify!(BinanceFuturesWsFeedHandler))
77 .field("instruments_count", &self.instruments.len())
78 .field("pending_requests", &self.pending_requests.len())
79 .finish_non_exhaustive()
80 }
81}
82
83impl BinanceFuturesDataWsFeedHandler {
84 pub fn new(
86 clock: &'static AtomicTime,
87 signal: Arc<AtomicBool>,
88 cmd_rx: tokio::sync::mpsc::UnboundedReceiver<DataHandlerCommand>,
89 raw_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
90 out_tx: tokio::sync::mpsc::UnboundedSender<NautilusWsMessage>,
91 subscriptions_state: SubscriptionState,
92 request_id_counter: Arc<AtomicU64>,
93 ) -> Self {
94 Self {
95 clock,
96 signal,
97 cmd_rx,
98 raw_rx,
99 out_tx,
100 client: None,
101 instruments: AHashMap::new(),
102 subscriptions_state,
103 request_id_counter,
104 pending_requests: AHashMap::new(),
105 }
106 }
107
108 pub async fn next(&mut self) -> Option<NautilusWsMessage> {
112 loop {
113 if self.signal.load(Ordering::Relaxed) {
114 return None;
115 }
116
117 tokio::select! {
118 Some(cmd) = self.cmd_rx.recv() => {
119 self.handle_command(cmd).await;
120 }
121 Some(raw) = self.raw_rx.recv() => {
122 if let Some(msg) = self.handle_raw_message(raw).await {
123 return Some(msg);
124 }
125 }
126 else => {
127 return None;
128 }
129 }
130 }
131 }
132
133 async fn handle_command(&mut self, cmd: DataHandlerCommand) {
134 match cmd {
135 DataHandlerCommand::SetClient(client) => {
136 self.client = Some(client);
137 }
138 DataHandlerCommand::Disconnect => {
139 if let Some(client) = &self.client {
140 let () = client.disconnect().await;
141 }
142 self.client = None;
143 }
144 DataHandlerCommand::InitializeInstruments(instruments) => {
145 for inst in instruments {
146 self.instruments.insert(inst.raw_symbol().inner(), inst);
147 }
148 }
149 DataHandlerCommand::UpdateInstrument(instrument) => {
150 self.instruments
151 .insert(instrument.raw_symbol().inner(), instrument);
152 }
153 DataHandlerCommand::Subscribe { streams } => {
154 self.send_subscribe(streams).await;
155 }
156 DataHandlerCommand::Unsubscribe { streams } => {
157 self.send_unsubscribe(streams).await;
158 }
159 }
160 }
161
162 async fn send_subscribe(&mut self, streams: Vec<String>) {
163 let Some(client) = &self.client else {
164 log::warn!("Cannot subscribe: no client connected");
165 return;
166 };
167
168 let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
169
170 self.pending_requests.insert(request_id, streams.clone());
172
173 for stream in &streams {
175 self.subscriptions_state.mark_subscribe(stream);
176 }
177
178 let request = BinanceFuturesWsSubscribeRequest {
179 method: BinanceWsMethod::Subscribe,
180 params: streams,
181 id: request_id,
182 };
183
184 let json = match serde_json::to_string(&request) {
185 Ok(j) => j,
186 Err(e) => {
187 log::error!("Failed to serialize subscribe request: {e}");
188 return;
189 }
190 };
191
192 if let Err(e) = client
193 .send_text(json, Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
194 .await
195 {
196 log::error!("Failed to send subscribe request: {e}");
197 }
198 }
199
200 async fn send_unsubscribe(&mut self, streams: Vec<String>) {
201 let Some(client) = &self.client else {
202 log::warn!("Cannot unsubscribe: no client connected");
203 return;
204 };
205
206 let request_id = self.request_id_counter.fetch_add(1, Ordering::Relaxed);
207
208 let request = BinanceFuturesWsSubscribeRequest {
209 method: BinanceWsMethod::Unsubscribe,
210 params: streams.clone(),
211 id: request_id,
212 };
213
214 let json = match serde_json::to_string(&request) {
215 Ok(j) => j,
216 Err(e) => {
217 log::error!("Failed to serialize unsubscribe request: {e}");
218 return;
219 }
220 };
221
222 if let Err(e) = client
223 .send_text(json, Some(BINANCE_RATE_LIMIT_KEY_SUBSCRIPTION.as_slice()))
224 .await
225 {
226 log::error!("Failed to send unsubscribe request: {e}");
227 }
228
229 for stream in &streams {
231 self.subscriptions_state.confirm_unsubscribe(stream);
232 }
233 }
234
235 async fn handle_raw_message(&mut self, raw: Vec<u8>) -> Option<NautilusWsMessage> {
236 if let Ok(text) = std::str::from_utf8(&raw)
238 && text == RECONNECTED
239 {
240 log::info!("WebSocket reconnected signal received");
241 return Some(NautilusWsMessage::Reconnected);
242 }
243
244 let json: serde_json::Value = match serde_json::from_slice(&raw) {
246 Ok(j) => j,
247 Err(e) => {
248 log::warn!("Failed to parse JSON message: {e}");
249 return None;
250 }
251 };
252
253 if json.get("result").is_some() || json.get("id").is_some() {
255 self.handle_subscription_response(&json);
256 return None;
257 }
258
259 if let Some(code) = json.get("code")
261 && let Some(code) = code.as_i64()
262 {
263 let msg = json
264 .get("msg")
265 .and_then(|m| m.as_str())
266 .unwrap_or("Unknown error")
267 .to_string();
268 return Some(NautilusWsMessage::Error(BinanceFuturesWsErrorMsg {
269 code,
270 msg,
271 }));
272 }
273
274 self.handle_stream_data(&json)
276 }
277
278 fn handle_subscription_response(&mut self, json: &serde_json::Value) {
279 if let Ok(response) =
280 serde_json::from_value::<BinanceFuturesWsSubscribeResponse>(json.clone())
281 {
282 if let Some(streams) = self.pending_requests.remove(&response.id) {
283 if response.result.is_none() {
284 for stream in &streams {
286 self.subscriptions_state.confirm_subscribe(stream);
287 }
288 log::debug!("Subscription confirmed: streams={streams:?}");
289 } else {
290 for stream in &streams {
292 self.subscriptions_state.mark_failure(stream);
293 }
294 log::warn!(
295 "Subscription failed: streams={streams:?}, result={:?}",
296 response.result
297 );
298 }
299 }
300 } else if let Ok(error) =
301 serde_json::from_value::<BinanceFuturesWsErrorResponse>(json.clone())
302 {
303 if let Some(id) = error.id
304 && let Some(streams) = self.pending_requests.remove(&id)
305 {
306 for stream in &streams {
307 self.subscriptions_state.mark_failure(stream);
308 }
309 }
310 log::warn!(
311 "WebSocket error response: code={}, msg={}",
312 error.code,
313 error.msg
314 );
315 }
316 }
317
318 fn handle_stream_data(&self, json: &serde_json::Value) -> Option<NautilusWsMessage> {
319 let ts_init = self.clock.get_time_ns();
320 let event_type = extract_event_type(json)?;
321
322 if let Some(msg) = self.handle_user_data_event(&event_type, json) {
324 return Some(NautilusWsMessage::ExecRaw(msg));
325 }
326
327 if matches!(
329 event_type,
330 BinanceWsEventType::AccountUpdate
331 | BinanceWsEventType::OrderTradeUpdate
332 | BinanceWsEventType::MarginCall
333 | BinanceWsEventType::AccountConfigUpdate
334 | BinanceWsEventType::ListenKeyExpired
335 | BinanceWsEventType::Unknown
336 ) {
337 return None;
338 }
339
340 let symbol = extract_symbol(json)?;
342 let Some(instrument) = self.instruments.get(&symbol) else {
343 log::warn!(
344 "No instrument in cache, dropping message: symbol={symbol}, event_type={event_type:?}"
345 );
346 return None;
347 };
348
349 match event_type {
350 BinanceWsEventType::AggTrade => {
351 if let Ok(msg) = serde_json::from_value::<BinanceFuturesAggTradeMsg>(json.clone()) {
352 match parse_agg_trade(&msg, instrument, ts_init) {
353 Ok(trade) => {
354 return Some(NautilusWsMessage::Data(NautilusDataWsMessage::Data(
355 vec![Data::Trade(trade)],
356 )));
357 }
358 Err(e) => {
359 log::warn!("Failed to parse aggregate trade: {e}");
360 }
361 }
362 }
363 }
364 BinanceWsEventType::Trade => {
365 if let Ok(msg) = serde_json::from_value::<BinanceFuturesTradeMsg>(json.clone()) {
366 match parse_trade(&msg, instrument, ts_init) {
367 Ok(trade) => {
368 return Some(NautilusWsMessage::Data(NautilusDataWsMessage::Data(
369 vec![Data::Trade(trade)],
370 )));
371 }
372 Err(e) => {
373 log::warn!("Failed to parse trade: {e}");
374 }
375 }
376 }
377 }
378 BinanceWsEventType::BookTicker => {
379 if let Ok(msg) = serde_json::from_value::<BinanceFuturesBookTickerMsg>(json.clone())
380 {
381 match parse_book_ticker(&msg, instrument, ts_init) {
382 Ok(quote) => {
383 return Some(NautilusWsMessage::Data(NautilusDataWsMessage::Data(
384 vec![Data::Quote(quote)],
385 )));
386 }
387 Err(e) => {
388 log::warn!("Failed to parse book ticker: {e}");
389 }
390 }
391 }
392 }
393 BinanceWsEventType::DepthUpdate => {
394 if let Ok(msg) =
395 serde_json::from_value::<BinanceFuturesDepthUpdateMsg>(json.clone())
396 {
397 match parse_depth_update(&msg, instrument, ts_init) {
398 Ok(deltas) => {
399 return Some(NautilusWsMessage::Data(
400 NautilusDataWsMessage::DepthUpdate {
401 deltas,
402 first_update_id: msg.first_update_id,
403 prev_final_update_id: msg.prev_final_update_id,
404 },
405 ));
406 }
407 Err(e) => {
408 log::warn!("Failed to parse depth update: {e}");
409 }
410 }
411 }
412 }
413 BinanceWsEventType::MarkPriceUpdate => {
414 if let Ok(msg) = serde_json::from_value::<BinanceFuturesMarkPriceMsg>(json.clone())
415 {
416 match parse_mark_price(&msg, instrument, ts_init) {
417 Ok((mark_update, index_update, _funding_update)) => {
418 return Some(NautilusWsMessage::Data(NautilusDataWsMessage::Data(
421 vec![
422 Data::MarkPriceUpdate(mark_update),
423 Data::IndexPriceUpdate(index_update),
424 ],
425 )));
426 }
427 Err(e) => {
428 log::warn!("Failed to parse mark price: {e}");
429 }
430 }
431 }
432 }
433 BinanceWsEventType::Kline => {
434 if let Ok(msg) = serde_json::from_value::<BinanceFuturesKlineMsg>(json.clone()) {
435 match parse_kline(&msg, instrument, ts_init) {
436 Ok(Some(bar)) => {
437 return Some(NautilusWsMessage::Data(NautilusDataWsMessage::Data(
438 vec![Data::Bar(bar)],
439 )));
440 }
441 Ok(None) => {
442 }
444 Err(e) => {
445 log::warn!("Failed to parse kline: {e}");
446 }
447 }
448 }
449 }
450 BinanceWsEventType::ForceOrder
451 | BinanceWsEventType::Ticker24Hr
452 | BinanceWsEventType::MiniTicker24Hr => {
453 return Some(NautilusWsMessage::Data(NautilusDataWsMessage::RawJson(
455 json.clone(),
456 )));
457 }
458
459 BinanceWsEventType::AccountUpdate
461 | BinanceWsEventType::OrderTradeUpdate
462 | BinanceWsEventType::AlgoUpdate
463 | BinanceWsEventType::MarginCall
464 | BinanceWsEventType::AccountConfigUpdate
465 | BinanceWsEventType::ListenKeyExpired
466 | BinanceWsEventType::Unknown => unreachable!(),
467 }
468
469 None
470 }
471
472 fn handle_user_data_event(
473 &self,
474 event_type: &BinanceWsEventType,
475 json: &serde_json::Value,
476 ) -> Option<BinanceFuturesExecWsMessage> {
477 match event_type {
478 BinanceWsEventType::AccountUpdate => {
479 match serde_json::from_value::<BinanceFuturesAccountUpdateMsg>(json.clone()) {
480 Ok(msg) => {
481 log::debug!(
482 "Account update: reason={:?}, balances={}, positions={}",
483 msg.account.reason,
484 msg.account.balances.len(),
485 msg.account.positions.len()
486 );
487 Some(BinanceFuturesExecWsMessage::AccountUpdate(msg))
488 }
489 Err(e) => {
490 log::warn!("Failed to parse account update: {e}");
491 None
492 }
493 }
494 }
495 BinanceWsEventType::OrderTradeUpdate => {
496 match serde_json::from_value::<BinanceFuturesOrderUpdateMsg>(json.clone()) {
497 Ok(msg) => {
498 log::debug!(
499 "Order update: symbol={}, order_id={}, exec={:?}, status={:?}",
500 msg.order.symbol,
501 msg.order.order_id,
502 msg.order.execution_type,
503 msg.order.order_status
504 );
505 Some(BinanceFuturesExecWsMessage::OrderUpdate(Box::new(msg)))
506 }
507 Err(e) => {
508 log::warn!("Failed to parse order update: {e}");
509 None
510 }
511 }
512 }
513 BinanceWsEventType::AlgoUpdate => {
514 match serde_json::from_value::<BinanceFuturesAlgoUpdateMsg>(json.clone()) {
515 Ok(msg) => {
516 log::debug!(
517 "Algo order update: symbol={}, algo_id={}, status={:?}",
518 msg.algo_order.symbol,
519 msg.algo_order.algo_id,
520 msg.algo_order.algo_status
521 );
522 Some(BinanceFuturesExecWsMessage::AlgoUpdate(Box::new(msg)))
523 }
524 Err(e) => {
525 log::warn!("Failed to parse algo order update: {e}");
526 None
527 }
528 }
529 }
530 BinanceWsEventType::MarginCall => {
531 match serde_json::from_value::<BinanceFuturesMarginCallMsg>(json.clone()) {
532 Ok(msg) => {
533 log::warn!(
534 "Margin call: cross_wallet_balance={}, positions_at_risk={}",
535 msg.cross_wallet_balance,
536 msg.positions.len()
537 );
538 Some(BinanceFuturesExecWsMessage::MarginCall(msg))
539 }
540 Err(e) => {
541 log::warn!("Failed to parse margin call: {e}");
542 None
543 }
544 }
545 }
546 BinanceWsEventType::AccountConfigUpdate => {
547 match serde_json::from_value::<BinanceFuturesAccountConfigMsg>(json.clone()) {
548 Ok(msg) => {
549 if let Some(ref lc) = msg.leverage_config {
550 log::debug!(
551 "Account config update: symbol={}, leverage={}",
552 lc.symbol,
553 lc.leverage
554 );
555 }
556 Some(BinanceFuturesExecWsMessage::AccountConfigUpdate(msg))
557 }
558 Err(e) => {
559 log::warn!("Failed to parse account config update: {e}");
560 None
561 }
562 }
563 }
564 BinanceWsEventType::ListenKeyExpired => {
565 match serde_json::from_value::<BinanceFuturesListenKeyExpiredMsg>(json.clone()) {
566 Ok(msg) => {
567 log::warn!("Listen key expired at {}", msg.event_time);
568 Some(BinanceFuturesExecWsMessage::ListenKeyExpired)
569 }
570 Err(e) => {
571 log::warn!("Failed to parse listen key expired: {e}");
572 None
573 }
574 }
575 }
576 _ => None,
577 }
578 }
579}