Skip to main content

nautilus_binance/futures/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Binance Futures adapter.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    clients::DataClient,
28    live::{runner::get_data_event_sender, runtime::get_runtime},
29    messages::{
30        DataEvent,
31        data::{
32            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34            SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
35            SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes, SubscribeTrades,
36            TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas, UnsubscribeFundingRates,
37            UnsubscribeIndexPrices, UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
38        },
39    },
40};
41use nautilus_core::{
42    MUTEX_POISONED,
43    datetime::{NANOSECONDS_IN_MILLISECOND, datetime_to_unix_nanos},
44    nanos::UnixNanos,
45    time::{AtomicTime, get_atomic_clock_realtime},
46};
47use nautilus_model::{
48    data::{BookOrder, Data, OrderBookDelta, OrderBookDeltas, OrderBookDeltas_API},
49    enums::{BookAction, BookType, OrderSide, RecordFlag},
50    identifiers::{ClientId, InstrumentId, Venue},
51    instruments::{Instrument, InstrumentAny},
52    types::{Price, Quantity},
53};
54use tokio::task::JoinHandle;
55use tokio_util::sync::CancellationToken;
56
57use crate::{
58    common::{
59        consts::{BINANCE_BOOK_DEPTHS, BINANCE_VENUE},
60        enums::BinanceProductType,
61        parse::bar_spec_to_binance_interval,
62        symbol::format_binance_stream_symbol,
63    },
64    config::BinanceDataClientConfig,
65    futures::{
66        http::{
67            client::BinanceFuturesHttpClient, models::BinanceOrderBook, query::BinanceDepthParams,
68        },
69        websocket::{
70            client::BinanceFuturesWebSocketClient,
71            messages::{NautilusDataWsMessage, NautilusWsMessage},
72        },
73    },
74};
75
76#[derive(Debug, Clone)]
77struct BufferedDepthUpdate {
78    deltas: OrderBookDeltas,
79    first_update_id: u64,
80    final_update_id: u64,
81    prev_final_update_id: u64,
82}
83
84#[derive(Debug)]
85struct BookBuffer {
86    updates: Vec<BufferedDepthUpdate>,
87    epoch: u64,
88}
89
90impl BookBuffer {
91    fn new(epoch: u64) -> Self {
92        Self {
93            updates: Vec::new(),
94            epoch,
95        }
96    }
97}
98
99/// Binance Futures data client for USD-M and COIN-M markets.
100#[derive(Debug)]
101pub struct BinanceFuturesDataClient {
102    clock: &'static AtomicTime,
103    client_id: ClientId,
104    config: BinanceDataClientConfig,
105    product_type: BinanceProductType,
106    http_client: BinanceFuturesHttpClient,
107    ws_client: BinanceFuturesWebSocketClient,
108    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
109    is_connected: AtomicBool,
110    cancellation_token: CancellationToken,
111    tasks: Vec<JoinHandle<()>>,
112    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
113    book_buffers: Arc<RwLock<AHashMap<InstrumentId, BookBuffer>>>,
114    book_subscriptions: Arc<RwLock<AHashMap<InstrumentId, u32>>>,
115    mark_price_refs: Arc<RwLock<AHashMap<InstrumentId, u32>>>,
116    book_epoch: Arc<RwLock<u64>>,
117}
118
119impl BinanceFuturesDataClient {
120    /// Creates a new [`BinanceFuturesDataClient`] instance.
121    ///
122    /// # Errors
123    ///
124    /// Returns an error if the client fails to initialize or if the product type
125    /// is not a futures type (UsdM or CoinM).
126    pub fn new(
127        client_id: ClientId,
128        config: BinanceDataClientConfig,
129        product_type: BinanceProductType,
130    ) -> anyhow::Result<Self> {
131        match product_type {
132            BinanceProductType::UsdM | BinanceProductType::CoinM => {}
133            _ => {
134                anyhow::bail!(
135                    "BinanceFuturesDataClient requires UsdM or CoinM product type, was {product_type:?}"
136                );
137            }
138        }
139
140        let clock = get_atomic_clock_realtime();
141        let data_sender = get_data_event_sender();
142
143        let http_client = BinanceFuturesHttpClient::new(
144            product_type,
145            config.environment,
146            config.api_key.clone(),
147            config.api_secret.clone(),
148            config.base_url_http.clone(),
149            None, // recv_window
150            None, // timeout_secs
151            None, // proxy_url
152        )?;
153
154        let ws_client = BinanceFuturesWebSocketClient::new(
155            product_type,
156            config.environment,
157            config.api_key.clone(),
158            config.api_secret.clone(),
159            config.base_url_ws.clone(),
160            Some(20), // Heartbeat interval
161        )?;
162
163        Ok(Self {
164            clock,
165            client_id,
166            config,
167            product_type,
168            http_client,
169            ws_client,
170            data_sender,
171            is_connected: AtomicBool::new(false),
172            cancellation_token: CancellationToken::new(),
173            tasks: Vec::new(),
174            instruments: Arc::new(RwLock::new(AHashMap::new())),
175            book_buffers: Arc::new(RwLock::new(AHashMap::new())),
176            book_subscriptions: Arc::new(RwLock::new(AHashMap::new())),
177            mark_price_refs: Arc::new(RwLock::new(AHashMap::new())),
178            book_epoch: Arc::new(RwLock::new(0)),
179        })
180    }
181
182    fn venue(&self) -> Venue {
183        *BINANCE_VENUE
184    }
185
186    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
187        if let Err(e) = sender.send(DataEvent::Data(data)) {
188            log::error!("Failed to emit data event: {e}");
189        }
190    }
191
192    fn spawn_ws<F>(&self, fut: F, context: &'static str)
193    where
194        F: Future<Output = anyhow::Result<()>> + Send + 'static,
195    {
196        get_runtime().spawn(async move {
197            if let Err(e) = fut.await {
198                log::error!("{context}: {e:?}");
199            }
200        });
201    }
202
203    #[allow(clippy::too_many_arguments)]
204    fn handle_ws_message(
205        msg: NautilusWsMessage,
206        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
207        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
208        book_buffers: &Arc<RwLock<AHashMap<InstrumentId, BookBuffer>>>,
209        book_subscriptions: &Arc<RwLock<AHashMap<InstrumentId, u32>>>,
210        book_epoch: &Arc<RwLock<u64>>,
211        http_client: &BinanceFuturesHttpClient,
212        clock: &'static AtomicTime,
213    ) {
214        match msg {
215            NautilusWsMessage::Data(data_msg) => match data_msg {
216                NautilusDataWsMessage::Data(payloads) => {
217                    for data in payloads {
218                        Self::send_data(data_sender, data);
219                    }
220                }
221                NautilusDataWsMessage::DepthUpdate {
222                    deltas,
223                    first_update_id,
224                    prev_final_update_id,
225                } => {
226                    let instrument_id = deltas.instrument_id;
227                    let final_update_id = deltas.sequence;
228
229                    // Check if we're buffering for this instrument
230                    {
231                        let mut buffers = book_buffers.write().expect(MUTEX_POISONED);
232                        if let Some(buffer) = buffers.get_mut(&instrument_id) {
233                            buffer.updates.push(BufferedDepthUpdate {
234                                deltas,
235                                first_update_id,
236                                final_update_id,
237                                prev_final_update_id,
238                            });
239                            return;
240                        }
241                    }
242
243                    // Not buffering, emit directly
244                    Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
245                }
246                NautilusDataWsMessage::Instrument(instrument) => {
247                    upsert_instrument(instruments, *instrument);
248                }
249                NautilusDataWsMessage::RawJson(value) => {
250                    log::debug!("Unhandled JSON message: {value:?}");
251                }
252            },
253            NautilusWsMessage::Exec(exec_msg) => {
254                log::debug!("Received exec message in data client (ignored): {exec_msg:?}");
255            }
256            NautilusWsMessage::ExecRaw(raw_msg) => {
257                log::debug!("Received raw exec message in data client (ignored): {raw_msg:?}");
258            }
259            NautilusWsMessage::Error(e) => {
260                log::error!(
261                    "Binance Futures WebSocket error: code={}, msg={}",
262                    e.code,
263                    e.msg
264                );
265            }
266            NautilusWsMessage::Reconnected => {
267                log::info!("WebSocket reconnected, rebuilding order book snapshots");
268
269                // Increment epoch to invalidate any in-flight snapshot tasks
270                let epoch = {
271                    let mut guard = book_epoch.write().expect(MUTEX_POISONED);
272                    *guard = guard.wrapping_add(1);
273                    *guard
274                };
275
276                // Get all active book subscriptions
277                let subs: Vec<(InstrumentId, u32)> = {
278                    let guard = book_subscriptions.read().expect(MUTEX_POISONED);
279                    guard.iter().map(|(k, v)| (*k, *v)).collect()
280                };
281
282                // Trigger snapshot rebuild for each active subscription
283                for (instrument_id, depth) in subs {
284                    // Start buffering deltas with new epoch
285                    {
286                        let mut buffers = book_buffers.write().expect(MUTEX_POISONED);
287                        buffers.insert(instrument_id, BookBuffer::new(epoch));
288                    }
289
290                    log::info!(
291                        "OrderBook snapshot rebuild for {instrument_id} @ depth {depth} \
292                        starting (reconnect, epoch={epoch})"
293                    );
294
295                    // Spawn snapshot fetch task
296                    let http = http_client.clone();
297                    let sender = data_sender.clone();
298                    let buffers = book_buffers.clone();
299                    let insts = instruments.clone();
300
301                    get_runtime().spawn(async move {
302                        Self::fetch_and_emit_snapshot(
303                            http,
304                            sender,
305                            buffers,
306                            insts,
307                            instrument_id,
308                            depth,
309                            epoch,
310                            clock,
311                        )
312                        .await;
313                    });
314                }
315            }
316        }
317    }
318
319    #[allow(clippy::too_many_arguments)]
320    async fn fetch_and_emit_snapshot(
321        http: BinanceFuturesHttpClient,
322        sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
323        buffers: Arc<RwLock<AHashMap<InstrumentId, BookBuffer>>>,
324        instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
325        instrument_id: InstrumentId,
326        depth: u32,
327        epoch: u64,
328        clock: &'static AtomicTime,
329    ) {
330        Self::fetch_and_emit_snapshot_inner(
331            http,
332            sender,
333            buffers,
334            instruments,
335            instrument_id,
336            depth,
337            epoch,
338            clock,
339            0,
340        )
341        .await;
342    }
343
344    #[allow(clippy::too_many_arguments)]
345    async fn fetch_and_emit_snapshot_inner(
346        http: BinanceFuturesHttpClient,
347        sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
348        buffers: Arc<RwLock<AHashMap<InstrumentId, BookBuffer>>>,
349        instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
350        instrument_id: InstrumentId,
351        depth: u32,
352        epoch: u64,
353        clock: &'static AtomicTime,
354        retry_count: u32,
355    ) {
356        const MAX_RETRIES: u32 = 3;
357
358        let symbol = format_binance_stream_symbol(&instrument_id).to_uppercase();
359        let params = BinanceDepthParams {
360            symbol,
361            limit: Some(depth),
362        };
363
364        match http.depth(&params).await {
365            Ok(order_book) => {
366                let ts_init = clock.get_time_ns();
367                let last_update_id = order_book.last_update_id as u64;
368
369                // Check if subscription was cancelled or epoch changed
370                {
371                    let guard = buffers.read().expect(MUTEX_POISONED);
372                    match guard.get(&instrument_id) {
373                        None => {
374                            log::debug!(
375                                "OrderBook subscription for {instrument_id} was cancelled, \
376                                discarding snapshot"
377                            );
378                            return;
379                        }
380                        Some(buffer) if buffer.epoch != epoch => {
381                            log::debug!(
382                                "OrderBook snapshot for {instrument_id} is stale \
383                                (epoch {epoch} != {}), discarding",
384                                buffer.epoch
385                            );
386                            return;
387                        }
388                        _ => {}
389                    }
390                }
391
392                // Get instrument for precision
393                let (price_precision, size_precision) = {
394                    let guard = instruments.read().expect(MUTEX_POISONED);
395                    match guard.get(&instrument_id) {
396                        Some(inst) => (inst.price_precision(), inst.size_precision()),
397                        None => {
398                            log::error!("No instrument in cache for snapshot: {instrument_id}");
399                            let mut buffers = buffers.write().expect(MUTEX_POISONED);
400                            buffers.remove(&instrument_id);
401                            return;
402                        }
403                    }
404                };
405
406                // Validate first applicable update per Binance spec:
407                // First update must satisfy: U <= lastUpdateId+1 AND u >= lastUpdateId+1
408                let first_valid = {
409                    let guard = buffers.read().expect(MUTEX_POISONED);
410                    guard.get(&instrument_id).and_then(|buffer| {
411                        buffer
412                            .updates
413                            .iter()
414                            .find(|u| u.final_update_id > last_update_id)
415                            .cloned()
416                    })
417                };
418
419                if let Some(first) = &first_valid {
420                    let target = last_update_id + 1;
421                    let valid_overlap =
422                        first.first_update_id <= target && first.final_update_id >= target;
423
424                    if !valid_overlap {
425                        if retry_count < MAX_RETRIES {
426                            log::warn!(
427                                "OrderBook overlap validation failed for {instrument_id}: \
428                                lastUpdateId={last_update_id}, first_update_id={}, \
429                                final_update_id={} (need U <= {} <= u), \
430                                retrying snapshot (attempt {}/{})",
431                                first.first_update_id,
432                                first.final_update_id,
433                                target,
434                                retry_count + 1,
435                                MAX_RETRIES
436                            );
437
438                            {
439                                let mut buffers = buffers.write().expect(MUTEX_POISONED);
440                                if let Some(buffer) = buffers.get_mut(&instrument_id)
441                                    && buffer.epoch == epoch
442                                {
443                                    buffer.updates.clear();
444                                }
445                            }
446
447                            Box::pin(Self::fetch_and_emit_snapshot_inner(
448                                http,
449                                sender,
450                                buffers,
451                                instruments,
452                                instrument_id,
453                                depth,
454                                epoch,
455                                clock,
456                                retry_count + 1,
457                            ))
458                            .await;
459                            return;
460                        }
461                        log::error!(
462                            "OrderBook overlap validation failed for {instrument_id} after \
463                            {MAX_RETRIES} retries; book may be inconsistent"
464                        );
465                    }
466                }
467
468                let snapshot_deltas = parse_order_book_snapshot(
469                    &order_book,
470                    instrument_id,
471                    price_precision,
472                    size_precision,
473                    ts_init,
474                );
475
476                if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
477                    OrderBookDeltas_API::new(snapshot_deltas),
478                ))) {
479                    log::error!("Failed to send snapshot: {e}");
480                }
481
482                // Take buffered updates but keep buffer entry during replay
483                let buffered = {
484                    let mut buffers = buffers.write().expect(MUTEX_POISONED);
485                    if let Some(buffer) = buffers.get_mut(&instrument_id) {
486                        if buffer.epoch != epoch {
487                            return;
488                        }
489                        std::mem::take(&mut buffer.updates)
490                    } else {
491                        return;
492                    }
493                };
494
495                // Replay buffered updates with continuity validation
496                let mut replayed = 0;
497                let mut last_final_update_id = last_update_id;
498
499                for update in buffered {
500                    // Drop updates where u <= lastUpdateId
501                    if update.final_update_id <= last_update_id {
502                        continue;
503                    }
504
505                    // Validate continuity: pu should equal last emitted final_update_id
506                    // (for first update, this validates pu == snapshot lastUpdateId)
507                    if update.prev_final_update_id != last_final_update_id {
508                        if retry_count < MAX_RETRIES {
509                            log::warn!(
510                                "OrderBook continuity break for {instrument_id}: \
511                                expected pu={last_final_update_id}, was pu={}, \
512                                triggering resync (attempt {}/{})",
513                                update.prev_final_update_id,
514                                retry_count + 1,
515                                MAX_RETRIES
516                            );
517
518                            {
519                                let mut buffers = buffers.write().expect(MUTEX_POISONED);
520                                if let Some(buffer) = buffers.get_mut(&instrument_id)
521                                    && buffer.epoch == epoch
522                                {
523                                    buffer.updates.clear();
524                                }
525                            }
526
527                            Box::pin(Self::fetch_and_emit_snapshot_inner(
528                                http,
529                                sender,
530                                buffers,
531                                instruments,
532                                instrument_id,
533                                depth,
534                                epoch,
535                                clock,
536                                retry_count + 1,
537                            ))
538                            .await;
539                            return;
540                        }
541                        log::error!(
542                            "OrderBook continuity break for {instrument_id} after {MAX_RETRIES} \
543                            retries: expected pu={last_final_update_id}, was pu={}; \
544                            book may be inconsistent",
545                            update.prev_final_update_id
546                        );
547                    }
548
549                    last_final_update_id = update.final_update_id;
550                    replayed += 1;
551
552                    if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
553                        OrderBookDeltas_API::new(update.deltas),
554                    ))) {
555                        log::error!("Failed to send replayed deltas: {e}");
556                    }
557                }
558
559                // Drain any updates that arrived during replay
560                loop {
561                    let more = {
562                        let mut buffers = buffers.write().expect(MUTEX_POISONED);
563                        if let Some(buffer) = buffers.get_mut(&instrument_id) {
564                            if buffer.epoch != epoch {
565                                break;
566                            }
567                            if buffer.updates.is_empty() {
568                                buffers.remove(&instrument_id);
569                                break;
570                            }
571                            std::mem::take(&mut buffer.updates)
572                        } else {
573                            break;
574                        }
575                    };
576
577                    for update in more {
578                        if update.final_update_id <= last_update_id {
579                            continue;
580                        }
581
582                        if update.prev_final_update_id != last_final_update_id {
583                            if retry_count < MAX_RETRIES {
584                                log::warn!(
585                                    "OrderBook continuity break for {instrument_id}: \
586                                    expected pu={last_final_update_id}, was pu={}, \
587                                    triggering resync (attempt {}/{})",
588                                    update.prev_final_update_id,
589                                    retry_count + 1,
590                                    MAX_RETRIES
591                                );
592
593                                {
594                                    let mut buffers = buffers.write().expect(MUTEX_POISONED);
595                                    if let Some(buffer) = buffers.get_mut(&instrument_id)
596                                        && buffer.epoch == epoch
597                                    {
598                                        buffer.updates.clear();
599                                    }
600                                }
601
602                                Box::pin(Self::fetch_and_emit_snapshot_inner(
603                                    http,
604                                    sender,
605                                    buffers,
606                                    instruments,
607                                    instrument_id,
608                                    depth,
609                                    epoch,
610                                    clock,
611                                    retry_count + 1,
612                                ))
613                                .await;
614                                return;
615                            }
616                            log::error!(
617                                "OrderBook continuity break for {instrument_id} after \
618                                {MAX_RETRIES} retries; book may be inconsistent"
619                            );
620                        }
621
622                        last_final_update_id = update.final_update_id;
623                        replayed += 1;
624
625                        if let Err(e) = sender.send(DataEvent::Data(Data::Deltas(
626                            OrderBookDeltas_API::new(update.deltas),
627                        ))) {
628                            log::error!("Failed to send replayed deltas: {e}");
629                        }
630                    }
631                }
632
633                log::info!(
634                    "OrderBook snapshot rebuild for {instrument_id} completed \
635                    (lastUpdateId={last_update_id}, replayed={replayed})"
636                );
637            }
638            Err(e) => {
639                log::error!("Failed to request order book snapshot for {instrument_id}: {e}");
640                let mut buffers = buffers.write().expect(MUTEX_POISONED);
641                buffers.remove(&instrument_id);
642            }
643        }
644    }
645}
646
647fn upsert_instrument(
648    cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
649    instrument: InstrumentAny,
650) {
651    let mut guard = cache.write().expect(MUTEX_POISONED);
652    guard.insert(instrument.id(), instrument);
653}
654
655fn parse_order_book_snapshot(
656    order_book: &BinanceOrderBook,
657    instrument_id: InstrumentId,
658    price_precision: u8,
659    size_precision: u8,
660    ts_init: UnixNanos,
661) -> OrderBookDeltas {
662    let sequence = order_book.last_update_id as u64;
663    let ts_event = order_book.transaction_time.map_or(ts_init, |t| {
664        UnixNanos::from((t as u64) * NANOSECONDS_IN_MILLISECOND)
665    });
666
667    let total_levels = order_book.bids.len() + order_book.asks.len();
668    let mut deltas = Vec::with_capacity(total_levels + 1);
669
670    // First delta is CLEAR to reset the book
671    deltas.push(OrderBookDelta::clear(
672        instrument_id,
673        sequence,
674        ts_event,
675        ts_init,
676    ));
677
678    for (i, (price_str, qty_str)) in order_book.bids.iter().enumerate() {
679        let price: f64 = price_str.parse().unwrap_or(0.0);
680        let size: f64 = qty_str.parse().unwrap_or(0.0);
681
682        let is_last = i == order_book.bids.len() - 1 && order_book.asks.is_empty();
683        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
684
685        let order = BookOrder::new(
686            OrderSide::Buy,
687            Price::new(price, price_precision),
688            Quantity::new(size, size_precision),
689            0,
690        );
691
692        deltas.push(OrderBookDelta::new(
693            instrument_id,
694            BookAction::Add,
695            order,
696            flags,
697            sequence,
698            ts_event,
699            ts_init,
700        ));
701    }
702
703    for (i, (price_str, qty_str)) in order_book.asks.iter().enumerate() {
704        let price: f64 = price_str.parse().unwrap_or(0.0);
705        let size: f64 = qty_str.parse().unwrap_or(0.0);
706
707        let is_last = i == order_book.asks.len() - 1;
708        let flags = if is_last { RecordFlag::F_LAST as u8 } else { 0 };
709
710        let order = BookOrder::new(
711            OrderSide::Sell,
712            Price::new(price, price_precision),
713            Quantity::new(size, size_precision),
714            0,
715        );
716
717        deltas.push(OrderBookDelta::new(
718            instrument_id,
719            BookAction::Add,
720            order,
721            flags,
722            sequence,
723            ts_event,
724            ts_init,
725        ));
726    }
727
728    OrderBookDeltas::new(instrument_id, deltas)
729}
730
731#[async_trait::async_trait(?Send)]
732impl DataClient for BinanceFuturesDataClient {
733    fn client_id(&self) -> ClientId {
734        self.client_id
735    }
736
737    fn venue(&self) -> Option<Venue> {
738        Some(self.venue())
739    }
740
741    fn start(&mut self) -> anyhow::Result<()> {
742        log::info!(
743            "Started: client_id={}, product_type={:?}, environment={:?}",
744            self.client_id,
745            self.product_type,
746            self.config.environment,
747        );
748        Ok(())
749    }
750
751    fn stop(&mut self) -> anyhow::Result<()> {
752        log::info!("Stopping {id}", id = self.client_id);
753        self.cancellation_token.cancel();
754        self.is_connected.store(false, Ordering::Relaxed);
755        Ok(())
756    }
757
758    fn reset(&mut self) -> anyhow::Result<()> {
759        log::debug!("Resetting {id}", id = self.client_id);
760
761        self.cancellation_token.cancel();
762
763        for task in self.tasks.drain(..) {
764            task.abort();
765        }
766
767        let mut ws = self.ws_client.clone();
768        get_runtime().spawn(async move {
769            let _ = ws.close().await;
770        });
771
772        // Clear subscription state so resubscribes issue fresh WS subscribes
773        {
774            let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
775            refs.clear();
776        }
777        {
778            let mut subs = self.book_subscriptions.write().expect(MUTEX_POISONED);
779            subs.clear();
780        }
781        {
782            let mut buffers = self.book_buffers.write().expect(MUTEX_POISONED);
783            buffers.clear();
784        }
785
786        self.is_connected.store(false, Ordering::Relaxed);
787        self.cancellation_token = CancellationToken::new();
788        Ok(())
789    }
790
791    fn dispose(&mut self) -> anyhow::Result<()> {
792        log::debug!("Disposing {id}", id = self.client_id);
793        self.stop()
794    }
795
796    async fn connect(&mut self) -> anyhow::Result<()> {
797        if self.is_connected() {
798            return Ok(());
799        }
800
801        // Reinitialize token in case of reconnection after disconnect
802        self.cancellation_token = CancellationToken::new();
803
804        let instruments = self
805            .http_client
806            .request_instruments()
807            .await
808            .context("failed to request Binance Futures instruments")?;
809
810        {
811            let mut guard = self.instruments.write().expect(MUTEX_POISONED);
812            for instrument in &instruments {
813                guard.insert(instrument.id(), instrument.clone());
814            }
815        }
816
817        for instrument in instruments.clone() {
818            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
819                log::warn!("Failed to send instrument: {e}");
820            }
821        }
822
823        self.ws_client.cache_instruments(instruments);
824
825        log::info!("Connecting to Binance Futures WebSocket...");
826        self.ws_client.connect().await.map_err(|e| {
827            log::error!("Binance Futures WebSocket connection failed: {e:?}");
828            anyhow::anyhow!("failed to connect Binance Futures WebSocket: {e}")
829        })?;
830        log::info!("Binance Futures WebSocket connected");
831
832        let stream = self.ws_client.stream();
833        let sender = self.data_sender.clone();
834        let insts = self.instruments.clone();
835        let buffers = self.book_buffers.clone();
836        let book_subs = self.book_subscriptions.clone();
837        let book_epoch = self.book_epoch.clone();
838        let http = self.http_client.clone();
839        let clock = self.clock;
840        let cancel = self.cancellation_token.clone();
841
842        let handle = get_runtime().spawn(async move {
843            pin_mut!(stream);
844            loop {
845                tokio::select! {
846                    Some(message) = stream.next() => {
847                        Self::handle_ws_message(
848                            message,
849                            &sender,
850                            &insts,
851                            &buffers,
852                            &book_subs,
853                            &book_epoch,
854                            &http,
855                            clock,
856                        );
857                    }
858                    () = cancel.cancelled() => {
859                        log::debug!("WebSocket stream task cancelled");
860                        break;
861                    }
862                }
863            }
864        });
865        self.tasks.push(handle);
866
867        self.is_connected.store(true, Ordering::Release);
868        log::info!("Connected: client_id={}", self.client_id);
869        Ok(())
870    }
871
872    async fn disconnect(&mut self) -> anyhow::Result<()> {
873        if self.is_disconnected() {
874            return Ok(());
875        }
876
877        self.cancellation_token.cancel();
878
879        let _ = self.ws_client.close().await;
880
881        let handles: Vec<_> = self.tasks.drain(..).collect();
882        for handle in handles {
883            if let Err(e) = handle.await {
884                log::error!("Error joining WebSocket task: {e}");
885            }
886        }
887
888        // Clear subscription state so resubscribes issue fresh WS subscribes
889        {
890            let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
891            refs.clear();
892        }
893        {
894            let mut subs = self.book_subscriptions.write().expect(MUTEX_POISONED);
895            subs.clear();
896        }
897        {
898            let mut buffers = self.book_buffers.write().expect(MUTEX_POISONED);
899            buffers.clear();
900        }
901
902        self.is_connected.store(false, Ordering::Release);
903        log::info!("Disconnected: client_id={}", self.client_id);
904        Ok(())
905    }
906
907    fn is_connected(&self) -> bool {
908        self.is_connected.load(Ordering::Relaxed)
909    }
910
911    fn is_disconnected(&self) -> bool {
912        !self.is_connected()
913    }
914
915    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
916        log::debug!(
917            "subscribe_instruments: Binance Futures instruments are fetched via HTTP on connect"
918        );
919        Ok(())
920    }
921
922    fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
923        log::debug!(
924            "subscribe_instrument: Binance Futures instruments are fetched via HTTP on connect"
925        );
926        Ok(())
927    }
928
929    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
930        if cmd.book_type != BookType::L2_MBP {
931            anyhow::bail!("Binance Futures only supports L2_MBP order book deltas");
932        }
933
934        let instrument_id = cmd.instrument_id;
935        let depth = cmd.depth.map_or(1000, |d| d.get() as u32);
936
937        if !BINANCE_BOOK_DEPTHS.contains(&depth) {
938            anyhow::bail!(
939                "Invalid depth {depth} for Binance Futures order book. \
940                Valid values: {BINANCE_BOOK_DEPTHS:?}"
941            );
942        }
943
944        // Track subscription for reconnect handling
945        {
946            let mut subs = self.book_subscriptions.write().expect(MUTEX_POISONED);
947            subs.insert(instrument_id, depth);
948        }
949
950        // Bump epoch to invalidate any in-flight snapshot from a prior subscription
951        let epoch = {
952            let mut guard = self.book_epoch.write().expect(MUTEX_POISONED);
953            *guard = guard.wrapping_add(1);
954            *guard
955        };
956
957        // Start buffering deltas for this instrument
958        {
959            let mut buffers = self.book_buffers.write().expect(MUTEX_POISONED);
960            buffers.insert(instrument_id, BookBuffer::new(epoch));
961        }
962
963        log::info!("OrderBook snapshot rebuild for {instrument_id} @ depth {depth} starting");
964
965        // Subscribe to WebSocket depth stream (0ms = unthrottled for Futures)
966        let ws = self.ws_client.clone();
967        let stream = format!("{}@depth@0ms", format_binance_stream_symbol(&instrument_id));
968
969        self.spawn_ws(
970            async move {
971                ws.subscribe(vec![stream])
972                    .await
973                    .context("book deltas subscription")
974            },
975            "order book subscription",
976        );
977
978        // Spawn task to fetch HTTP snapshot and replay buffered deltas
979        let http = self.http_client.clone();
980        let sender = self.data_sender.clone();
981        let buffers = self.book_buffers.clone();
982        let instruments = self.instruments.clone();
983        let clock = self.clock;
984
985        get_runtime().spawn(async move {
986            Self::fetch_and_emit_snapshot(
987                http,
988                sender,
989                buffers,
990                instruments,
991                instrument_id,
992                depth,
993                epoch,
994                clock,
995            )
996            .await;
997        });
998
999        Ok(())
1000    }
1001
1002    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
1003        let instrument_id = cmd.instrument_id;
1004        let ws = self.ws_client.clone();
1005
1006        // Binance Futures uses bookTicker for best bid/ask
1007        let stream = format!(
1008            "{}@bookTicker",
1009            format_binance_stream_symbol(&instrument_id)
1010        );
1011
1012        self.spawn_ws(
1013            async move {
1014                ws.subscribe(vec![stream])
1015                    .await
1016                    .context("quotes subscription")
1017            },
1018            "quote subscription",
1019        );
1020        Ok(())
1021    }
1022
1023    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
1024        let instrument_id = cmd.instrument_id;
1025        let ws = self.ws_client.clone();
1026
1027        // Binance Futures uses aggTrade for aggregate trades
1028        let stream = format!("{}@aggTrade", format_binance_stream_symbol(&instrument_id));
1029
1030        self.spawn_ws(
1031            async move {
1032                ws.subscribe(vec![stream])
1033                    .await
1034                    .context("trades subscription")
1035            },
1036            "trade subscription",
1037        );
1038        Ok(())
1039    }
1040
1041    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
1042        let bar_type = cmd.bar_type;
1043        let ws = self.ws_client.clone();
1044        let interval = bar_spec_to_binance_interval(bar_type.spec())?;
1045
1046        let stream = format!(
1047            "{}@kline_{}",
1048            format_binance_stream_symbol(&bar_type.instrument_id()),
1049            interval.as_str()
1050        );
1051
1052        self.spawn_ws(
1053            async move {
1054                ws.subscribe(vec![stream])
1055                    .await
1056                    .context("bars subscription")
1057            },
1058            "bar subscription",
1059        );
1060        Ok(())
1061    }
1062
1063    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
1064        let instrument_id = cmd.instrument_id;
1065
1066        // Mark/index/funding share the same stream - use ref counting
1067        let should_subscribe = {
1068            let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
1069            let count = refs.entry(instrument_id).or_insert(0);
1070            *count += 1;
1071            *count == 1
1072        };
1073
1074        if should_subscribe {
1075            let ws = self.ws_client.clone();
1076            let stream = format!(
1077                "{}@markPrice@1s",
1078                format_binance_stream_symbol(&instrument_id)
1079            );
1080
1081            self.spawn_ws(
1082                async move {
1083                    ws.subscribe(vec![stream])
1084                        .await
1085                        .context("mark prices subscription")
1086                },
1087                "mark prices subscription",
1088            );
1089        }
1090        Ok(())
1091    }
1092
1093    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
1094        let instrument_id = cmd.instrument_id;
1095
1096        // Mark/index/funding share the same stream - use ref counting
1097        let should_subscribe = {
1098            let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
1099            let count = refs.entry(instrument_id).or_insert(0);
1100            *count += 1;
1101            *count == 1
1102        };
1103
1104        if should_subscribe {
1105            let ws = self.ws_client.clone();
1106            let stream = format!(
1107                "{}@markPrice@1s",
1108                format_binance_stream_symbol(&instrument_id)
1109            );
1110
1111            self.spawn_ws(
1112                async move {
1113                    ws.subscribe(vec![stream])
1114                        .await
1115                        .context("index prices subscription")
1116                },
1117                "index prices subscription",
1118            );
1119        }
1120        Ok(())
1121    }
1122
1123    fn subscribe_funding_rates(&mut self, _cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
1124        // FundingRateUpdate is not a variant of the Data enum, so we cannot emit funding rates
1125        // through the standard data channel. This requires custom data handling.
1126        anyhow::bail!(
1127            "Funding rate subscriptions are not yet supported for Binance Futures. \
1128            The Data enum does not have a FundingRateUpdate variant."
1129        )
1130    }
1131
1132    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
1133        let instrument_id = cmd.instrument_id;
1134        let ws = self.ws_client.clone();
1135
1136        // Remove subscription tracking
1137        {
1138            let mut subs = self.book_subscriptions.write().expect(MUTEX_POISONED);
1139            subs.remove(&instrument_id);
1140        }
1141
1142        // Remove buffer to prevent snapshot task from emitting after unsubscribe
1143        {
1144            let mut buffers = self.book_buffers.write().expect(MUTEX_POISONED);
1145            buffers.remove(&instrument_id);
1146        }
1147
1148        let symbol_lower = format_binance_stream_symbol(&instrument_id);
1149        let streams = vec![
1150            format!("{symbol_lower}@depth"),
1151            format!("{symbol_lower}@depth@0ms"),
1152            format!("{symbol_lower}@depth@100ms"),
1153            format!("{symbol_lower}@depth@250ms"),
1154            format!("{symbol_lower}@depth@500ms"),
1155        ];
1156
1157        self.spawn_ws(
1158            async move {
1159                ws.unsubscribe(streams)
1160                    .await
1161                    .context("book deltas unsubscribe")
1162            },
1163            "order book unsubscribe",
1164        );
1165        Ok(())
1166    }
1167
1168    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
1169        let instrument_id = cmd.instrument_id;
1170        let ws = self.ws_client.clone();
1171
1172        let stream = format!(
1173            "{}@bookTicker",
1174            format_binance_stream_symbol(&instrument_id)
1175        );
1176
1177        self.spawn_ws(
1178            async move {
1179                ws.unsubscribe(vec![stream])
1180                    .await
1181                    .context("quotes unsubscribe")
1182            },
1183            "quote unsubscribe",
1184        );
1185        Ok(())
1186    }
1187
1188    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
1189        let instrument_id = cmd.instrument_id;
1190        let ws = self.ws_client.clone();
1191
1192        let stream = format!("{}@aggTrade", format_binance_stream_symbol(&instrument_id));
1193
1194        self.spawn_ws(
1195            async move {
1196                ws.unsubscribe(vec![stream])
1197                    .await
1198                    .context("trades unsubscribe")
1199            },
1200            "trade unsubscribe",
1201        );
1202        Ok(())
1203    }
1204
1205    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
1206        let bar_type = cmd.bar_type;
1207        let ws = self.ws_client.clone();
1208        let interval = bar_spec_to_binance_interval(bar_type.spec())?;
1209
1210        let stream = format!(
1211            "{}@kline_{}",
1212            format_binance_stream_symbol(&bar_type.instrument_id()),
1213            interval.as_str()
1214        );
1215
1216        self.spawn_ws(
1217            async move {
1218                ws.unsubscribe(vec![stream])
1219                    .await
1220                    .context("bars unsubscribe")
1221            },
1222            "bar unsubscribe",
1223        );
1224        Ok(())
1225    }
1226
1227    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
1228        let instrument_id = cmd.instrument_id;
1229
1230        // Mark/index/funding share the same stream - use ref counting
1231        let should_unsubscribe = {
1232            let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
1233            if let Some(count) = refs.get_mut(&instrument_id) {
1234                *count = count.saturating_sub(1);
1235                if *count == 0 {
1236                    refs.remove(&instrument_id);
1237                    true
1238                } else {
1239                    false
1240                }
1241            } else {
1242                false
1243            }
1244        };
1245
1246        if should_unsubscribe {
1247            let ws = self.ws_client.clone();
1248            let symbol_lower = format_binance_stream_symbol(&instrument_id);
1249            let streams = vec![
1250                format!("{symbol_lower}@markPrice"),
1251                format!("{symbol_lower}@markPrice@1s"),
1252                format!("{symbol_lower}@markPrice@3s"),
1253            ];
1254
1255            self.spawn_ws(
1256                async move {
1257                    ws.unsubscribe(streams)
1258                        .await
1259                        .context("mark prices unsubscribe")
1260                },
1261                "mark prices unsubscribe",
1262            );
1263        }
1264        Ok(())
1265    }
1266
1267    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
1268        let instrument_id = cmd.instrument_id;
1269
1270        // Mark/index/funding share the same stream - use ref counting
1271        let should_unsubscribe = {
1272            let mut refs = self.mark_price_refs.write().expect(MUTEX_POISONED);
1273            if let Some(count) = refs.get_mut(&instrument_id) {
1274                *count = count.saturating_sub(1);
1275                if *count == 0 {
1276                    refs.remove(&instrument_id);
1277                    true
1278                } else {
1279                    false
1280                }
1281            } else {
1282                false
1283            }
1284        };
1285
1286        if should_unsubscribe {
1287            let ws = self.ws_client.clone();
1288            let symbol_lower = format_binance_stream_symbol(&instrument_id);
1289            let streams = vec![
1290                format!("{symbol_lower}@markPrice"),
1291                format!("{symbol_lower}@markPrice@1s"),
1292                format!("{symbol_lower}@markPrice@3s"),
1293            ];
1294
1295            self.spawn_ws(
1296                async move {
1297                    ws.unsubscribe(streams)
1298                        .await
1299                        .context("index prices unsubscribe")
1300                },
1301                "index prices unsubscribe",
1302            );
1303        }
1304        Ok(())
1305    }
1306
1307    fn unsubscribe_funding_rates(&mut self, _cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
1308        // Funding rate subscriptions are not supported (see subscribe_funding_rates)
1309        Ok(())
1310    }
1311
1312    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
1313        let http = self.http_client.clone();
1314        let sender = self.data_sender.clone();
1315        let instruments_cache = self.instruments.clone();
1316        let request_id = request.request_id;
1317        let client_id = request.client_id.unwrap_or(self.client_id);
1318        let venue = self.venue();
1319        let start = request.start;
1320        let end = request.end;
1321        let params = request.params;
1322        let clock = self.clock;
1323        let start_nanos = datetime_to_unix_nanos(start);
1324        let end_nanos = datetime_to_unix_nanos(end);
1325
1326        get_runtime().spawn(async move {
1327            match http.request_instruments().await {
1328                Ok(instruments) => {
1329                    for instrument in &instruments {
1330                        upsert_instrument(&instruments_cache, instrument.clone());
1331                    }
1332
1333                    let response = DataResponse::Instruments(InstrumentsResponse::new(
1334                        request_id,
1335                        client_id,
1336                        venue,
1337                        instruments,
1338                        start_nanos,
1339                        end_nanos,
1340                        clock.get_time_ns(),
1341                        params,
1342                    ));
1343
1344                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1345                        log::error!("Failed to send instruments response: {e}");
1346                    }
1347                }
1348                Err(e) => log::error!("Instruments request failed: {e:?}"),
1349            }
1350        });
1351
1352        Ok(())
1353    }
1354
1355    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1356        let http = self.http_client.clone();
1357        let sender = self.data_sender.clone();
1358        let instruments = self.instruments.clone();
1359        let instrument_id = request.instrument_id;
1360        let request_id = request.request_id;
1361        let client_id = request.client_id.unwrap_or(self.client_id);
1362        let start = request.start;
1363        let end = request.end;
1364        let params = request.params;
1365        let clock = self.clock;
1366        let start_nanos = datetime_to_unix_nanos(start);
1367        let end_nanos = datetime_to_unix_nanos(end);
1368
1369        get_runtime().spawn(async move {
1370            {
1371                let guard = instruments.read().expect(MUTEX_POISONED);
1372                if let Some(instrument) = guard.get(&instrument_id) {
1373                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1374                        request_id,
1375                        client_id,
1376                        instrument.id(),
1377                        instrument.clone(),
1378                        start_nanos,
1379                        end_nanos,
1380                        clock.get_time_ns(),
1381                        params,
1382                    )));
1383
1384                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1385                        log::error!("Failed to send instrument response: {e}");
1386                    }
1387                    return;
1388                }
1389            }
1390
1391            match http.request_instruments().await {
1392                Ok(all_instruments) => {
1393                    for instrument in &all_instruments {
1394                        upsert_instrument(&instruments, instrument.clone());
1395                    }
1396
1397                    let instrument = all_instruments
1398                        .into_iter()
1399                        .find(|i| i.id() == instrument_id);
1400
1401                    if let Some(instrument) = instrument {
1402                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1403                            request_id,
1404                            client_id,
1405                            instrument.id(),
1406                            instrument,
1407                            start_nanos,
1408                            end_nanos,
1409                            clock.get_time_ns(),
1410                            params,
1411                        )));
1412
1413                        if let Err(e) = sender.send(DataEvent::Response(response)) {
1414                            log::error!("Failed to send instrument response: {e}");
1415                        }
1416                    } else {
1417                        log::error!("Instrument not found: {instrument_id}");
1418                    }
1419                }
1420                Err(e) => log::error!("Instrument request failed: {e:?}"),
1421            }
1422        });
1423
1424        Ok(())
1425    }
1426
1427    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1428        let http = self.http_client.clone();
1429        let sender = self.data_sender.clone();
1430        let instrument_id = request.instrument_id;
1431        let limit = request.limit.map(|n| n.get() as u32);
1432        let request_id = request.request_id;
1433        let client_id = request.client_id.unwrap_or(self.client_id);
1434        let params = request.params;
1435        let clock = self.clock;
1436        let start_nanos = datetime_to_unix_nanos(request.start);
1437        let end_nanos = datetime_to_unix_nanos(request.end);
1438
1439        get_runtime().spawn(async move {
1440            match http
1441                .request_trades(instrument_id, limit)
1442                .await
1443                .context("failed to request trades from Binance Futures")
1444            {
1445                Ok(trades) => {
1446                    let response = DataResponse::Trades(TradesResponse::new(
1447                        request_id,
1448                        client_id,
1449                        instrument_id,
1450                        trades,
1451                        start_nanos,
1452                        end_nanos,
1453                        clock.get_time_ns(),
1454                        params,
1455                    ));
1456                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1457                        log::error!("Failed to send trades response: {e}");
1458                    }
1459                }
1460                Err(e) => log::error!("Trade request failed: {e:?}"),
1461            }
1462        });
1463
1464        Ok(())
1465    }
1466
1467    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1468        let http = self.http_client.clone();
1469        let sender = self.data_sender.clone();
1470        let bar_type = request.bar_type;
1471        let start = request.start;
1472        let end = request.end;
1473        let limit = request.limit.map(|n| n.get() as u32);
1474        let request_id = request.request_id;
1475        let client_id = request.client_id.unwrap_or(self.client_id);
1476        let params = request.params;
1477        let clock = self.clock;
1478        let start_nanos = datetime_to_unix_nanos(start);
1479        let end_nanos = datetime_to_unix_nanos(end);
1480
1481        get_runtime().spawn(async move {
1482            match http
1483                .request_bars(bar_type, start, end, limit)
1484                .await
1485                .context("failed to request bars from Binance Futures")
1486            {
1487                Ok(bars) => {
1488                    let response = DataResponse::Bars(BarsResponse::new(
1489                        request_id,
1490                        client_id,
1491                        bar_type,
1492                        bars,
1493                        start_nanos,
1494                        end_nanos,
1495                        clock.get_time_ns(),
1496                        params,
1497                    ));
1498                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1499                        log::error!("Failed to send bars response: {e}");
1500                    }
1501                }
1502                Err(e) => log::error!("Bar request failed: {e:?}"),
1503            }
1504        });
1505
1506        Ok(())
1507    }
1508}