1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 clients::DataClient,
28 live::{runner::get_data_event_sender, runtime::get_runtime},
29 messages::{
30 DataEvent,
31 data::{
32 BarsResponse, DataResponse, InstrumentResponse, InstrumentsResponse, RequestBars,
33 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
34 SubscribeBookDeltas, SubscribeInstrument, SubscribeInstruments, SubscribeQuotes,
35 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
36 UnsubscribeQuotes, UnsubscribeTrades,
37 },
38 },
39};
40use nautilus_core::{
41 MUTEX_POISONED,
42 datetime::datetime_to_unix_nanos,
43 time::{AtomicTime, get_atomic_clock_realtime},
44};
45use nautilus_model::{
46 data::{Data, OrderBookDeltas_API},
47 enums::BookType,
48 identifiers::{ClientId, InstrumentId, Venue},
49 instruments::{Instrument, InstrumentAny},
50};
51use tokio::task::JoinHandle;
52use tokio_util::sync::CancellationToken;
53
54use crate::{
55 common::{consts::BINANCE_VENUE, parse::bar_spec_to_binance_interval},
56 config::BinanceDataClientConfig,
57 spot::{
58 http::client::BinanceSpotHttpClient,
59 websocket::streams::{client::BinanceSpotWebSocketClient, messages::NautilusWsMessage},
60 },
61};
62
63#[derive(Debug)]
65pub struct BinanceSpotDataClient {
66 client_id: ClientId,
67 config: BinanceDataClientConfig,
68 http_client: BinanceSpotHttpClient,
69 ws_client: BinanceSpotWebSocketClient,
70 is_connected: AtomicBool,
71 cancellation_token: CancellationToken,
72 tasks: Vec<JoinHandle<()>>,
73 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
74 instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
75 clock: &'static AtomicTime,
76}
77
78impl BinanceSpotDataClient {
79 pub fn new(client_id: ClientId, config: BinanceDataClientConfig) -> anyhow::Result<Self> {
85 let clock = get_atomic_clock_realtime();
86 let data_sender = get_data_event_sender();
87
88 let http_client = BinanceSpotHttpClient::new(
89 config.environment,
90 config.api_key.clone(),
91 config.api_secret.clone(),
92 config.base_url_http.clone(),
93 None, None, None, )?;
97
98 let ws_client = BinanceSpotWebSocketClient::new(
100 config.base_url_ws.clone(),
101 config.ed25519_api_key.clone(),
102 config.ed25519_api_secret.clone(),
103 Some(20), )?;
105
106 Ok(Self {
107 client_id,
108 config,
109 http_client,
110 ws_client,
111 is_connected: AtomicBool::new(false),
112 cancellation_token: CancellationToken::new(),
113 tasks: Vec::new(),
114 data_sender,
115 instruments: Arc::new(RwLock::new(AHashMap::new())),
116 clock,
117 })
118 }
119
120 fn venue(&self) -> Venue {
121 *BINANCE_VENUE
122 }
123
124 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
125 if let Err(e) = sender.send(DataEvent::Data(data)) {
126 log::error!("Failed to emit data event: {e}");
127 }
128 }
129
130 fn spawn_ws<F>(&self, fut: F, context: &'static str)
131 where
132 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
133 {
134 get_runtime().spawn(async move {
135 if let Err(e) = fut.await {
136 log::error!("{context}: {e:?}");
137 }
138 });
139 }
140
141 fn handle_ws_message(
142 message: NautilusWsMessage,
143 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
144 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
145 ) {
146 match message {
147 NautilusWsMessage::Data(payloads) => {
148 for data in payloads {
149 Self::send_data(data_sender, data);
150 }
151 }
152 NautilusWsMessage::Deltas(deltas) => {
153 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
154 }
155 NautilusWsMessage::Instrument(instrument) => {
156 upsert_instrument(instruments, *instrument);
157 }
158 NautilusWsMessage::Error(e) => {
159 log::error!("Binance WebSocket error: code={}, msg={}", e.code, e.msg);
160 }
161 NautilusWsMessage::RawBinary(data) => {
162 log::debug!("Unhandled binary message: {} bytes", data.len());
163 }
164 NautilusWsMessage::RawJson(value) => {
165 log::debug!("Unhandled JSON message: {value:?}");
166 }
167 NautilusWsMessage::Reconnected => {
168 log::info!("WebSocket reconnected");
169 }
170 }
171 }
172}
173
174fn upsert_instrument(
175 cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
176 instrument: InstrumentAny,
177) {
178 let mut guard = cache.write().expect(MUTEX_POISONED);
179 guard.insert(instrument.id(), instrument);
180}
181
182#[async_trait::async_trait(?Send)]
183impl DataClient for BinanceSpotDataClient {
184 fn client_id(&self) -> ClientId {
185 self.client_id
186 }
187
188 fn venue(&self) -> Option<Venue> {
189 Some(self.venue())
190 }
191
192 fn start(&mut self) -> anyhow::Result<()> {
193 log::info!(
194 "Started: client_id={}, product_types={:?}, environment={:?}",
195 self.client_id,
196 self.config.product_types,
197 self.config.environment,
198 );
199 Ok(())
200 }
201
202 fn stop(&mut self) -> anyhow::Result<()> {
203 log::info!("Stopping {id}", id = self.client_id);
204 self.cancellation_token.cancel();
205 self.is_connected.store(false, Ordering::Relaxed);
206 Ok(())
207 }
208
209 fn reset(&mut self) -> anyhow::Result<()> {
210 log::debug!("Resetting {id}", id = self.client_id);
211
212 self.cancellation_token.cancel();
213
214 for task in self.tasks.drain(..) {
215 task.abort();
216 }
217
218 let mut ws = self.ws_client.clone();
219 get_runtime().spawn(async move {
220 let _ = ws.close().await;
221 });
222
223 self.is_connected.store(false, Ordering::Relaxed);
224 self.cancellation_token = CancellationToken::new();
225 Ok(())
226 }
227
228 fn dispose(&mut self) -> anyhow::Result<()> {
229 log::debug!("Disposing {id}", id = self.client_id);
230 self.stop()
231 }
232
233 async fn connect(&mut self) -> anyhow::Result<()> {
234 if self.is_connected() {
235 return Ok(());
236 }
237
238 self.cancellation_token = CancellationToken::new();
240
241 let instruments = self
242 .http_client
243 .request_instruments()
244 .await
245 .context("failed to request Binance instruments")?;
246
247 self.http_client.cache_instruments(instruments.clone());
248
249 {
250 let mut guard = self.instruments.write().expect(MUTEX_POISONED);
251 for instrument in &instruments {
252 guard.insert(instrument.id(), instrument.clone());
253 }
254 }
255
256 for instrument in instruments.clone() {
257 if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
258 log::warn!("Failed to send instrument: {e}");
259 }
260 }
261
262 self.ws_client.cache_instruments(instruments);
263
264 log::info!("Connecting to Binance SBE WebSocket...");
265 self.ws_client.connect().await.map_err(|e| {
266 log::error!("Binance WebSocket connection failed: {e:?}");
267 anyhow::anyhow!("failed to connect Binance WebSocket: {e}")
268 })?;
269 log::info!("Binance SBE WebSocket connected");
270
271 let stream = self.ws_client.stream();
272 let sender = self.data_sender.clone();
273 let insts = self.instruments.clone();
274 let cancel = self.cancellation_token.clone();
275
276 let handle = get_runtime().spawn(async move {
277 pin_mut!(stream);
278 loop {
279 tokio::select! {
280 Some(message) = stream.next() => {
281 Self::handle_ws_message(message, &sender, &insts);
282 }
283 () = cancel.cancelled() => {
284 log::debug!("WebSocket stream task cancelled");
285 break;
286 }
287 }
288 }
289 });
290 self.tasks.push(handle);
291
292 self.is_connected.store(true, Ordering::Release);
293 log::info!("Connected: client_id={}", self.client_id);
294 Ok(())
295 }
296
297 async fn disconnect(&mut self) -> anyhow::Result<()> {
298 if self.is_disconnected() {
299 return Ok(());
300 }
301
302 self.cancellation_token.cancel();
303
304 let _ = self.ws_client.close().await;
305
306 let handles: Vec<_> = self.tasks.drain(..).collect();
307 for handle in handles {
308 if let Err(e) = handle.await {
309 log::error!("Error joining WebSocket task: {e}");
310 }
311 }
312
313 self.is_connected.store(false, Ordering::Release);
314 log::info!("Disconnected: client_id={}", self.client_id);
315 Ok(())
316 }
317
318 fn is_connected(&self) -> bool {
319 self.is_connected.load(Ordering::Relaxed)
320 }
321
322 fn is_disconnected(&self) -> bool {
323 !self.is_connected()
324 }
325
326 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
327 log::debug!("subscribe_instruments: Binance instruments are fetched via HTTP on connect");
328 Ok(())
329 }
330
331 fn subscribe_instrument(&mut self, _cmd: &SubscribeInstrument) -> anyhow::Result<()> {
332 log::debug!("subscribe_instrument: Binance instruments are fetched via HTTP on connect");
333 Ok(())
334 }
335
336 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
337 if cmd.book_type != BookType::L2_MBP {
338 anyhow::bail!("Binance SBE only supports L2_MBP order book deltas");
339 }
340
341 let instrument_id = cmd.instrument_id;
342 let ws = self.ws_client.clone();
343 let depth = cmd.depth.map_or(20, |d| d.get());
344
345 let depth_level = match depth {
347 1..=5 => 5,
348 6..=10 => 10,
349 _ => 20,
350 };
351
352 let stream = format!(
353 "{}@depth{}",
354 instrument_id.symbol.as_str().to_lowercase(),
355 depth_level
356 );
357
358 self.spawn_ws(
359 async move {
360 ws.subscribe(vec![stream])
361 .await
362 .context("book deltas subscription")
363 },
364 "order book subscription",
365 );
366 Ok(())
367 }
368
369 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
370 let instrument_id = cmd.instrument_id;
371 let ws = self.ws_client.clone();
372
373 let stream = format!(
374 "{}@bestBidAsk",
375 instrument_id.symbol.as_str().to_lowercase()
376 );
377
378 self.spawn_ws(
379 async move {
380 ws.subscribe(vec![stream])
381 .await
382 .context("quotes subscription")
383 },
384 "quote subscription",
385 );
386 Ok(())
387 }
388
389 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
390 let instrument_id = cmd.instrument_id;
391 let ws = self.ws_client.clone();
392
393 let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
394
395 self.spawn_ws(
396 async move {
397 ws.subscribe(vec![stream])
398 .await
399 .context("trades subscription")
400 },
401 "trade subscription",
402 );
403 Ok(())
404 }
405
406 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
407 let bar_type = cmd.bar_type;
408 let ws = self.ws_client.clone();
409 let interval = bar_spec_to_binance_interval(bar_type.spec())?;
410
411 let stream = format!(
412 "{}@kline_{}",
413 bar_type.instrument_id().symbol.as_str().to_lowercase(),
414 interval.as_str()
415 );
416
417 self.spawn_ws(
418 async move {
419 ws.subscribe(vec![stream])
420 .await
421 .context("bars subscription")
422 },
423 "bar subscription",
424 );
425 Ok(())
426 }
427
428 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
429 let instrument_id = cmd.instrument_id;
430 let ws = self.ws_client.clone();
431
432 let symbol_lower = instrument_id.symbol.as_str().to_lowercase();
434 let streams = vec![
435 format!("{symbol_lower}@depth5"),
436 format!("{symbol_lower}@depth10"),
437 format!("{symbol_lower}@depth20"),
438 ];
439
440 self.spawn_ws(
441 async move {
442 ws.unsubscribe(streams)
443 .await
444 .context("book deltas unsubscribe")
445 },
446 "order book unsubscribe",
447 );
448 Ok(())
449 }
450
451 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
452 let instrument_id = cmd.instrument_id;
453 let ws = self.ws_client.clone();
454
455 let stream = format!(
456 "{}@bestBidAsk",
457 instrument_id.symbol.as_str().to_lowercase()
458 );
459
460 self.spawn_ws(
461 async move {
462 ws.unsubscribe(vec![stream])
463 .await
464 .context("quotes unsubscribe")
465 },
466 "quote unsubscribe",
467 );
468 Ok(())
469 }
470
471 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
472 let instrument_id = cmd.instrument_id;
473 let ws = self.ws_client.clone();
474
475 let stream = format!("{}@trade", instrument_id.symbol.as_str().to_lowercase());
476
477 self.spawn_ws(
478 async move {
479 ws.unsubscribe(vec![stream])
480 .await
481 .context("trades unsubscribe")
482 },
483 "trade unsubscribe",
484 );
485 Ok(())
486 }
487
488 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
489 let bar_type = cmd.bar_type;
490 let ws = self.ws_client.clone();
491 let interval = bar_spec_to_binance_interval(bar_type.spec())?;
492
493 let stream = format!(
494 "{}@kline_{}",
495 bar_type.instrument_id().symbol.as_str().to_lowercase(),
496 interval.as_str()
497 );
498
499 self.spawn_ws(
500 async move {
501 ws.unsubscribe(vec![stream])
502 .await
503 .context("bars unsubscribe")
504 },
505 "bar unsubscribe",
506 );
507 Ok(())
508 }
509
510 fn request_instruments(&self, request: &RequestInstruments) -> anyhow::Result<()> {
511 let http = self.http_client.clone();
512 let sender = self.data_sender.clone();
513 let instruments_cache = self.instruments.clone();
514 let request_id = request.request_id;
515 let client_id = request.client_id.unwrap_or(self.client_id);
516 let venue = self.venue();
517 let start = request.start;
518 let end = request.end;
519 let params = request.params.clone();
520 let clock = self.clock;
521 let start_nanos = datetime_to_unix_nanos(start);
522 let end_nanos = datetime_to_unix_nanos(end);
523
524 get_runtime().spawn(async move {
525 match http.request_instruments().await {
526 Ok(instruments) => {
527 for instrument in &instruments {
528 upsert_instrument(&instruments_cache, instrument.clone());
529 }
530
531 let response = DataResponse::Instruments(InstrumentsResponse::new(
532 request_id,
533 client_id,
534 venue,
535 instruments,
536 start_nanos,
537 end_nanos,
538 clock.get_time_ns(),
539 params,
540 ));
541
542 if let Err(e) = sender.send(DataEvent::Response(response)) {
543 log::error!("Failed to send instruments response: {e}");
544 }
545 }
546 Err(e) => log::error!("Instruments request failed: {e:?}"),
547 }
548 });
549
550 Ok(())
551 }
552
553 fn request_instrument(&self, request: &RequestInstrument) -> anyhow::Result<()> {
554 let http = self.http_client.clone();
555 let sender = self.data_sender.clone();
556 let instruments = self.instruments.clone();
557 let instrument_id = request.instrument_id;
558 let request_id = request.request_id;
559 let client_id = request.client_id.unwrap_or(self.client_id);
560 let start = request.start;
561 let end = request.end;
562 let params = request.params.clone();
563 let clock = self.clock;
564 let start_nanos = datetime_to_unix_nanos(start);
565 let end_nanos = datetime_to_unix_nanos(end);
566
567 get_runtime().spawn(async move {
568 {
569 let guard = instruments.read().expect(MUTEX_POISONED);
570 if let Some(instrument) = guard.get(&instrument_id) {
571 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
572 request_id,
573 client_id,
574 instrument.id(),
575 instrument.clone(),
576 start_nanos,
577 end_nanos,
578 clock.get_time_ns(),
579 params,
580 )));
581
582 if let Err(e) = sender.send(DataEvent::Response(response)) {
583 log::error!("Failed to send instrument response: {e}");
584 }
585 return;
586 }
587 }
588
589 match http.request_instruments().await {
590 Ok(all_instruments) => {
591 for instrument in &all_instruments {
592 upsert_instrument(&instruments, instrument.clone());
593 }
594
595 let instrument = all_instruments
596 .into_iter()
597 .find(|i| i.id() == instrument_id);
598
599 if let Some(instrument) = instrument {
600 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
601 request_id,
602 client_id,
603 instrument.id(),
604 instrument,
605 start_nanos,
606 end_nanos,
607 clock.get_time_ns(),
608 params,
609 )));
610
611 if let Err(e) = sender.send(DataEvent::Response(response)) {
612 log::error!("Failed to send instrument response: {e}");
613 }
614 } else {
615 log::error!("Instrument not found: {instrument_id}");
616 }
617 }
618 Err(e) => log::error!("Instrument request failed: {e:?}"),
619 }
620 });
621
622 Ok(())
623 }
624
625 fn request_trades(&self, request: &RequestTrades) -> anyhow::Result<()> {
626 let http = self.http_client.clone();
627 let sender = self.data_sender.clone();
628 let instrument_id = request.instrument_id;
629 let limit = request.limit.map(|n| n.get() as u32);
630 let request_id = request.request_id;
631 let client_id = request.client_id.unwrap_or(self.client_id);
632 let params = request.params.clone();
633 let clock = self.clock;
634 let start_nanos = datetime_to_unix_nanos(request.start);
635 let end_nanos = datetime_to_unix_nanos(request.end);
636
637 get_runtime().spawn(async move {
638 match http
639 .request_trades(instrument_id, limit)
640 .await
641 .context("failed to request trades from Binance")
642 {
643 Ok(trades) => {
644 let response = DataResponse::Trades(TradesResponse::new(
645 request_id,
646 client_id,
647 instrument_id,
648 trades,
649 start_nanos,
650 end_nanos,
651 clock.get_time_ns(),
652 params,
653 ));
654 if let Err(e) = sender.send(DataEvent::Response(response)) {
655 log::error!("Failed to send trades response: {e}");
656 }
657 }
658 Err(e) => log::error!("Trade request failed: {e:?}"),
659 }
660 });
661
662 Ok(())
663 }
664
665 fn request_bars(&self, request: &RequestBars) -> anyhow::Result<()> {
666 let http = self.http_client.clone();
667 let sender = self.data_sender.clone();
668 let bar_type = request.bar_type;
669 let start = request.start;
670 let end = request.end;
671 let limit = request.limit.map(|n| n.get() as u32);
672 let request_id = request.request_id;
673 let client_id = request.client_id.unwrap_or(self.client_id);
674 let params = request.params.clone();
675 let clock = self.clock;
676 let start_nanos = datetime_to_unix_nanos(start);
677 let end_nanos = datetime_to_unix_nanos(end);
678
679 get_runtime().spawn(async move {
680 match http
681 .request_bars(bar_type, start, end, limit)
682 .await
683 .context("failed to request bars from Binance")
684 {
685 Ok(bars) => {
686 let response = DataResponse::Bars(BarsResponse::new(
687 request_id,
688 client_id,
689 bar_type,
690 bars,
691 start_nanos,
692 end_nanos,
693 clock.get_time_ns(),
694 params,
695 ));
696 if let Err(e) = sender.send(DataEvent::Response(response)) {
697 log::error!("Failed to send bars response: {e}");
698 }
699 }
700 Err(e) => log::error!("Bar request failed: {e:?}"),
701 }
702 });
703
704 Ok(())
705 }
706}