1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 clients::DataClient,
28 live::{runner::get_data_event_sender, runtime::get_runtime},
29 messages::{
30 DataEvent,
31 data::{
32 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34 SubscribeBookDeltas, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
35 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
36 UnsubscribeQuotes, UnsubscribeTrades,
37 },
38 },
39};
40use nautilus_core::{
41 MUTEX_POISONED,
42 datetime::datetime_to_unix_nanos,
43 time::{AtomicTime, get_atomic_clock_realtime},
44};
45use nautilus_model::{
46 data::{Data, OrderBookDeltas_API},
47 enums::BookType,
48 identifiers::{ClientId, InstrumentId, Venue},
49 instruments::{Instrument, InstrumentAny},
50};
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55 common::{
56 consts::BINANCE_VENUE, credential::resolve_credentials, enums::BinanceProductType,
57 parse::bar_spec_to_binance_interval,
58 },
59 config::BinanceDataClientConfig,
60 spot::{
61 http::client::BinanceSpotHttpClient,
62 websocket::streams::{
63 client::BinanceSpotWebSocketClient,
64 messages::{BinanceSpotWsMessage, NautilusSpotDataWsMessage},
65 },
66 },
67};
68
69#[derive(Debug)]
71pub struct BinanceSpotDataClient {
72 clock: &'static AtomicTime,
73 client_id: ClientId,
74 config: BinanceDataClientConfig,
75 http_client: BinanceSpotHttpClient,
76 ws_client: BinanceSpotWebSocketClient,
77 is_connected: AtomicBool,
78 cancellation_token: CancellationToken,
79 tasks: Vec<JoinHandle<()>>,
80 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
81 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
82}
83
84impl BinanceSpotDataClient {
85 pub fn new(client_id: ClientId, config: BinanceDataClientConfig) -> anyhow::Result<Self> {
91 let http_client = BinanceSpotHttpClient::new(
92 config.environment,
93 config.api_key.clone(),
94 config.api_secret.clone(),
95 config.base_url_http.clone(),
96 None, None, None, )?;
100
101 let product_type = config
102 .product_types
103 .first()
104 .copied()
105 .unwrap_or(BinanceProductType::Spot);
106
107 let creds = resolve_credentials(
108 config.api_key.clone(),
109 config.api_secret.clone(),
110 config.environment,
111 product_type,
112 )
113 .ok();
114
115 let ws_client = BinanceSpotWebSocketClient::new(
117 config.base_url_ws.clone(),
118 creds.as_ref().map(|(k, _)| k.clone()),
119 creds.as_ref().map(|(_, s)| s.clone()),
120 Some(20), )?;
122
123 let clock = get_atomic_clock_realtime();
124 let data_sender = get_data_event_sender();
125
126 Ok(Self {
127 clock,
128 client_id,
129 config,
130 http_client,
131 ws_client,
132 is_connected: AtomicBool::new(false),
133 cancellation_token: CancellationToken::new(),
134 tasks: Vec::new(),
135 data_sender,
136 instruments: Arc::new(RwLock::new(AHashMap::new())),
137 })
138 }
139
140 fn venue(&self) -> Venue {
141 *BINANCE_VENUE
142 }
143
144 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
145 if let Err(e) = sender.send(DataEvent::Data(data)) {
146 log::error!("Failed to emit data event: {e}");
147 }
148 }
149
150 fn spawn_ws<F>(&self, fut: F, context: &'static str)
151 where
152 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
153 {
154 get_runtime().spawn(async move {
155 if let Err(e) = fut.await {
156 log::error!("{context}: {e:?}");
157 }
158 });
159 }
160
161 fn handle_ws_message(
162 msg: BinanceSpotWsMessage,
163 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
164 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
165 ) {
166 match msg {
167 BinanceSpotWsMessage::Data(data_msg) => match data_msg {
168 NautilusSpotDataWsMessage::Data(payloads) => {
169 for data in payloads {
170 Self::send_data(data_sender, data);
171 }
172 }
173 NautilusSpotDataWsMessage::Deltas(deltas) => {
174 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
175 }
176 NautilusSpotDataWsMessage::Instrument(instrument) => {
177 upsert_instrument(instruments, *instrument);
178 }
179 NautilusSpotDataWsMessage::RawBinary(data) => {
180 log::debug!("Unhandled binary message: {} bytes", data.len());
181 }
182 NautilusSpotDataWsMessage::RawJson(value) => {
183 log::debug!("Unhandled JSON message: {value:?}");
184 }
185 },
186 BinanceSpotWsMessage::Error(e) => {
187 log::error!("Binance WebSocket error: code={}, msg={}", e.code, e.msg);
188 }
189 BinanceSpotWsMessage::Reconnected => {
190 log::info!("WebSocket reconnected");
191 }
192 }
193 }
194}
195
196fn upsert_instrument(
197 cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
198 instrument: InstrumentAny,
199) {
200 let mut guard = cache.write().expect(MUTEX_POISONED);
201 guard.insert(instrument.id(), instrument);
202}
203
204#[async_trait::async_trait(?Send)]
205impl DataClient for BinanceSpotDataClient {
206 fn client_id(&self) -> ClientId {
207 self.client_id
208 }
209
210 fn venue(&self) -> Option<Venue> {
211 Some(self.venue())
212 }
213
214 fn start(&mut self) -> anyhow::Result<()> {
215 log::info!(
216 "Started: client_id={}, product_types={:?}, environment={:?}",
217 self.client_id,
218 self.config.product_types,
219 self.config.environment,
220 );
221 Ok(())
222 }
223
224 fn stop(&mut self) -> anyhow::Result<()> {
225 log::info!("Stopping {id}", id = self.client_id);
226 self.cancellation_token.cancel();
227 self.is_connected.store(false, Ordering::Relaxed);
228 Ok(())
229 }
230
231 fn reset(&mut self) -> anyhow::Result<()> {
232 log::debug!("Resetting {id}", id = self.client_id);
233
234 self.cancellation_token.cancel();
235
236 for task in self.tasks.drain(..) {
237 task.abort();
238 }
239
240 let mut ws = self.ws_client.clone();
241 get_runtime().spawn(async move {
242 let _ = ws.close().await;
243 });
244
245 self.is_connected.store(false, Ordering::Relaxed);
246 self.cancellation_token = CancellationToken::new();
247 Ok(())
248 }
249
250 fn dispose(&mut self) -> anyhow::Result<()> {
251 log::debug!("Disposing {id}", id = self.client_id);
252 self.stop()
253 }
254
255 async fn connect(&mut self) -> anyhow::Result<()> {
256 if self.is_connected() {
257 return Ok(());
258 }
259
260 self.cancellation_token = CancellationToken::new();
262
263 let instruments = self
264 .http_client
265 .request_instruments()
266 .await
267 .context("failed to request Binance instruments")?;
268
269 self.http_client.cache_instruments(instruments.clone());
270
271 {
272 let mut guard = self.instruments.write().expect(MUTEX_POISONED);
273 for instrument in &instruments {
274 guard.insert(instrument.id(), instrument.clone());
275 }
276 }
277
278 for instrument in instruments.clone() {
279 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
280 log::warn!("Failed to send instrument: {e}");
281 }
282 }
283
284 self.ws_client.cache_instruments(instruments);
285
286 log::info!("Connecting to Binance SBE WebSocket...");
287 self.ws_client.connect().await.map_err(|e| {
288 log::error!("Binance WebSocket connection failed: {e:?}");
289 anyhow::anyhow!("failed to connect Binance WebSocket: {e}")
290 })?;
291 log::info!("Binance SBE WebSocket connected");
292
293 let stream = self.ws_client.stream();
294 let sender = self.data_sender.clone();
295 let insts = self.instruments.clone();
296 let cancel = self.cancellation_token.clone();
297
298 let handle = get_runtime().spawn(async move {
299 pin_mut!(stream);
300 loop {
301 tokio::select! {
302 Some(message) = stream.next() => {
303 Self::handle_ws_message(message, &sender, &insts);
304 }
305 () = cancel.cancelled() => {
306 log::debug!("WebSocket stream task cancelled");
307 break;
308 }
309 }
310 }
311 });
312 self.tasks.push(handle);
313
314 self.is_connected.store(true, Ordering::Release);
315 log::info!("Connected: client_id={}", self.client_id);
316 Ok(())
317 }
318
319 async fn disconnect(&mut self) -> anyhow::Result<()> {
320 if self.is_disconnected() {
321 return Ok(());
322 }
323
324 self.cancellation_token.cancel();
325
326 let _ = self.ws_client.close().await;
327
328 let handles: Vec<_> = self.tasks.drain(..).collect();
329 for handle in handles {
330 if let Err(e) = handle.await {
331 log::error!("Error joining WebSocket task: {e}");
332 }
333 }
334
335 self.is_connected.store(false, Ordering::Release);
336 log::info!("Disconnected: client_id={}", self.client_id);
337 Ok(())
338 }
339
340 fn is_connected(&self) -> bool {
341 self.is_connected.load(Ordering::Relaxed)
342 }
343
344 fn is_disconnected(&self) -> bool {
345 !self.is_connected()
346 }
347
348 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
349 log::debug!("subscribe_instruments: Binance instruments are fetched via HTTP on connect");
350 Ok(())
351 }
352
353 fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
354 log::debug!("subscribe_instrument: Binance instruments are fetched via HTTP on connect");
355 Ok(())
356 }
357
358 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
359 if cmd.book_type != BookType::L2_MBP {
360 anyhow::bail!("Binance SBE only supports L2_MBP order book deltas");
361 }
362
363 let instrument_id = cmd.instrument_id;
364 let ws = self.ws_client.clone();
365 let depth = cmd.depth.map_or(20, |d| d.get());
366
367 let depth_level = match depth {
369 1..=5 => 5,
370 6..=10 => 10,
371 _ => 20,
372 };
373
374 let stream = format!(
375 "{}@depth{}",
376 instrument_id.symbol.as_str().to_lowercase(),
377 depth_level
378 );
379
380 self.spawn_ws(
381 async move {
382 ws.subscribe(vec![stream])
383 .await
384 .context("book deltas subscription")
385 },
386 "order book subscription",
387 );
388 Ok(())
389 }
390
391 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
392 let instrument_id = cmd.instrument_id;
393 let ws = self.ws_client.clone();
394
395 let stream = format!(
396 "{}@bestBidAsk",
397 instrument_id.symbol.as_str().to_lowercase()
398 );
399
400 self.spawn_ws(
401 async move {
402 ws.subscribe(vec![stream])
403 .await
404 .context("quotes subscription")
405 },
406 "quote subscription",
407 );
408 Ok(())
409 }
410
411 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
412 let instrument_id = cmd.instrument_id;
413 let ws = self.ws_client.clone();
414
415 let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
416
417 self.spawn_ws(
418 async move {
419 ws.subscribe(vec![stream])
420 .await
421 .context("trades subscription")
422 },
423 "trade subscription",
424 );
425 Ok(())
426 }
427
428 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
429 let bar_type = cmd.bar_type;
430 let ws = self.ws_client.clone();
431 let interval = bar_spec_to_binance_interval(bar_type.spec())?;
432
433 let stream = format!(
434 "{}@kline_{}",
435 bar_type.instrument_id().symbol.as_str().to_lowercase(),
436 interval.as_str()
437 );
438
439 self.spawn_ws(
440 async move {
441 ws.subscribe(vec![stream])
442 .await
443 .context("bars subscription")
444 },
445 "bar subscription",
446 );
447 Ok(())
448 }
449
450 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
451 let instrument_id = cmd.instrument_id;
452 let ws = self.ws_client.clone();
453
454 let symbol_lower = instrument_id.symbol.as_str().to_lowercase();
456 let streams = vec![
457 format!("{symbol_lower}@depth5"),
458 format!("{symbol_lower}@depth10"),
459 format!("{symbol_lower}@depth20"),
460 ];
461
462 self.spawn_ws(
463 async move {
464 ws.unsubscribe(streams)
465 .await
466 .context("book deltas unsubscribe")
467 },
468 "order book unsubscribe",
469 );
470 Ok(())
471 }
472
473 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
474 let instrument_id = cmd.instrument_id;
475 let ws = self.ws_client.clone();
476
477 let stream = format!(
478 "{}@bestBidAsk",
479 instrument_id.symbol.as_str().to_lowercase()
480 );
481
482 self.spawn_ws(
483 async move {
484 ws.unsubscribe(vec![stream])
485 .await
486 .context("quotes unsubscribe")
487 },
488 "quote unsubscribe",
489 );
490 Ok(())
491 }
492
493 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
494 let instrument_id = cmd.instrument_id;
495 let ws = self.ws_client.clone();
496
497 let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
498
499 self.spawn_ws(
500 async move {
501 ws.unsubscribe(vec![stream])
502 .await
503 .context("trades unsubscribe")
504 },
505 "trade unsubscribe",
506 );
507 Ok(())
508 }
509
510 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
511 let bar_type = cmd.bar_type;
512 let ws = self.ws_client.clone();
513 let interval = bar_spec_to_binance_interval(bar_type.spec())?;
514
515 let stream = format!(
516 "{}@kline_{}",
517 bar_type.instrument_id().symbol.as_str().to_lowercase(),
518 interval.as_str()
519 );
520
521 self.spawn_ws(
522 async move {
523 ws.unsubscribe(vec![stream])
524 .await
525 .context("bars unsubscribe")
526 },
527 "bar unsubscribe",
528 );
529 Ok(())
530 }
531
532 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
533 let http = self.http_client.clone();
534 let sender = self.data_sender.clone();
535 let instruments_cache = self.instruments.clone();
536 let request_id = request.request_id;
537 let client_id = request.client_id.unwrap_or(self.client_id);
538 let venue = self.venue();
539 let start = request.start;
540 let end = request.end;
541 let params = request.params;
542 let clock = self.clock;
543 let start_nanos = datetime_to_unix_nanos(start);
544 let end_nanos = datetime_to_unix_nanos(end);
545
546 get_runtime().spawn(async move {
547 match http.request_instruments().await {
548 Ok(instruments) => {
549 for instrument in &instruments {
550 upsert_instrument(&instruments_cache, instrument.clone());
551 }
552
553 let response = DataResponse::Instruments(InstrumentsResponse::new(
554 request_id,
555 client_id,
556 venue,
557 instruments,
558 start_nanos,
559 end_nanos,
560 clock.get_time_ns(),
561 params,
562 ));
563
564 if let Err(e) = sender.send(DataEvent::Response(response)) {
565 log::error!("Failed to send instruments response: {e}");
566 }
567 }
568 Err(e) => log::error!("Instruments request failed: {e:?}"),
569 }
570 });
571
572 Ok(())
573 }
574
575 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
576 let http = self.http_client.clone();
577 let sender = self.data_sender.clone();
578 let instruments = self.instruments.clone();
579 let instrument_id = request.instrument_id;
580 let request_id = request.request_id;
581 let client_id = request.client_id.unwrap_or(self.client_id);
582 let start = request.start;
583 let end = request.end;
584 let params = request.params;
585 let clock = self.clock;
586 let start_nanos = datetime_to_unix_nanos(start);
587 let end_nanos = datetime_to_unix_nanos(end);
588
589 get_runtime().spawn(async move {
590 {
591 let guard = instruments.read().expect(MUTEX_POISONED);
592 if let Some(instrument) = guard.get(&instrument_id) {
593 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
594 request_id,
595 client_id,
596 instrument.id(),
597 instrument.clone(),
598 start_nanos,
599 end_nanos,
600 clock.get_time_ns(),
601 params,
602 )));
603
604 if let Err(e) = sender.send(DataEvent::Response(response)) {
605 log::error!("Failed to send instrument response: {e}");
606 }
607 return;
608 }
609 }
610
611 match http.request_instruments().await {
612 Ok(all_instruments) => {
613 for instrument in &all_instruments {
614 upsert_instrument(&instruments, instrument.clone());
615 }
616
617 let instrument = all_instruments
618 .into_iter()
619 .find(|i| i.id() == instrument_id);
620
621 if let Some(instrument) = instrument {
622 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
623 request_id,
624 client_id,
625 instrument.id(),
626 instrument,
627 start_nanos,
628 end_nanos,
629 clock.get_time_ns(),
630 params,
631 )));
632
633 if let Err(e) = sender.send(DataEvent::Response(response)) {
634 log::error!("Failed to send instrument response: {e}");
635 }
636 } else {
637 log::error!("Instrument not found: {instrument_id}");
638 }
639 }
640 Err(e) => log::error!("Instrument request failed: {e:?}"),
641 }
642 });
643
644 Ok(())
645 }
646
647 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
648 let http = self.http_client.clone();
649 let sender = self.data_sender.clone();
650 let instrument_id = request.instrument_id;
651 let limit = request.limit.map(|n| n.get() as u32);
652 let request_id = request.request_id;
653 let client_id = request.client_id.unwrap_or(self.client_id);
654 let params = request.params;
655 let clock = self.clock;
656 let start_nanos = datetime_to_unix_nanos(request.start);
657 let end_nanos = datetime_to_unix_nanos(request.end);
658
659 get_runtime().spawn(async move {
660 match http
661 .request_trades(instrument_id, limit)
662 .await
663 .context("failed to request trades from Binance")
664 {
665 Ok(trades) => {
666 let response = DataResponse::Trades(TradesResponse::new(
667 request_id,
668 client_id,
669 instrument_id,
670 trades,
671 start_nanos,
672 end_nanos,
673 clock.get_time_ns(),
674 params,
675 ));
676 if let Err(e) = sender.send(DataEvent::Response(response)) {
677 log::error!("Failed to send trades response: {e}");
678 }
679 }
680 Err(e) => log::error!("Trade request failed: {e:?}"),
681 }
682 });
683
684 Ok(())
685 }
686
687 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
688 let http = self.http_client.clone();
689 let sender = self.data_sender.clone();
690 let bar_type = request.bar_type;
691 let start = request.start;
692 let end = request.end;
693 let limit = request.limit.map(|n| n.get() as u32);
694 let request_id = request.request_id;
695 let client_id = request.client_id.unwrap_or(self.client_id);
696 let params = request.params;
697 let clock = self.clock;
698 let start_nanos = datetime_to_unix_nanos(start);
699 let end_nanos = datetime_to_unix_nanos(end);
700
701 get_runtime().spawn(async move {
702 match http
703 .request_bars(bar_type, start, end, limit)
704 .await
705 .context("failed to request bars from Binance")
706 {
707 Ok(bars) => {
708 let response = DataResponse::Bars(BarsResponse::new(
709 request_id,
710 client_id,
711 bar_type,
712 bars,
713 start_nanos,
714 end_nanos,
715 clock.get_time_ns(),
716 params,
717 ));
718 if let Err(e) = sender.send(DataEvent::Response(response)) {
719 log::error!("Failed to send bars response: {e}");
720 }
721 }
722 Err(e) => log::error!("Bar request failed: {e:?}"),
723 }
724 });
725
726 Ok(())
727 }
728}