Skip to main content

nautilus_binance/spot/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Binance Spot adapter.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    clients::DataClient,
28    live::{runner::get_data_event_sender, runtime::get_runtime},
29    messages::{
30        DataEvent,
31        data::{
32            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34            SubscribeBookDeltas, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
35            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
36            UnsubscribeQuotes, UnsubscribeTrades,
37        },
38    },
39};
40use nautilus_core::{
41    MUTEX_POISONED,
42    datetime::datetime_to_unix_nanos,
43    time::{AtomicTime, get_atomic_clock_realtime},
44};
45use nautilus_model::{
46    data::{Data, OrderBookDeltas_API},
47    enums::BookType,
48    identifiers::{ClientId, InstrumentId, Venue},
49    instruments::{Instrument, InstrumentAny},
50};
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55    common::{
56        consts::BINANCE_VENUE, credential::resolve_credentials, enums::BinanceProductType,
57        parse::bar_spec_to_binance_interval,
58    },
59    config::BinanceDataClientConfig,
60    spot::{
61        http::client::BinanceSpotHttpClient,
62        websocket::streams::{
63            client::BinanceSpotWebSocketClient,
64            messages::{BinanceSpotWsMessage, NautilusSpotDataWsMessage},
65        },
66    },
67};
68
/// Binance Spot data client for SBE market data.
///
/// Combines an HTTP client (instrument, trade and bar requests) with a WebSocket
/// client (stream subscriptions) and emits what it receives as [`DataEvent`]s.
#[derive(Debug)]
pub struct BinanceSpotDataClient {
    /// Realtime clock used to timestamp request responses.
    clock: &'static AtomicTime,
    /// Identifier of this client, also used as the default client ID on responses.
    client_id: ClientId,
    /// Configuration the client was created with.
    config: BinanceDataClientConfig,
    /// HTTP client used for instrument, trade and bar requests.
    http_client: BinanceSpotHttpClient,
    /// WebSocket client used for stream subscriptions.
    ws_client: BinanceSpotWebSocketClient,
    /// Connection flag, set by `connect` and cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token used to signal the WebSocket stream task to stop.
    cancellation_token: CancellationToken,
    /// Handles of the spawned WebSocket stream tasks.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and request responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, shared with spawned tasks.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
}
83
84impl BinanceSpotDataClient {
85    /// Creates a new [`BinanceSpotDataClient`] instance.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if the client fails to initialize.
90    pub fn new(client_id: ClientId, config: BinanceDataClientConfig) -> anyhow::Result<Self> {
91        let http_client = BinanceSpotHttpClient::new(
92            config.environment,
93            config.api_key.clone(),
94            config.api_secret.clone(),
95            config.base_url_http.clone(),
96            None, // recv_window
97            None, // timeout_secs
98            None, // proxy_url
99        )?;
100
101        let product_type = config
102            .product_types
103            .first()
104            .copied()
105            .unwrap_or(BinanceProductType::Spot);
106
107        let creds = resolve_credentials(
108            config.api_key.clone(),
109            config.api_secret.clone(),
110            config.environment,
111            product_type,
112        )
113        .ok();
114
115        // SBE streams require Ed25519 authentication
116        let ws_client = BinanceSpotWebSocketClient::new(
117            config.base_url_ws.clone(),
118            creds.as_ref().map(|(k, _)| k.clone()),
119            creds.as_ref().map(|(_, s)| s.clone()),
120            Some(20), // Heartbeat interval
121        )?;
122
123        let clock = get_atomic_clock_realtime();
124        let data_sender = get_data_event_sender();
125
126        Ok(Self {
127            clock,
128            client_id,
129            config,
130            http_client,
131            ws_client,
132            is_connected: AtomicBool::new(false),
133            cancellation_token: CancellationToken::new(),
134            tasks: Vec::new(),
135            data_sender,
136            instruments: Arc::new(RwLock::new(AHashMap::new())),
137        })
138    }
139
    /// Returns the Binance venue constant.
    fn venue(&self) -> Venue {
        *BINANCE_VENUE
    }
143
144    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
145        if let Err(e) = sender.send(DataEvent::Data(data)) {
146            log::error!("Failed to emit data event: {e}");
147        }
148    }
149
150    fn spawn_ws<F>(&self, fut: F, context: &'static str)
151    where
152        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
153    {
154        get_runtime().spawn(async move {
155            if let Err(e) = fut.await {
156                log::error!("{context}: {e:?}");
157            }
158        });
159    }
160
161    fn handle_ws_message(
162        msg: BinanceSpotWsMessage,
163        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
164        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
165    ) {
166        match msg {
167            BinanceSpotWsMessage::Data(data_msg) => match data_msg {
168                NautilusSpotDataWsMessage::Data(payloads) => {
169                    for data in payloads {
170                        Self::send_data(data_sender, data);
171                    }
172                }
173                NautilusSpotDataWsMessage::Deltas(deltas) => {
174                    Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
175                }
176                NautilusSpotDataWsMessage::Instrument(instrument) => {
177                    upsert_instrument(instruments, *instrument);
178                }
179                NautilusSpotDataWsMessage::RawBinary(data) => {
180                    log::debug!("Unhandled binary message: {} bytes", data.len());
181                }
182                NautilusSpotDataWsMessage::RawJson(value) => {
183                    log::debug!("Unhandled JSON message: {value:?}");
184                }
185            },
186            BinanceSpotWsMessage::Error(e) => {
187                log::error!("Binance WebSocket error: code={}, msg={}", e.code, e.msg);
188            }
189            BinanceSpotWsMessage::Reconnected => {
190                log::info!("WebSocket reconnected");
191            }
192        }
193    }
194}
195
196fn upsert_instrument(
197    cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
198    instrument: InstrumentAny,
199) {
200    let mut guard = cache.write().expect(MUTEX_POISONED);
201    guard.insert(instrument.id(), instrument);
202}
203
204#[async_trait::async_trait(?Send)]
205impl DataClient for BinanceSpotDataClient {
    /// Returns the client ID this data client was created with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
209
210    fn venue(&self) -> Option<Venue> {
211        Some(self.venue())
212    }
213
214    fn start(&mut self) -> anyhow::Result<()> {
215        log::info!(
216            "Started: client_id={}, product_types={:?}, environment={:?}",
217            self.client_id,
218            self.config.product_types,
219            self.config.environment,
220        );
221        Ok(())
222    }
223
224    fn stop(&mut self) -> anyhow::Result<()> {
225        log::info!("Stopping {id}", id = self.client_id);
226        self.cancellation_token.cancel();
227        self.is_connected.store(false, Ordering::Relaxed);
228        Ok(())
229    }
230
231    fn reset(&mut self) -> anyhow::Result<()> {
232        log::debug!("Resetting {id}", id = self.client_id);
233
234        self.cancellation_token.cancel();
235
236        for task in self.tasks.drain(..) {
237            task.abort();
238        }
239
240        let mut ws = self.ws_client.clone();
241        get_runtime().spawn(async move {
242            let _ = ws.close().await;
243        });
244
245        self.is_connected.store(false, Ordering::Relaxed);
246        self.cancellation_token = CancellationToken::new();
247        Ok(())
248    }
249
    /// Logs the disposal and delegates to [`Self::stop`].
    fn dispose(&mut self) -> anyhow::Result<()> {
        log::debug!("Disposing {id}", id = self.client_id);
        self.stop()
    }
254
255    async fn connect(&mut self) -> anyhow::Result<()> {
256        if self.is_connected() {
257            return Ok(());
258        }
259
260        // Reinitialize token in case of reconnection after disconnect
261        self.cancellation_token = CancellationToken::new();
262
263        let instruments = self
264            .http_client
265            .request_instruments()
266            .await
267            .context("failed to request Binance instruments")?;
268
269        self.http_client.cache_instruments(instruments.clone());
270
271        {
272            let mut guard = self.instruments.write().expect(MUTEX_POISONED);
273            for instrument in &instruments {
274                guard.insert(instrument.id(), instrument.clone());
275            }
276        }
277
278        for instrument in instruments.clone() {
279            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
280                log::warn!("Failed to send instrument: {e}");
281            }
282        }
283
284        self.ws_client.cache_instruments(instruments);
285
286        log::info!("Connecting to Binance SBE WebSocket...");
287        self.ws_client.connect().await.map_err(|e| {
288            log::error!("Binance WebSocket connection failed: {e:?}");
289            anyhow::anyhow!("failed to connect Binance WebSocket: {e}")
290        })?;
291        log::info!("Binance SBE WebSocket connected");
292
293        let stream = self.ws_client.stream();
294        let sender = self.data_sender.clone();
295        let insts = self.instruments.clone();
296        let cancel = self.cancellation_token.clone();
297
298        let handle = get_runtime().spawn(async move {
299            pin_mut!(stream);
300            loop {
301                tokio::select! {
302                    Some(message) = stream.next() => {
303                        Self::handle_ws_message(message, &sender, &insts);
304                    }
305                    () = cancel.cancelled() => {
306                        log::debug!("WebSocket stream task cancelled");
307                        break;
308                    }
309                }
310            }
311        });
312        self.tasks.push(handle);
313
314        self.is_connected.store(true, Ordering::Release);
315        log::info!("Connected: client_id={}", self.client_id);
316        Ok(())
317    }
318
319    async fn disconnect(&mut self) -> anyhow::Result<()> {
320        if self.is_disconnected() {
321            return Ok(());
322        }
323
324        self.cancellation_token.cancel();
325
326        let _ = self.ws_client.close().await;
327
328        let handles: Vec<_> = self.tasks.drain(..).collect();
329        for handle in handles {
330            if let Err(e) = handle.await {
331                log::error!("Error joining WebSocket task: {e}");
332            }
333        }
334
335        self.is_connected.store(false, Ordering::Release);
336        log::info!("Disconnected: client_id={}", self.client_id);
337        Ok(())
338    }
339
340    fn is_connected(&self) -> bool {
341        self.is_connected.load(Ordering::Relaxed)
342    }
343
    /// Returns the negation of [`Self::is_connected`].
    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }
347
    /// No-op: only logs a debug message and returns `Ok(())`.
    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
        log::debug!("subscribe_instruments: Binance instruments are fetched via HTTP on connect");
        Ok(())
    }
352
    /// No-op: only logs a debug message and returns `Ok(())`.
    fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
        log::debug!("subscribe_instrument: Binance instruments are fetched via HTTP on connect");
        Ok(())
    }
357
358    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
359        if cmd.book_type != BookType::L2_MBP {
360            anyhow::bail!("Binance SBE only supports L2_MBP order book deltas");
361        }
362
363        let instrument_id = cmd.instrument_id;
364        let ws = self.ws_client.clone();
365        let depth = cmd.depth.map_or(20, |d| d.get());
366
367        // Binance SBE depth streams: depth5, depth10, depth20
368        let depth_level = match depth {
369            1..=5 => 5,
370            6..=10 => 10,
371            _ => 20,
372        };
373
374        let stream = format!(
375            "{}@depth{}",
376            instrument_id.symbol.as_str().to_lowercase(),
377            depth_level
378        );
379
380        self.spawn_ws(
381            async move {
382                ws.subscribe(vec![stream])
383                    .await
384                    .context("book deltas subscription")
385            },
386            "order book subscription",
387        );
388        Ok(())
389    }
390
391    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
392        let instrument_id = cmd.instrument_id;
393        let ws = self.ws_client.clone();
394
395        let stream = format!(
396            "{}@bestBidAsk",
397            instrument_id.symbol.as_str().to_lowercase()
398        );
399
400        self.spawn_ws(
401            async move {
402                ws.subscribe(vec![stream])
403                    .await
404                    .context("quotes subscription")
405            },
406            "quote subscription",
407        );
408        Ok(())
409    }
410
411    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
412        let instrument_id = cmd.instrument_id;
413        let ws = self.ws_client.clone();
414
415        let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
416
417        self.spawn_ws(
418            async move {
419                ws.subscribe(vec![stream])
420                    .await
421                    .context("trades subscription")
422            },
423            "trade subscription",
424        );
425        Ok(())
426    }
427
428    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
429        let bar_type = cmd.bar_type;
430        let ws = self.ws_client.clone();
431        let interval = bar_spec_to_binance_interval(bar_type.spec())?;
432
433        let stream = format!(
434            "{}@kline_{}",
435            bar_type.instrument_id().symbol.as_str().to_lowercase(),
436            interval.as_str()
437        );
438
439        self.spawn_ws(
440            async move {
441                ws.subscribe(vec![stream])
442                    .await
443                    .context("bars subscription")
444            },
445            "bar subscription",
446        );
447        Ok(())
448    }
449
450    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
451        let instrument_id = cmd.instrument_id;
452        let ws = self.ws_client.clone();
453
454        // Unsubscribe from all depth levels for this symbol
455        let symbol_lower = instrument_id.symbol.as_str().to_lowercase();
456        let streams = vec![
457            format!("{symbol_lower}@depth5"),
458            format!("{symbol_lower}@depth10"),
459            format!("{symbol_lower}@depth20"),
460        ];
461
462        self.spawn_ws(
463            async move {
464                ws.unsubscribe(streams)
465                    .await
466                    .context("book deltas unsubscribe")
467            },
468            "order book unsubscribe",
469        );
470        Ok(())
471    }
472
473    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
474        let instrument_id = cmd.instrument_id;
475        let ws = self.ws_client.clone();
476
477        let stream = format!(
478            "{}@bestBidAsk",
479            instrument_id.symbol.as_str().to_lowercase()
480        );
481
482        self.spawn_ws(
483            async move {
484                ws.unsubscribe(vec![stream])
485                    .await
486                    .context("quotes unsubscribe")
487            },
488            "quote unsubscribe",
489        );
490        Ok(())
491    }
492
493    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
494        let instrument_id = cmd.instrument_id;
495        let ws = self.ws_client.clone();
496
497        let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
498
499        self.spawn_ws(
500            async move {
501                ws.unsubscribe(vec![stream])
502                    .await
503                    .context("trades unsubscribe")
504            },
505            "trade unsubscribe",
506        );
507        Ok(())
508    }
509
510    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
511        let bar_type = cmd.bar_type;
512        let ws = self.ws_client.clone();
513        let interval = bar_spec_to_binance_interval(bar_type.spec())?;
514
515        let stream = format!(
516            "{}@kline_{}",
517            bar_type.instrument_id().symbol.as_str().to_lowercase(),
518            interval.as_str()
519        );
520
521        self.spawn_ws(
522            async move {
523                ws.unsubscribe(vec![stream])
524                    .await
525                    .context("bars unsubscribe")
526            },
527            "bar unsubscribe",
528        );
529        Ok(())
530    }
531
532    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
533        let http = self.http_client.clone();
534        let sender = self.data_sender.clone();
535        let instruments_cache = self.instruments.clone();
536        let request_id = request.request_id;
537        let client_id = request.client_id.unwrap_or(self.client_id);
538        let venue = self.venue();
539        let start = request.start;
540        let end = request.end;
541        let params = request.params;
542        let clock = self.clock;
543        let start_nanos = datetime_to_unix_nanos(start);
544        let end_nanos = datetime_to_unix_nanos(end);
545
546        get_runtime().spawn(async move {
547            match http.request_instruments().await {
548                Ok(instruments) => {
549                    for instrument in &instruments {
550                        upsert_instrument(&instruments_cache, instrument.clone());
551                    }
552
553                    let response = DataResponse::Instruments(InstrumentsResponse::new(
554                        request_id,
555                        client_id,
556                        venue,
557                        instruments,
558                        start_nanos,
559                        end_nanos,
560                        clock.get_time_ns(),
561                        params,
562                    ));
563
564                    if let Err(e) = sender.send(DataEvent::Response(response)) {
565                        log::error!("Failed to send instruments response: {e}");
566                    }
567                }
568                Err(e) => log::error!("Instruments request failed: {e:?}"),
569            }
570        });
571
572        Ok(())
573    }
574
575    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
576        let http = self.http_client.clone();
577        let sender = self.data_sender.clone();
578        let instruments = self.instruments.clone();
579        let instrument_id = request.instrument_id;
580        let request_id = request.request_id;
581        let client_id = request.client_id.unwrap_or(self.client_id);
582        let start = request.start;
583        let end = request.end;
584        let params = request.params;
585        let clock = self.clock;
586        let start_nanos = datetime_to_unix_nanos(start);
587        let end_nanos = datetime_to_unix_nanos(end);
588
589        get_runtime().spawn(async move {
590            {
591                let guard = instruments.read().expect(MUTEX_POISONED);
592                if let Some(instrument) = guard.get(&instrument_id) {
593                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
594                        request_id,
595                        client_id,
596                        instrument.id(),
597                        instrument.clone(),
598                        start_nanos,
599                        end_nanos,
600                        clock.get_time_ns(),
601                        params,
602                    )));
603
604                    if let Err(e) = sender.send(DataEvent::Response(response)) {
605                        log::error!("Failed to send instrument response: {e}");
606                    }
607                    return;
608                }
609            }
610
611            match http.request_instruments().await {
612                Ok(all_instruments) => {
613                    for instrument in &all_instruments {
614                        upsert_instrument(&instruments, instrument.clone());
615                    }
616
617                    let instrument = all_instruments
618                        .into_iter()
619                        .find(|i| i.id() == instrument_id);
620
621                    if let Some(instrument) = instrument {
622                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
623                            request_id,
624                            client_id,
625                            instrument.id(),
626                            instrument,
627                            start_nanos,
628                            end_nanos,
629                            clock.get_time_ns(),
630                            params,
631                        )));
632
633                        if let Err(e) = sender.send(DataEvent::Response(response)) {
634                            log::error!("Failed to send instrument response: {e}");
635                        }
636                    } else {
637                        log::error!("Instrument not found: {instrument_id}");
638                    }
639                }
640                Err(e) => log::error!("Instrument request failed: {e:?}"),
641            }
642        });
643
644        Ok(())
645    }
646
647    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
648        let http = self.http_client.clone();
649        let sender = self.data_sender.clone();
650        let instrument_id = request.instrument_id;
651        let limit = request.limit.map(|n| n.get() as u32);
652        let request_id = request.request_id;
653        let client_id = request.client_id.unwrap_or(self.client_id);
654        let params = request.params;
655        let clock = self.clock;
656        let start_nanos = datetime_to_unix_nanos(request.start);
657        let end_nanos = datetime_to_unix_nanos(request.end);
658
659        get_runtime().spawn(async move {
660            match http
661                .request_trades(instrument_id, limit)
662                .await
663                .context("failed to request trades from Binance")
664            {
665                Ok(trades) => {
666                    let response = DataResponse::Trades(TradesResponse::new(
667                        request_id,
668                        client_id,
669                        instrument_id,
670                        trades,
671                        start_nanos,
672                        end_nanos,
673                        clock.get_time_ns(),
674                        params,
675                    ));
676                    if let Err(e) = sender.send(DataEvent::Response(response)) {
677                        log::error!("Failed to send trades response: {e}");
678                    }
679                }
680                Err(e) => log::error!("Trade request failed: {e:?}"),
681            }
682        });
683
684        Ok(())
685    }
686
687    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
688        let http = self.http_client.clone();
689        let sender = self.data_sender.clone();
690        let bar_type = request.bar_type;
691        let start = request.start;
692        let end = request.end;
693        let limit = request.limit.map(|n| n.get() as u32);
694        let request_id = request.request_id;
695        let client_id = request.client_id.unwrap_or(self.client_id);
696        let params = request.params;
697        let clock = self.clock;
698        let start_nanos = datetime_to_unix_nanos(start);
699        let end_nanos = datetime_to_unix_nanos(end);
700
701        get_runtime().spawn(async move {
702            match http
703                .request_bars(bar_type, start, end, limit)
704                .await
705                .context("failed to request bars from Binance")
706            {
707                Ok(bars) => {
708                    let response = DataResponse::Bars(BarsResponse::new(
709                        request_id,
710                        client_id,
711                        bar_type,
712                        bars,
713                        start_nanos,
714                        end_nanos,
715                        clock.get_time_ns(),
716                        params,
717                    ));
718                    if let Err(e) = sender.send(DataEvent::Response(response)) {
719                        log::error!("Failed to send bars response: {e}");
720                    }
721                }
722                Err(e) => log::error!("Bar request failed: {e:?}"),
723            }
724        });
725
726        Ok(())
727    }
728}