nautilus_binance/data/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live market data client implementation for the Binance Spot adapter.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    clients::DataClient,
28    live::{runner::get_data_event_sender, runtime::get_runtime},
29    messages::{
30        DataEvent,
31        data::{
32            BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34            SubscribeBookDeltas, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
35            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
36            UnsubscribeQuotes, UnsubscribeTrades,
37        },
38    },
39};
40use nautilus_core::{
41    MUTEX_POISONED,
42    datetime::datetime_to_unix_nanos,
43    time::{AtomicTime, get_atomic_clock_realtime},
44};
45use nautilus_model::{
46    data::{Data, OrderBookDeltas_API},
47    enums::BookType,
48    identifiers::{ClientId, InstrumentId, Venue},
49    instruments::{Instrument, InstrumentAny},
50};
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55    common::{consts::BINANCE_VENUE, parse::bar_spec_to_binance_interval},
56    config::BinanceDataClientConfig,
57    spot::{
58        http::client::BinanceSpotHttpClient,
59        websocket::streams::{client::BinanceSpotWebSocketClient, messages::NautilusWsMessage},
60    },
61};
62
/// Binance Spot data client for SBE market data.
///
/// Combines an HTTP client (instrument, trade and bar requests) with a WebSocket
/// client (stream subscriptions) and holds the state needed to manage the
/// background task that forwards stream messages.
#[derive(Debug)]
pub struct BinanceSpotDataClient {
    /// Identifier of this client; also the fallback client ID on responses.
    client_id: ClientId,
    /// Configuration the client was constructed with.
    config: BinanceDataClientConfig,
    /// HTTP client used for instrument, trade and bar requests.
    http_client: BinanceSpotHttpClient,
    /// WebSocket client used for stream subscriptions.
    ws_client: BinanceSpotWebSocketClient,
    /// Set by `connect`; cleared by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token that tells the WebSocket stream task to exit.
    cancellation_token: CancellationToken,
    /// Handles of the background tasks spawned by `connect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data events and request responses are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache shared with the stream task and the request tasks.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Clock used to timestamp responses.
    clock: &'static AtomicTime,
}
77
78impl BinanceSpotDataClient {
79    /// Creates a new [`BinanceSpotDataClient`] instance.
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if the client fails to initialize.
84    pub fn new(client_id: ClientId, config: BinanceDataClientConfig) -> anyhow::Result<Self> {
85        let clock = get_atomic_clock_realtime();
86        let data_sender = get_data_event_sender();
87
88        let http_client = BinanceSpotHttpClient::new(
89            config.environment,
90            config.api_key.clone(),
91            config.api_secret.clone(),
92            config.base_url_http.clone(),
93            None, // recv_window
94            None, // timeout_secs
95            None, // proxy_url
96        )?;
97
98        // SBE streams require Ed25519 authentication
99        let ws_client = BinanceSpotWebSocketClient::new(
100            config.base_url_ws.clone(),
101            config.ed25519_api_key.clone(),
102            config.ed25519_api_secret.clone(),
103            Some(20), // Heartbeat interval
104        )?;
105
106        Ok(Self {
107            client_id,
108            config,
109            http_client,
110            ws_client,
111            is_connected: AtomicBool::new(false),
112            cancellation_token: CancellationToken::new(),
113            tasks: Vec::new(),
114            data_sender,
115            instruments: Arc::new(RwLock::new(AHashMap::new())),
116            clock,
117        })
118    }
119
120    fn venue(&self) -> Venue {
121        *BINANCE_VENUE
122    }
123
124    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
125        if let Err(e) = sender.send(DataEvent::Data(data)) {
126            log::error!("Failed to emit data event: {e}");
127        }
128    }
129
130    fn spawn_ws<F>(&self, fut: F, context: &'static str)
131    where
132        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
133    {
134        get_runtime().spawn(async move {
135            if let Err(e) = fut.await {
136                log::error!("{context}: {e:?}");
137            }
138        });
139    }
140
141    fn handle_ws_message(
142        message: NautilusWsMessage,
143        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
144        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
145    ) {
146        match message {
147            NautilusWsMessage::Data(payloads) => {
148                for data in payloads {
149                    Self::send_data(data_sender, data);
150                }
151            }
152            NautilusWsMessage::Deltas(deltas) => {
153                Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
154            }
155            NautilusWsMessage::Instrument(instrument) => {
156                upsert_instrument(instruments, *instrument);
157            }
158            NautilusWsMessage::Error(e) => {
159                log::error!("Binance WebSocket error: code={}, msg={}", e.code, e.msg);
160            }
161            NautilusWsMessage::RawBinary(data) => {
162                log::debug!("Unhandled binary message: {} bytes", data.len());
163            }
164            NautilusWsMessage::RawJson(value) => {
165                log::debug!("Unhandled JSON message: {value:?}");
166            }
167            NautilusWsMessage::Reconnected => {
168                log::info!("WebSocket reconnected");
169            }
170        }
171    }
172}
173
174fn upsert_instrument(
175    cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
176    instrument: InstrumentAny,
177) {
178    let mut guard = cache.write().expect(MUTEX_POISONED);
179    guard.insert(instrument.id(), instrument);
180}
181
182#[async_trait::async_trait(?Send)]
183impl DataClient for BinanceSpotDataClient {
184    fn client_id(&self) -> ClientId {
185        self.client_id
186    }
187
188    fn venue(&self) -> Option<Venue> {
189        Some(self.venue())
190    }
191
192    fn start(&mut self) -> anyhow::Result<()> {
193        log::info!(
194            "Started: client_id={}, product_types={:?}, environment={:?}",
195            self.client_id,
196            self.config.product_types,
197            self.config.environment,
198        );
199        Ok(())
200    }
201
202    fn stop(&mut self) -> anyhow::Result<()> {
203        log::info!("Stopping {id}", id = self.client_id);
204        self.cancellation_token.cancel();
205        self.is_connected.store(false, Ordering::Relaxed);
206        Ok(())
207    }
208
209    fn reset(&mut self) -> anyhow::Result<()> {
210        log::debug!("Resetting {id}", id = self.client_id);
211
212        self.cancellation_token.cancel();
213
214        for task in self.tasks.drain(..) {
215            task.abort();
216        }
217
218        let mut ws = self.ws_client.clone();
219        get_runtime().spawn(async move {
220            let _ = ws.close().await;
221        });
222
223        self.is_connected.store(false, Ordering::Relaxed);
224        self.cancellation_token = CancellationToken::new();
225        Ok(())
226    }
227
228    fn dispose(&mut self) -> anyhow::Result<()> {
229        log::debug!("Disposing {id}", id = self.client_id);
230        self.stop()
231    }
232
233    async fn connect(&mut self) -> anyhow::Result<()> {
234        if self.is_connected() {
235            return Ok(());
236        }
237
238        // Reinitialize token in case of reconnection after disconnect
239        self.cancellation_token = CancellationToken::new();
240
241        let instruments = self
242            .http_client
243            .request_instruments()
244            .await
245            .context("failed to request Binance instruments")?;
246
247        self.http_client.cache_instruments(instruments.clone());
248
249        {
250            let mut guard = self.instruments.write().expect(MUTEX_POISONED);
251            for instrument in &instruments {
252                guard.insert(instrument.id(), instrument.clone());
253            }
254        }
255
256        for instrument in instruments.clone() {
257            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
258                log::warn!("Failed to send instrument: {e}");
259            }
260        }
261
262        self.ws_client.cache_instruments(instruments);
263
264        log::info!("Connecting to Binance SBE WebSocket...");
265        self.ws_client.connect().await.map_err(|e| {
266            log::error!("Binance WebSocket connection failed: {e:?}");
267            anyhow::anyhow!("failed to connect Binance WebSocket: {e}")
268        })?;
269        log::info!("Binance SBE WebSocket connected");
270
271        let stream = self.ws_client.stream();
272        let sender = self.data_sender.clone();
273        let insts = self.instruments.clone();
274        let cancel = self.cancellation_token.clone();
275
276        let handle = get_runtime().spawn(async move {
277            pin_mut!(stream);
278            loop {
279                tokio::select! {
280                    Some(message) = stream.next() => {
281                        Self::handle_ws_message(message, &sender, &insts);
282                    }
283                    () = cancel.cancelled() => {
284                        log::debug!("WebSocket stream task cancelled");
285                        break;
286                    }
287                }
288            }
289        });
290        self.tasks.push(handle);
291
292        self.is_connected.store(true, Ordering::Release);
293        log::info!("Connected: client_id={}", self.client_id);
294        Ok(())
295    }
296
297    async fn disconnect(&mut self) -> anyhow::Result<()> {
298        if self.is_disconnected() {
299            return Ok(());
300        }
301
302        self.cancellation_token.cancel();
303
304        let _ = self.ws_client.close().await;
305
306        let handles: Vec<_> = self.tasks.drain(..).collect();
307        for handle in handles {
308            if let Err(e) = handle.await {
309                log::error!("Error joining WebSocket task: {e}");
310            }
311        }
312
313        self.is_connected.store(false, Ordering::Release);
314        log::info!("Disconnected: client_id={}", self.client_id);
315        Ok(())
316    }
317
318    fn is_connected(&self) -> bool {
319        self.is_connected.load(Ordering::Relaxed)
320    }
321
322    fn is_disconnected(&self) -> bool {
323        !self.is_connected()
324    }
325
326    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
327        log::debug!("subscribe_instruments: Binance instruments are fetched via HTTP on connect");
328        Ok(())
329    }
330
331    fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
332        log::debug!("subscribe_instrument: Binance instruments are fetched via HTTP on connect");
333        Ok(())
334    }
335
336    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
337        if cmd.book_type != BookType::L2_MBP {
338            anyhow::bail!("Binance SBE only supports L2_MBP order book deltas");
339        }
340
341        let instrument_id = cmd.instrument_id;
342        let ws = self.ws_client.clone();
343        let depth = cmd.depth.map_or(20, |d| d.get());
344
345        // Binance SBE depth streams: depth5, depth10, depth20
346        let depth_level = match depth {
347            1..=5 => 5,
348            6..=10 => 10,
349            _ => 20,
350        };
351
352        let stream = format!(
353            "{}@depth{}",
354            instrument_id.symbol.as_str().to_lowercase(),
355            depth_level
356        );
357
358        self.spawn_ws(
359            async move {
360                ws.subscribe(vec![stream])
361                    .await
362                    .context("book deltas subscription")
363            },
364            "order book subscription",
365        );
366        Ok(())
367    }
368
369    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
370        let instrument_id = cmd.instrument_id;
371        let ws = self.ws_client.clone();
372
373        let stream = format!(
374            "{}@bestBidAsk",
375            instrument_id.symbol.as_str().to_lowercase()
376        );
377
378        self.spawn_ws(
379            async move {
380                ws.subscribe(vec![stream])
381                    .await
382                    .context("quotes subscription")
383            },
384            "quote subscription",
385        );
386        Ok(())
387    }
388
389    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
390        let instrument_id = cmd.instrument_id;
391        let ws = self.ws_client.clone();
392
393        let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
394
395        self.spawn_ws(
396            async move {
397                ws.subscribe(vec![stream])
398                    .await
399                    .context("trades subscription")
400            },
401            "trade subscription",
402        );
403        Ok(())
404    }
405
406    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
407        let bar_type = cmd.bar_type;
408        let ws = self.ws_client.clone();
409        let interval = bar_spec_to_binance_interval(bar_type.spec())?;
410
411        let stream = format!(
412            "{}@kline_{}",
413            bar_type.instrument_id().symbol.as_str().to_lowercase(),
414            interval.as_str()
415        );
416
417        self.spawn_ws(
418            async move {
419                ws.subscribe(vec![stream])
420                    .await
421                    .context("bars subscription")
422            },
423            "bar subscription",
424        );
425        Ok(())
426    }
427
428    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
429        let instrument_id = cmd.instrument_id;
430        let ws = self.ws_client.clone();
431
432        // Unsubscribe from all depth levels for this symbol
433        let symbol_lower = instrument_id.symbol.as_str().to_lowercase();
434        let streams = vec![
435            format!("{symbol_lower}@depth5"),
436            format!("{symbol_lower}@depth10"),
437            format!("{symbol_lower}@depth20"),
438        ];
439
440        self.spawn_ws(
441            async move {
442                ws.unsubscribe(streams)
443                    .await
444                    .context("book deltas unsubscribe")
445            },
446            "order book unsubscribe",
447        );
448        Ok(())
449    }
450
451    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
452        let instrument_id = cmd.instrument_id;
453        let ws = self.ws_client.clone();
454
455        let stream = format!(
456            "{}@bestBidAsk",
457            instrument_id.symbol.as_str().to_lowercase()
458        );
459
460        self.spawn_ws(
461            async move {
462                ws.unsubscribe(vec![stream])
463                    .await
464                    .context("quotes unsubscribe")
465            },
466            "quote unsubscribe",
467        );
468        Ok(())
469    }
470
471    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
472        let instrument_id = cmd.instrument_id;
473        let ws = self.ws_client.clone();
474
475        let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
476
477        self.spawn_ws(
478            async move {
479                ws.unsubscribe(vec![stream])
480                    .await
481                    .context("trades unsubscribe")
482            },
483            "trade unsubscribe",
484        );
485        Ok(())
486    }
487
488    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
489        let bar_type = cmd.bar_type;
490        let ws = self.ws_client.clone();
491        let interval = bar_spec_to_binance_interval(bar_type.spec())?;
492
493        let stream = format!(
494            "{}@kline_{}",
495            bar_type.instrument_id().symbol.as_str().to_lowercase(),
496            interval.as_str()
497        );
498
499        self.spawn_ws(
500            async move {
501                ws.unsubscribe(vec![stream])
502                    .await
503                    .context("bars unsubscribe")
504            },
505            "bar unsubscribe",
506        );
507        Ok(())
508    }
509
510    fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
511        let http = self.http_client.clone();
512        let sender = self.data_sender.clone();
513        let instruments_cache = self.instruments.clone();
514        let request_id = request.request_id;
515        let client_id = request.client_id.unwrap_or(self.client_id);
516        let venue = self.venue();
517        let start = request.start;
518        let end = request.end;
519        let params = request.params.clone();
520        let clock = self.clock;
521        let start_nanos = datetime_to_unix_nanos(start);
522        let end_nanos = datetime_to_unix_nanos(end);
523
524        get_runtime().spawn(async move {
525            match http.request_instruments().await {
526                Ok(instruments) => {
527                    for instrument in &instruments {
528                        upsert_instrument(&instruments_cache, instrument.clone());
529                    }
530
531                    let response = DataResponse::Instruments(InstrumentsResponse::new(
532                        request_id,
533                        client_id,
534                        venue,
535                        instruments,
536                        start_nanos,
537                        end_nanos,
538                        clock.get_time_ns(),
539                        params,
540                    ));
541
542                    if let Err(e) = sender.send(DataEvent::Response(response)) {
543                        log::error!("Failed to send instruments response: {e}");
544                    }
545                }
546                Err(e) => log::error!("Instruments request failed: {e:?}"),
547            }
548        });
549
550        Ok(())
551    }
552
553    fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
554        let http = self.http_client.clone();
555        let sender = self.data_sender.clone();
556        let instruments = self.instruments.clone();
557        let instrument_id = request.instrument_id;
558        let request_id = request.request_id;
559        let client_id = request.client_id.unwrap_or(self.client_id);
560        let start = request.start;
561        let end = request.end;
562        let params = request.params.clone();
563        let clock = self.clock;
564        let start_nanos = datetime_to_unix_nanos(start);
565        let end_nanos = datetime_to_unix_nanos(end);
566
567        get_runtime().spawn(async move {
568            {
569                let guard = instruments.read().expect(MUTEX_POISONED);
570                if let Some(instrument) = guard.get(&instrument_id) {
571                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
572                        request_id,
573                        client_id,
574                        instrument.id(),
575                        instrument.clone(),
576                        start_nanos,
577                        end_nanos,
578                        clock.get_time_ns(),
579                        params,
580                    )));
581
582                    if let Err(e) = sender.send(DataEvent::Response(response)) {
583                        log::error!("Failed to send instrument response: {e}");
584                    }
585                    return;
586                }
587            }
588
589            match http.request_instruments().await {
590                Ok(all_instruments) => {
591                    for instrument in &all_instruments {
592                        upsert_instrument(&instruments, instrument.clone());
593                    }
594
595                    let instrument = all_instruments
596                        .into_iter()
597                        .find(|i| i.id() == instrument_id);
598
599                    if let Some(instrument) = instrument {
600                        let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
601                            request_id,
602                            client_id,
603                            instrument.id(),
604                            instrument,
605                            start_nanos,
606                            end_nanos,
607                            clock.get_time_ns(),
608                            params,
609                        )));
610
611                        if let Err(e) = sender.send(DataEvent::Response(response)) {
612                            log::error!("Failed to send instrument response: {e}");
613                        }
614                    } else {
615                        log::error!("Instrument not found: {instrument_id}");
616                    }
617                }
618                Err(e) => log::error!("Instrument request failed: {e:?}"),
619            }
620        });
621
622        Ok(())
623    }
624
625    fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
626        let http = self.http_client.clone();
627        let sender = self.data_sender.clone();
628        let instrument_id = request.instrument_id;
629        let limit = request.limit.map(|n| n.get() as u32);
630        let request_id = request.request_id;
631        let client_id = request.client_id.unwrap_or(self.client_id);
632        let params = request.params.clone();
633        let clock = self.clock;
634        let start_nanos = datetime_to_unix_nanos(request.start);
635        let end_nanos = datetime_to_unix_nanos(request.end);
636
637        get_runtime().spawn(async move {
638            match http
639                .request_trades(instrument_id, limit)
640                .await
641                .context("failed to request trades from Binance")
642            {
643                Ok(trades) => {
644                    let response = DataResponse::Trades(TradesResponse::new(
645                        request_id,
646                        client_id,
647                        instrument_id,
648                        trades,
649                        start_nanos,
650                        end_nanos,
651                        clock.get_time_ns(),
652                        params,
653                    ));
654                    if let Err(e) = sender.send(DataEvent::Response(response)) {
655                        log::error!("Failed to send trades response: {e}");
656                    }
657                }
658                Err(e) => log::error!("Trade request failed: {e:?}"),
659            }
660        });
661
662        Ok(())
663    }
664
665    fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
666        let http = self.http_client.clone();
667        let sender = self.data_sender.clone();
668        let bar_type = request.bar_type;
669        let start = request.start;
670        let end = request.end;
671        let limit = request.limit.map(|n| n.get() as u32);
672        let request_id = request.request_id;
673        let client_id = request.client_id.unwrap_or(self.client_id);
674        let params = request.params.clone();
675        let clock = self.clock;
676        let start_nanos = datetime_to_unix_nanos(start);
677        let end_nanos = datetime_to_unix_nanos(end);
678
679        get_runtime().spawn(async move {
680            match http
681                .request_bars(bar_type, start, end, limit)
682                .await
683                .context("failed to request bars from Binance")
684            {
685                Ok(bars) => {
686                    let response = DataResponse::Bars(BarsResponse::new(
687                        request_id,
688                        client_id,
689                        bar_type,
690                        bars,
691                        start_nanos,
692                        end_nanos,
693                        clock.get_time_ns(),
694                        params,
695                    ));
696                    if let Err(e) = sender.send(DataEvent::Response(response)) {
697                        log::error!("Failed to send bars response: {e}");
698                    }
699                }
700                Err(e) => log::error!("Bar request failed: {e:?}"),
701            }
702        });
703
704        Ok(())
705    }
706}