1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 clients::DataClient,
28 live::{runner::get_data_event_sender, runtime::get_runtime},
29 messages::{
30 DataEvent,
31 data::{
32 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34 SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
35 SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
36 TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
37 UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38 },
39 },
40};
41use nautilus_core::{
42 MUTEX_POISONED,
43 datetime::{NANOSECONDS_IN_MILLISECOND, datetime_to_unix_nanos},
44 nanos::UnixNanos,
45 time::{AtomicTime, get_atomic_clock_realtime},
46};
47use nautilus_model::{
48 data::{BookOrder, Data, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
49 enums::{BookAction, BookType, OrderSide, RecordFlag},
50 identifiers::{ClientId, InstrumentId, Venue},
51 instruments::{Instrument, InstrumentAny},
52 types::{Price, Quantity},
53};
54use tokio::task::JoinHandle;
55use tokio_util::sync::CancellationToken;
56
57use crate::{
58 common::{
59 consts::{BINANCE_BOOK_DEPTHS, BINANCE_VENUE},
60 enums::BinanceProductType,
61 parse::bar_spec_to_binance_interval,
62 symbol::format_binance_stream_symbol,
63 },
64 config::BinanceDataClientConfig,
65 futures::{
66 http::{
67 client::BinanceFuturesHttpClient, models::BinanceOrderBook, query::BinanceDepthParams,
68 },
69 websocket::{
70 client::BinanceFuturesWebSocketClient,
71 messages::{NautilusDataWsMessage, NautilusWsMessage},
72 },
73 },
74};
75
/// A depth update held back while an order book snapshot rebuild is pending.
///
/// The ID fields correspond to the `U`, `u` and `pu` values referenced by the
/// overlap and continuity checks performed when the buffer is replayed.
#[derive(Debug, Clone)]
struct BufferedDepthUpdate {
    /// Deltas carried by the update; emitted when the update is replayed.
    deltas: OrderBookDeltas,
    /// First update ID covered by this update (`U`).
    first_update_id: u64,
    /// Final update ID covered by this update (`u`); populated from `deltas.sequence`.
    final_update_id: u64,
    /// Final update ID of the preceding update (`pu`).
    prev_final_update_id: u64,
}
83
/// Per-instrument holding area for depth updates received during a snapshot rebuild.
///
/// While an entry exists for an instrument, incoming depth updates are pushed
/// here instead of being emitted.
#[derive(Debug)]
struct BookBuffer {
    /// Updates received so far, in arrival order.
    updates: Vec<BufferedDepthUpdate>,
    /// Epoch under which the buffer was created; a snapshot task started with a
    /// different epoch treats the buffer as not its own.
    epoch: u64,
}
89
90impl BookBuffer {
91 fn new(epoch: u64) -> Self {
92 Self {
93 updates: Vec::new(),
94 epoch,
95 }
96 }
97}
98
/// Data client for Binance Futures market data.
///
/// Construction is restricted to the `UsdM` and `CoinM` product types.
#[derive(Debug)]
pub struct BinanceFuturesDataClient {
    /// Clock used to timestamp emitted snapshots and responses.
    clock: &'static AtomicTime,
    /// Identifier reported by this client.
    client_id: ClientId,
    /// Configuration the client was created with.
    config: BinanceDataClientConfig,
    /// Futures product type (`UsdM` or `CoinM`).
    product_type: BinanceProductType,
    /// HTTP client used for instrument and depth snapshot requests.
    http_client: BinanceFuturesHttpClient,
    /// WebSocket client used for stream subscriptions.
    ws_client: BinanceFuturesWebSocketClient,
    /// Channel on which data events and responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Whether the client is currently connected.
    is_connected: AtomicBool,
    /// Token used to stop the WebSocket stream task.
    cancellation_token: CancellationToken,
    /// Handles of the spawned WebSocket stream tasks.
    tasks: Vec<JoinHandle<()>>,
    /// Instrument cache keyed by instrument ID.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Depth update buffers for instruments with a snapshot rebuild in progress.
    book_buffers: Arc<RwLock<AHashMap<InstrumentId, BookBuffer>>>,
    /// Active order book subscriptions mapped to their requested depth.
    book_subscriptions: Arc<RwLock<AHashMap<InstrumentId, u32>>>,
    /// Reference counts shared by mark price and index price subscriptions.
    mark_price_refs: Arc<RwLock<AHashMap<InstrumentId, u32>>>,
    /// Counter advanced on every book subscription and on every reconnect.
    book_epoch: Arc<RwLock<u64>>,
}
118
119impl BinanceFuturesDataClient {
120 pub fn new(
127 client_id: ClientId,
128 config: BinanceDataClientConfig,
129 product_type: BinanceProductType,
130 ) -> anyhow::Result<Self> {
131 match product_type {
132 BinanceProductType::UsdM | BinanceProductType::CoinM => {}
133 _ => {
134 anyhow::bail!(
135 "BinanceFuturesDataClient requires UsdM or CoinM product type, was {product_type:?}"
136 );
137 }
138 }
139
140 let clock = get_atomic_clock_realtime();
141 let data_sender = get_data_event_sender();
142
143 let http_client = BinanceFuturesHttpClient::new(
144 product_type,
145 config.environment,
146 config.api_key.clone(),
147 config.api_secret.clone(),
148 config.base_url_http.clone(),
149 None, None, None, )?;
153
154 let ws_client = BinanceFuturesWebSocketClient::new(
155 product_type,
156 config.environment,
157 config.api_key.clone(),
158 config.api_secret.clone(),
159 config.base_url_ws.clone(),
160 Some(20), )?;
162
163 Ok(Self {
164 clock,
165 client_id,
166 config,
167 product_type,
168 http_client,
169 ws_client,
170 data_sender,
171 is_connected: AtomicBool::new(false),
172 cancellation_token: CancellationToken::new(),
173 tasks: Vec::new(),
174 instruments: Arc::new(RwLock::new(AHashMap::new())),
175 book_buffers: Arc::new(RwLock::new(AHashMap::new())),
176 book_subscriptions: Arc::new(RwLock::new(AHashMap::new())),
177 mark_price_refs: Arc::new(RwLock::new(AHashMap::new())),
178 book_epoch: Arc::new(RwLock::new(0)),
179 })
180 }
181
182 fn venue(&self) -> Venue {
183 *BINANCE_VENUE
184 }
185
186 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
187 if let Err(e) = sender.send(DataEvent::Data(data)) {
188 log::error!("Failed to emit data event: {e}");
189 }
190 }
191
192 fn spawn_ws<F>(&self, fut: F, context: &'static str)
193 where
194 F: Future<Output = anyhow::Result<()>> + Send + 'static,
195 {
196 get_runtime().spawn(async move {
197 if let Err(e) = fut.await {
198 log::error!("{context}: {e:?}");
199 }
200 });
201 }
202
203 #[allow(clippy::too_many_arguments)]
204 fn handle_ws_message(
205 msg: NautilusWsMessage,
206 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
207 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
208 book_buffers: &Arc<RwLock<AHashMap<InstrumentId, BookBuffer>>>,
209 book_subscriptions: &Arc<RwLock<AHashMap<InstrumentId, u32>>>,
210 book_epoch: &Arc<RwLock<u64>>,
211 http_client: &BinanceFuturesHttpClient,
212 clock: &'static AtomicTime,
213 ) {
214 match msg {
215 NautilusWsMessage::Data(data_msg) => match data_msg {
216 NautilusDataWsMessage::Data(payloads) => {
217 for data in payloads {
218 Self::send_data(data_sender, data);
219 }
220 }
221 NautilusDataWsMessage::DepthUpdate {
222 deltas,
223 first_update_id,
224 prev_final_update_id,
225 } => {
226 let instrument_id = deltas.instrument_id;
227 let final_update_id = deltas.sequence;
228
229 {
231 let mut buffers = book_buffers.write().expect(MUTEX_POISONED);
232 if let Some(buffer) = buffers.get_mut(&instrument_id) {
233 buffer.updates.push(BufferedDepthUpdate {
234 deltas,
235 first_update_id,
236 final_update_id,
237 prev_final_update_id,
238 });
239 return;
240 }
241 }
242
243 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
245 }
246 NautilusDataWsMessage::Instrument(instrument) => {
247 upsert_instrument(instruments, *instrument);
248 }
249 NautilusDataWsMessage::RawJson(value) => {
250 log::debug!("Unhandled JSON message: {value:?}");
251 }
252 },
253 NautilusWsMessage::Exec(exec_msg) => {
254 log::debug!("Received exec message in data client (ignored): {exec_msg:?}");
255 }
256 NautilusWsMessage::ExecRaw(raw_msg) => {
257 log::debug!("Received raw exec message in data client (ignored): {raw_msg:?}");
258 }
259 NautilusWsMessage::Error(e) => {
260 log::error!(
261 "Binance Futures WebSocket error: code={}, msg={}",
262 e.code,
263 e.msg
264 );
265 }
266 NautilusWsMessage::Reconnected => {
267 log::info!("WebSocket reconnected, rebuilding order book snapshots");
268
269 let epoch = {
271 let mut guard = book_epoch.write().expect(MUTEX_POISONED);
272 *guard = guard.wrapping_add(1);
273 *guard
274 };
275
276 let subs: Vec<(InstrumentId, u32)> = {
278 let guard = book_subscriptions.read().expect(MUTEX_POISONED);
279 guard.iter().map(|(k, v)| (*k, *v)).collect()
280 };
281
282 for (instrument_id, depth) in subs {
284 {
286 let mut buffers = book_buffers.write().expect(MUTEX_POISONED);
287 buffers.insert(instrument_id, BookBuffer::new(epoch));
288 }
289
290 log::info!(
291 "OrderBook snapshot rebuild for {instrument_id} @ depth {depth} \
292 starting (reconnect, epoch={epoch})"
293 );
294
295 let http = http_client.clone();
297 let sender = data_sender.clone();
298 let buffers = book_buffers.clone();
299 let insts = instruments.clone();
300
301 get_runtime().spawn(async move {
302 Self::fetch_and_emit_snapshot(
303 http,
304 sender,
305 buffers,
306 insts,
307 instrument_id,
308 depth,
309 epoch,
310 clock,
311 )
312 .await;
313 });
314 }
315 }
316 }
317 }
318
319 #[allow(clippy::too_many_arguments)]
320 async fn fetch_and_emit_snapshot(
321 http: BinanceFuturesHttpClient,
322 sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
323 buffers: Arc<RwLock<AHashMap<InstrumentId, BookBuffer>>>,
324 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
325 instrument_id: InstrumentId,
326 depth: u32,
327 epoch: u64,
328 clock: &'static AtomicTime,
329 ) {
330 Self::fetch_and_emit_snapshot_inner(
331 http,
332 sender,
333 buffers,
334 instruments,
335 instrument_id,
336 depth,
337 epoch,
338 clock,
339 0,
340 )
341 .await;
342 }
343
344 #[allow(clippy::too_many_arguments)]
345 async fn fetch_and_emit_snapshot_inner(
346 http: BinanceFuturesHttpClient,
347 sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
348 buffers: Arc<RwLock<AHashMap<InstrumentId, BookBuffer>>>,
349 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
350 instrument_id: InstrumentId,
351 depth: u32,
352 epoch: u64,
353 clock: &'static AtomicTime,
354 retry_count: u32,
355 ) {
356 const MAX_RETRIES: u32 = 3;
357
358 let symbol = format_binance_stream_symbol(&instrument_id).to_uppercase();
359 let params = BinanceDepthParams {
360 symbol,
361 limit: Some(depth),
362 };
363
364 match http.depth(¶ms).await {
365 Ok(order_book) => {
366 let ts_init = clock.get_time_ns();
367 let last_update_id = order_book.last_update_id as u64;
368
369 {
371 let guard = buffers.read().expect(MUTEX_POISONED);
372 match guard.get(&instrument_id) {
373 None => {
374 log::debug!(
375 "OrderBook subscription for {instrument_id} was cancelled, \
376 discarding snapshot"
377 );
378 return;
379 }
380 Some(buffer) if buffer.epoch != epoch => {
381 log::debug!(
382 "OrderBook snapshot for {instrument_id} is stale \
383 (epoch {epoch} != {}), discarding",
384 buffer.epoch
385 );
386 return;
387 }
388 _ => {}
389 }
390 }
391
392 let (price_precision, size_precision) = {
394 let guard = instruments.read().expect(MUTEX_POISONED);
395 match guard.get(&instrument_id) {
396 Some(inst) => (inst.price_precision(), inst.size_precision()),
397 None => {
398 log::error!("No instrument in cache for snapshot: {instrument_id}");
399 let mut buffers = buffers.write().expect(MUTEX_POISONED);
400 buffers.remove(&instrument_id);
401 return;
402 }
403 }
404 };
405
406 let first_valid = {
409 let guard = buffers.read().expect(MUTEX_POISONED);
410 guard.get(&instrument_id).and_then(|buffer| {
411 buffer
412 .updates
413 .iter()
414 .find(|u| u.final_update_id > last_update_id)
415 .cloned()
416 })
417 };
418
419 if let Some(first) = &first_valid {
420 let target = last_update_id + 1;
421 let valid_overlap =
422 first.first_update_id <= target && first.final_update_id >= target;
423
424 if !valid_overlap {
425 if retry_count < MAX_RETRIES {
426 log::warn!(
427 "OrderBook overlap validation failed for {instrument_id}: \
428 lastUpdateId={last_update_id}, first_update_id={}, \
429 final_update_id={} (need U <= {} <= u), \
430 retrying snapshot (attempt {}/{})",
431 first.first_update_id,
432 first.final_update_id,
433 target,
434 retry_count + 1,
435 MAX_RETRIES
436 );
437
438 {
439 let mut buffers = buffers.write().expect(MUTEX_POISONED);
440 if let Some(buffer) = buffers.get_mut(&instrument_id)
441 && buffer.epoch == epoch
442 {
443 buffer.updates.clear();
444 }
445 }
446
447 Box::pin(Self::fetch_and_emit_snapshot_inner(
448 http,
449 sender,
450 buffers,
451 instruments,
452 instrument_id,
453 depth,
454 epoch,
455 clock,
456 retry_count + 1,
457 ))
458 .await;
459 return;
460 }
461 log::error!(
462 "OrderBook overlap validation failed for {instrument_id} after \
463 {MAX_RETRIES} retries; book may be inconsistent"
464 );
465 }
466 }
467
468 let snapshot_deltas = parse_order_book_snapshot(
469 &order_book,
470 instrument_id,
471 price_precision,
472 size_precision,
473 ts_init,
474 );
475
476 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
477 OrderBookDeltas_API::new(snapshot_deltas),
478 ))) {
479 log::error!("Failed to send snapshot: {e}");
480 }
481
482 let buffered = {
484 let mut buffers = buffers.write().expect(MUTEX_POISONED);
485 if let Some(buffer) = buffers.get_mut(&instrument_id) {
486 if buffer.epoch != epoch {
487 return;
488 }
489 std::mem::take(&mut buffer.updates)
490 } else {
491 return;
492 }
493 };
494
495 let mut replayed = 0;
497 let mut last_final_update_id = last_update_id;
498
499 for update in buffered {
500 if update.final_update_id <= last_update_id {
502 continue;
503 }
504
505 if update.prev_final_update_id != last_final_update_id {
508 if retry_count < MAX_RETRIES {
509 log::warn!(
510 "OrderBook continuity break for {instrument_id}: \
511 expected pu={last_final_update_id}, was pu={}, \
512 triggering resync (attempt {}/{})",
513 update.prev_final_update_id,
514 retry_count + 1,
515 MAX_RETRIES
516 );
517
518 {
519 let mut buffers = buffers.write().expect(MUTEX_POISONED);
520 if let Some(buffer) = buffers.get_mut(&instrument_id)
521 && buffer.epoch == epoch
522 {
523 buffer.updates.clear();
524 }
525 }
526
527 Box::pin(Self::fetch_and_emit_snapshot_inner(
528 http,
529 sender,
530 buffers,
531 instruments,
532 instrument_id,
533 depth,
534 epoch,
535 clock,
536 retry_count + 1,
537 ))
538 .await;
539 return;
540 }
541 log::error!(
542 "OrderBook continuity break for {instrument_id} after {MAX_RETRIES} \
543 retries: expected pu={last_final_update_id}, was pu={}; \
544 book may be inconsistent",
545 update.prev_final_update_id
546 );
547 }
548
549 last_final_update_id = update.final_update_id;
550 replayed += 1;
551
552 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
553 OrderBookDeltas_API::new(update.deltas),
554 ))) {
555 log::error!("Failed to send replayed deltas: {e}");
556 }
557 }
558
559 loop {
561 let more = {
562 let mut buffers = buffers.write().expect(MUTEX_POISONED);
563 if let Some(buffer) = buffers.get_mut(&instrument_id) {
564 if buffer.epoch != epoch {
565 break;
566 }
567 if buffer.updates.is_empty() {
568 buffers.remove(&instrument_id);
569 break;
570 }
571 std::mem::take(&mut buffer.updates)
572 } else {
573 break;
574 }
575 };
576
577 for update in more {
578 if update.final_update_id <= last_update_id {
579 continue;
580 }
581
582 if update.prev_final_update_id != last_final_update_id {
583 if retry_count < MAX_RETRIES {
584 log::warn!(
585 "OrderBook continuity break for {instrument_id}: \
586 expected pu={last_final_update_id}, was pu={}, \
587 triggering resync (attempt {}/{})",
588 update.prev_final_update_id,
589 retry_count + 1,
590 MAX_RETRIES
591 );
592
593 {
594 let mut buffers = buffers.write().expect(MUTEX_POISONED);
595 if let Some(buffer) = buffers.get_mut(&instrument_id)
596 && buffer.epoch == epoch
597 {
598 buffer.updates.clear();
599 }
600 }
601
602 Box::pin(Self::fetch_and_emit_snapshot_inner(
603 http,
604 sender,
605 buffers,
606 instruments,
607 instrument_id,
608 depth,
609 epoch,
610 clock,
611 retry_count + 1,
612 ))
613 .await;
614 return;
615 }
616 log::error!(
617 "OrderBook continuity break for {instrument_id} after \
618 {MAX_RETRIES} retries; book may be inconsistent"
619 );
620 }
621
622 last_final_update_id = update.final_update_id;
623 replayed += 1;
624
625 if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
626 OrderBookDeltas_API::new(update.deltas),
627 ))) {
628 log::error!("Failed to send replayed deltas: {e}");
629 }
630 }
631 }
632
633 log::info!(
634 "OrderBook snapshot rebuild for {instrument_id} completed \
635 (lastUpdateId={last_update_id}, replayed={replayed})"
636 );
637 }
638 Err(e) => {
639 log::error!("Failed to request order book snapshot for {instrument_id}: {e}");
640 let mut buffers = buffers.write().expect(MUTEX_POISONED);
641 buffers.remove(&instrument_id);
642 }
643 }
644 }
645}
646
647fn upsert_instrument(
648 cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
649 instrument: InstrumentAny,
650) {
651 let mut guard = cache.write().expect(MUTEX_POISONED);
652 guard.insert(instrument.id(), instrument);
653}
654
655fn parse_order_book_snapshot(
656 order_book: &BinanceOrderBook,
657 instrument_id: InstrumentId,
658 price_precision: u8,
659 size_precision: u8,
660 ts_init: UnixNanos,
661) -> OrderBookDeltas {
662 let sequence = order_book.last_update_id as u64;
663 let ts_event = order_book.transaction_time.map_or(ts_init, |t| {
664 UnixNanos::from((t as u64) * NANOSECONDS_IN_MILLISECOND)
665 });
666
667 let total_levels = order_book.bids.len() + order_book.asks.len();
668 let mut deltas = Vec::with_capacity(total_levels + 1);
669
670 deltas.push(OrderBookDelta::clear(
672 instrument_id,
673 sequence,
674 ts_event,
675 ts_init,
676 ));
677
678 for (i, (price_str, qty_str)) in order_book.bids.iter().enumerate() {
679 let price: f64 = price_str.parse().unwrap_or(0.0);
680 let size: f64 = qty_str.parse().unwrap_or(0.0);
681
682 let is_last = i == order_book.bids.len() - 1 && order_book.asks.is_empty();
683 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
684
685 let order = BookOrder::new(
686 OrderSide::Buy,
687 Price::new(price, price_precision),
688 Quantity::new(size, size_precision),
689 0,
690 );
691
692 deltas.push(OrderBookDelta::new(
693 instrument_id,
694 BookAction::Add,
695 order,
696 flags,
697 sequence,
698 ts_event,
699 ts_init,
700 ));
701 }
702
703 for (i, (price_str, qty_str)) in order_book.asks.iter().enumerate() {
704 let price: f64 = price_str.parse().unwrap_or(0.0);
705 let size: f64 = qty_str.parse().unwrap_or(0.0);
706
707 let is_last = i == order_book.asks.len() - 1;
708 let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
709
710 let order = BookOrder::new(
711 OrderSide::Sell,
712 Price::new(price, price_precision),
713 Quantity::new(size, size_precision),
714 0,
715 );
716
717 deltas.push(OrderBookDelta::new(
718 instrument_id,
719 BookAction::Add,
720 order,
721 flags,
722 sequence,
723 ts_event,
724 ts_init,
725 ));
726 }
727
728 OrderBookDeltas::new(instrument_id, deltas)
729}
730
731#[async_trait::async_trait(?Send)]
732impl DataClient for BinanceFuturesDataClient {
733 fn client_id(&self) -> ClientId {
734 self.client_id
735 }
736
737 fn venue(&self) -> Option<Venue> {
738 Some(self.venue())
739 }
740
741 fn start(&mut self) -> anyhow::Result<()> {
742 log::info!(
743 "Started: client_id={}, product_type={:?}, environment={:?}",
744 self.client_id,
745 self.product_type,
746 self.config.environment,
747 );
748 Ok(())
749 }
750
751 fn stop(&mut self) -> anyhow::Result<()> {
752 log::info!("Stopping {id}", id = self.client_id);
753 self.cancellation_token.cancel();
754 self.is_connected.store(false, Ordering::Relaxed);
755 Ok(())
756 }
757
758 fn reset(&mut self) -> anyhow::Result<()> {
759 log::debug!("Resetting {id}", id = self.client_id);
760
761 self.cancellation_token.cancel();
762
763 for task in self.tasks.drain(..) {
764 task.abort();
765 }
766
767 let mut ws = self.ws_client.clone();
768 get_runtime().spawn(async move {
769 let _ = ws.close().await;
770 });
771
772 {
774 let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
775 refs.clear();
776 }
777 {
778 let mut subs = self.book_subscriptions.write().expect(MUTEX_POISONED);
779 subs.clear();
780 }
781 {
782 let mut buffers = self.book_buffers.write().expect(MUTEX_POISONED);
783 buffers.clear();
784 }
785
786 self.is_connected.store(false, Ordering::Relaxed);
787 self.cancellation_token = CancellationToken::new();
788 Ok(())
789 }
790
791 fn dispose(&mut self) -> anyhow::Result<()> {
792 log::debug!("Disposing {id}", id = self.client_id);
793 self.stop()
794 }
795
796 async fn connect(&mut self) -> anyhow::Result<()> {
797 if self.is_connected() {
798 return Ok(());
799 }
800
801 self.cancellation_token = CancellationToken::new();
803
804 let instruments = self
805 .http_client
806 .request_instruments()
807 .await
808 .context("failed to request Binance Futures instruments")?;
809
810 {
811 let mut guard = self.instruments.write().expect(MUTEX_POISONED);
812 for instrument in &instruments {
813 guard.insert(instrument.id(), instrument.clone());
814 }
815 }
816
817 for instrument in instruments.clone() {
818 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
819 log::warn!("Failed to send instrument: {e}");
820 }
821 }
822
823 self.ws_client.cache_instruments(instruments);
824
825 log::info!("Connecting to Binance Futures WebSocket...");
826 self.ws_client.connect().await.map_err(|e| {
827 log::error!("Binance Futures WebSocket connection failed: {e:?}");
828 anyhow::anyhow!("failed to connect Binance Futures WebSocket: {e}")
829 })?;
830 log::info!("Binance Futures WebSocket connected");
831
832 let stream = self.ws_client.stream();
833 let sender = self.data_sender.clone();
834 let insts = self.instruments.clone();
835 let buffers = self.book_buffers.clone();
836 let book_subs = self.book_subscriptions.clone();
837 let book_epoch = self.book_epoch.clone();
838 let http = self.http_client.clone();
839 let clock = self.clock;
840 let cancel = self.cancellation_token.clone();
841
842 let handle = get_runtime().spawn(async move {
843 pin_mut!(stream);
844 loop {
845 tokio::select! {
846 Some(message) = stream.next() => {
847 Self::handle_ws_message(
848 message,
849 &sender,
850 &insts,
851 &buffers,
852 &book_subs,
853 &book_epoch,
854 &http,
855 clock,
856 );
857 }
858 () = cancel.cancelled() => {
859 log::debug!("WebSocket stream task cancelled");
860 break;
861 }
862 }
863 }
864 });
865 self.tasks.push(handle);
866
867 self.is_connected.store(true, Ordering::Release);
868 log::info!("Connected: client_id={}", self.client_id);
869 Ok(())
870 }
871
872 async fn disconnect(&mut self) -> anyhow::Result<()> {
873 if self.is_disconnected() {
874 return Ok(());
875 }
876
877 self.cancellation_token.cancel();
878
879 let _ = self.ws_client.close().await;
880
881 let handles: Vec<_> = self.tasks.drain(..).collect();
882 for handle in handles {
883 if let Err(e) = handle.await {
884 log::error!("Error joining WebSocket task: {e}");
885 }
886 }
887
888 {
890 let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
891 refs.clear();
892 }
893 {
894 let mut subs = self.book_subscriptions.write().expect(MUTEX_POISONED);
895 subs.clear();
896 }
897 {
898 let mut buffers = self.book_buffers.write().expect(MUTEX_POISONED);
899 buffers.clear();
900 }
901
902 self.is_connected.store(false, Ordering::Release);
903 log::info!("Disconnected: client_id={}", self.client_id);
904 Ok(())
905 }
906
907 fn is_connected(&self) -> bool {
908 self.is_connected.load(Ordering::Relaxed)
909 }
910
911 fn is_disconnected(&self) -> bool {
912 !self.is_connected()
913 }
914
915 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
916 log::debug!(
917 "subscribe_instruments: Binance Futures instruments are fetched via HTTP on connect"
918 );
919 Ok(())
920 }
921
922 fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
923 log::debug!(
924 "subscribe_instrument: Binance Futures instruments are fetched via HTTP on connect"
925 );
926 Ok(())
927 }
928
929 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
930 if cmd.book_type != BookType::L2_MBP {
931 anyhow::bail!("Binance Futures only supports L2_MBP order book deltas");
932 }
933
934 let instrument_id = cmd.instrument_id;
935 let depth = cmd.depth.map_or(1000, |d| d.get() as u32);
936
937 if !BINANCE_BOOK_DEPTHS.contains(&depth) {
938 anyhow::bail!(
939 "Invalid depth {depth} for Binance Futures order book. \
940 Valid values: {BINANCE_BOOK_DEPTHS:?}"
941 );
942 }
943
944 {
946 let mut subs = self.book_subscriptions.write().expect(MUTEX_POISONED);
947 subs.insert(instrument_id, depth);
948 }
949
950 let epoch = {
952 let mut guard = self.book_epoch.write().expect(MUTEX_POISONED);
953 *guard = guard.wrapping_add(1);
954 *guard
955 };
956
957 {
959 let mut buffers = self.book_buffers.write().expect(MUTEX_POISONED);
960 buffers.insert(instrument_id, BookBuffer::new(epoch));
961 }
962
963 log::info!("OrderBook snapshot rebuild for {instrument_id} @ depth {depth} starting");
964
965 let ws = self.ws_client.clone();
967 let stream = format!("{}@depth@0ms", format_binance_stream_symbol(&instrument_id));
968
969 self.spawn_ws(
970 async move {
971 ws.subscribe(vec![stream])
972 .await
973 .context("book deltas subscription")
974 },
975 "order book subscription",
976 );
977
978 let http = self.http_client.clone();
980 let sender = self.data_sender.clone();
981 let buffers = self.book_buffers.clone();
982 let instruments = self.instruments.clone();
983 let clock = self.clock;
984
985 get_runtime().spawn(async move {
986 Self::fetch_and_emit_snapshot(
987 http,
988 sender,
989 buffers,
990 instruments,
991 instrument_id,
992 depth,
993 epoch,
994 clock,
995 )
996 .await;
997 });
998
999 Ok(())
1000 }
1001
1002 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
1003 let instrument_id = cmd.instrument_id;
1004 let ws = self.ws_client.clone();
1005
1006 let stream = format!(
1008 "{}@bookTicker",
1009 format_binance_stream_symbol(&instrument_id)
1010 );
1011
1012 self.spawn_ws(
1013 async move {
1014 ws.subscribe(vec![stream])
1015 .await
1016 .context("quotes subscription")
1017 },
1018 "quote subscription",
1019 );
1020 Ok(())
1021 }
1022
1023 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
1024 let instrument_id = cmd.instrument_id;
1025 let ws = self.ws_client.clone();
1026
1027 let stream = format!("{}@aggTrade", format_binance_stream_symbol(&instrument_id));
1029
1030 self.spawn_ws(
1031 async move {
1032 ws.subscribe(vec![stream])
1033 .await
1034 .context("trades subscription")
1035 },
1036 "trade subscription",
1037 );
1038 Ok(())
1039 }
1040
1041 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1042 let bar_type = cmd.bar_type;
1043 let ws = self.ws_client.clone();
1044 let interval = bar_spec_to_binance_interval(bar_type.spec())?;
1045
1046 let stream = format!(
1047 "{}@kline_{}",
1048 format_binance_stream_symbol(&bar_type.instrument_id()),
1049 interval.as_str()
1050 );
1051
1052 self.spawn_ws(
1053 async move {
1054 ws.subscribe(vec![stream])
1055 .await
1056 .context("bars subscription")
1057 },
1058 "bar subscription",
1059 );
1060 Ok(())
1061 }
1062
1063 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1064 let instrument_id = cmd.instrument_id;
1065
1066 let should_subscribe = {
1068 let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
1069 let count = refs.entry(instrument_id).or_insert(0);
1070 *count += 1;
1071 *count == 1
1072 };
1073
1074 if should_subscribe {
1075 let ws = self.ws_client.clone();
1076 let stream = format!(
1077 "{}@markPrice@1s",
1078 format_binance_stream_symbol(&instrument_id)
1079 );
1080
1081 self.spawn_ws(
1082 async move {
1083 ws.subscribe(vec![stream])
1084 .await
1085 .context("mark prices subscription")
1086 },
1087 "mark prices subscription",
1088 );
1089 }
1090 Ok(())
1091 }
1092
1093 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1094 let instrument_id = cmd.instrument_id;
1095
1096 let should_subscribe = {
1098 let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
1099 let count = refs.entry(instrument_id).or_insert(0);
1100 *count += 1;
1101 *count == 1
1102 };
1103
1104 if should_subscribe {
1105 let ws = self.ws_client.clone();
1106 let stream = format!(
1107 "{}@markPrice@1s",
1108 format_binance_stream_symbol(&instrument_id)
1109 );
1110
1111 self.spawn_ws(
1112 async move {
1113 ws.subscribe(vec![stream])
1114 .await
1115 .context("index prices subscription")
1116 },
1117 "index prices subscription",
1118 );
1119 }
1120 Ok(())
1121 }
1122
1123 fn subscribe_funding_rates(&mut self, _cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1124 anyhow::bail!(
1127 "Funding rate subscriptions are not yet supported for Binance Futures. \
1128 The Data enum does not have a FundingRateUpdate variant."
1129 )
1130 }
1131
1132 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1133 let instrument_id = cmd.instrument_id;
1134 let ws = self.ws_client.clone();
1135
1136 {
1138 let mut subs = self.book_subscriptions.write().expect(MUTEX_POISONED);
1139 subs.remove(&instrument_id);
1140 }
1141
1142 {
1144 let mut buffers = self.book_buffers.write().expect(MUTEX_POISONED);
1145 buffers.remove(&instrument_id);
1146 }
1147
1148 let symbol_lower = format_binance_stream_symbol(&instrument_id);
1149 let streams = vec![
1150 format!("{symbol_lower}@depth"),
1151 format!("{symbol_lower}@depth@0ms"),
1152 format!("{symbol_lower}@depth@100ms"),
1153 format!("{symbol_lower}@depth@250ms"),
1154 format!("{symbol_lower}@depth@500ms"),
1155 ];
1156
1157 self.spawn_ws(
1158 async move {
1159 ws.unsubscribe(streams)
1160 .await
1161 .context("book deltas unsubscribe")
1162 },
1163 "order book unsubscribe",
1164 );
1165 Ok(())
1166 }
1167
1168 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1169 let instrument_id = cmd.instrument_id;
1170 let ws = self.ws_client.clone();
1171
1172 let stream = format!(
1173 "{}@bookTicker",
1174 format_binance_stream_symbol(&instrument_id)
1175 );
1176
1177 self.spawn_ws(
1178 async move {
1179 ws.unsubscribe(vec![stream])
1180 .await
1181 .context("quotes unsubscribe")
1182 },
1183 "quote unsubscribe",
1184 );
1185 Ok(())
1186 }
1187
1188 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1189 let instrument_id = cmd.instrument_id;
1190 let ws = self.ws_client.clone();
1191
1192 let stream = format!("{}@aggTrade", format_binance_stream_symbol(&instrument_id));
1193
1194 self.spawn_ws(
1195 async move {
1196 ws.unsubscribe(vec![stream])
1197 .await
1198 .context("trades unsubscribe")
1199 },
1200 "trade unsubscribe",
1201 );
1202 Ok(())
1203 }
1204
1205 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1206 let bar_type = cmd.bar_type;
1207 let ws = self.ws_client.clone();
1208 let interval = bar_spec_to_binance_interval(bar_type.spec())?;
1209
1210 let stream = format!(
1211 "{}@kline_{}",
1212 format_binance_stream_symbol(&bar_type.instrument_id()),
1213 interval.as_str()
1214 );
1215
1216 self.spawn_ws(
1217 async move {
1218 ws.unsubscribe(vec![stream])
1219 .await
1220 .context("bars unsubscribe")
1221 },
1222 "bar unsubscribe",
1223 );
1224 Ok(())
1225 }
1226
1227 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1228 let instrument_id = cmd.instrument_id;
1229
1230 let should_unsubscribe = {
1232 let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
1233 if let Some(count) = refs.get_mut(&instrument_id) {
1234 *count = count.saturating_sub(1);
1235 if *count == 0 {
1236 refs.remove(&instrument_id);
1237 true
1238 } else {
1239 false
1240 }
1241 } else {
1242 false
1243 }
1244 };
1245
1246 if should_unsubscribe {
1247 let ws = self.ws_client.clone();
1248 let symbol_lower = format_binance_stream_symbol(&instrument_id);
1249 let streams = vec![
1250 format!("{symbol_lower}@markPrice"),
1251 format!("{symbol_lower}@markPrice@1s"),
1252 format!("{symbol_lower}@markPrice@3s"),
1253 ];
1254
1255 self.spawn_ws(
1256 async move {
1257 ws.unsubscribe(streams)
1258 .await
1259 .context("mark prices unsubscribe")
1260 },
1261 "mark prices unsubscribe",
1262 );
1263 }
1264 Ok(())
1265 }
1266
1267 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1268 let instrument_id = cmd.instrument_id;
1269
1270 let should_unsubscribe = {
1272 let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
1273 if let Some(count) = refs.get_mut(&instrument_id) {
1274 *count = count.saturating_sub(1);
1275 if *count == 0 {
1276 refs.remove(&instrument_id);
1277 true
1278 } else {
1279 false
1280 }
1281 } else {
1282 false
1283 }
1284 };
1285
1286 if should_unsubscribe {
1287 let ws = self.ws_client.clone();
1288 let symbol_lower = format_binance_stream_symbol(&instrument_id);
1289 let streams = vec![
1290 format!("{symbol_lower}@markPrice"),
1291 format!("{symbol_lower}@markPrice@1s"),
1292 format!("{symbol_lower}@markPrice@3s"),
1293 ];
1294
1295 self.spawn_ws(
1296 async move {
1297 ws.unsubscribe(streams)
1298 .await
1299 .context("index prices unsubscribe")
1300 },
1301 "index prices unsubscribe",
1302 );
1303 }
1304 Ok(())
1305 }
1306
/// Accepts a funding rate unsubscribe command without taking any action.
///
/// No stream is unsubscribed and no reference count is modified; the method always returns
/// `Ok(())`.
// NOTE(review): presumably funding rates are delivered on a stream shared with mark prices, so
// there is nothing separate to tear down here; confirm against the subscribe side.
fn unsubscribe_funding_rates(&mut self, _cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
    Ok(())
}
1311
1312 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1313 let http = self.http_client.clone();
1314 let sender = self.data_sender.clone();
1315 let instruments_cache = self.instruments.clone();
1316 let request_id = request.request_id;
1317 let client_id = request.client_id.unwrap_or(self.client_id);
1318 let venue = self.venue();
1319 let start = request.start;
1320 let end = request.end;
1321 let params = request.params;
1322 let clock = self.clock;
1323 let start_nanos = datetime_to_unix_nanos(start);
1324 let end_nanos = datetime_to_unix_nanos(end);
1325
1326 get_runtime().spawn(async move {
1327 match http.request_instruments().await {
1328 Ok(instruments) => {
1329 for instrument in &instruments {
1330 upsert_instrument(&instruments_cache, instrument.clone());
1331 }
1332
1333 let response = DataResponse::Instruments(InstrumentsResponse::new(
1334 request_id,
1335 client_id,
1336 venue,
1337 instruments,
1338 start_nanos,
1339 end_nanos,
1340 clock.get_time_ns(),
1341 params,
1342 ));
1343
1344 if let Err(e) = sender.send(DataEvent::Response(response)) {
1345 log::error!("Failed to send instruments response: {e}");
1346 }
1347 }
1348 Err(e) => log::error!("Instruments request failed: {e:?}"),
1349 }
1350 });
1351
1352 Ok(())
1353 }
1354
1355 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1356 let http = self.http_client.clone();
1357 let sender = self.data_sender.clone();
1358 let instruments = self.instruments.clone();
1359 let instrument_id = request.instrument_id;
1360 let request_id = request.request_id;
1361 let client_id = request.client_id.unwrap_or(self.client_id);
1362 let start = request.start;
1363 let end = request.end;
1364 let params = request.params;
1365 let clock = self.clock;
1366 let start_nanos = datetime_to_unix_nanos(start);
1367 let end_nanos = datetime_to_unix_nanos(end);
1368
1369 get_runtime().spawn(async move {
1370 {
1371 let guard = instruments.read().expect(MUTEX_POISONED);
1372 if let Some(instrument) = guard.get(&instrument_id) {
1373 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1374 request_id,
1375 client_id,
1376 instrument.id(),
1377 instrument.clone(),
1378 start_nanos,
1379 end_nanos,
1380 clock.get_time_ns(),
1381 params,
1382 )));
1383
1384 if let Err(e) = sender.send(DataEvent::Response(response)) {
1385 log::error!("Failed to send instrument response: {e}");
1386 }
1387 return;
1388 }
1389 }
1390
1391 match http.request_instruments().await {
1392 Ok(all_instruments) => {
1393 for instrument in &all_instruments {
1394 upsert_instrument(&instruments, instrument.clone());
1395 }
1396
1397 let instrument = all_instruments
1398 .into_iter()
1399 .find(|i| i.id() == instrument_id);
1400
1401 if let Some(instrument) = instrument {
1402 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1403 request_id,
1404 client_id,
1405 instrument.id(),
1406 instrument,
1407 start_nanos,
1408 end_nanos,
1409 clock.get_time_ns(),
1410 params,
1411 )));
1412
1413 if let Err(e) = sender.send(DataEvent::Response(response)) {
1414 log::error!("Failed to send instrument response: {e}");
1415 }
1416 } else {
1417 log::error!("Instrument not found: {instrument_id}");
1418 }
1419 }
1420 Err(e) => log::error!("Instrument request failed: {e:?}"),
1421 }
1422 });
1423
1424 Ok(())
1425 }
1426
1427 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1428 let http = self.http_client.clone();
1429 let sender = self.data_sender.clone();
1430 let instrument_id = request.instrument_id;
1431 let limit = request.limit.map(|n| n.get() as u32);
1432 let request_id = request.request_id;
1433 let client_id = request.client_id.unwrap_or(self.client_id);
1434 let params = request.params;
1435 let clock = self.clock;
1436 let start_nanos = datetime_to_unix_nanos(request.start);
1437 let end_nanos = datetime_to_unix_nanos(request.end);
1438
1439 get_runtime().spawn(async move {
1440 match http
1441 .request_trades(instrument_id, limit)
1442 .await
1443 .context("failed to request trades from Binance Futures")
1444 {
1445 Ok(trades) => {
1446 let response = DataResponse::Trades(TradesResponse::new(
1447 request_id,
1448 client_id,
1449 instrument_id,
1450 trades,
1451 start_nanos,
1452 end_nanos,
1453 clock.get_time_ns(),
1454 params,
1455 ));
1456 if let Err(e) = sender.send(DataEvent::Response(response)) {
1457 log::error!("Failed to send trades response: {e}");
1458 }
1459 }
1460 Err(e) => log::error!("Trade request failed: {e:?}"),
1461 }
1462 });
1463
1464 Ok(())
1465 }
1466
1467 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1468 let http = self.http_client.clone();
1469 let sender = self.data_sender.clone();
1470 let bar_type = request.bar_type;
1471 let start = request.start;
1472 let end = request.end;
1473 let limit = request.limit.map(|n| n.get() as u32);
1474 let request_id = request.request_id;
1475 let client_id = request.client_id.unwrap_or(self.client_id);
1476 let params = request.params;
1477 let clock = self.clock;
1478 let start_nanos = datetime_to_unix_nanos(start);
1479 let end_nanos = datetime_to_unix_nanos(end);
1480
1481 get_runtime().spawn(async move {
1482 match http
1483 .request_bars(bar_type, start, end, limit)
1484 .await
1485 .context("failed to request bars from Binance Futures")
1486 {
1487 Ok(bars) => {
1488 let response = DataResponse::Bars(BarsResponse::new(
1489 request_id,
1490 client_id,
1491 bar_type,
1492 bars,
1493 start_nanos,
1494 end_nanos,
1495 clock.get_time_ns(),
1496 params,
1497 ));
1498 if let Err(e) = sender.send(DataEvent::Response(response)) {
1499 log::error!("Failed to send bars response: {e}");
1500 }
1501 }
1502 Err(e) => log::error!("Bar request failed: {e:?}"),
1503 }
1504 });
1505
1506 Ok(())
1507 }
1508}