1use std::{collections::HashMap, fmt::Debug, num::NonZeroU32, sync::Arc};
35
36use chrono::{DateTime, Utc};
37use dashmap::DashMap;
38use nautilus_core::{consts::NAUTILUS_USER_AGENT, nanos::UnixNanos};
39use nautilus_model::{
40 data::{Bar, BarType, TradeTick},
41 enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TimeInForce},
42 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
43 instruments::{Instrument, any::InstrumentAny},
44 reports::{FillReport, OrderStatusReport},
45 types::{Price, Quantity},
46};
47use nautilus_network::{
48 http::{HttpClient, HttpResponse, Method},
49 ratelimiter::quota::Quota,
50};
51use serde::Serialize;
52use ustr::Ustr;
53
54use super::{
55 error::{BinanceSpotHttpError, BinanceSpotHttpResult},
56 models::{
57 BinanceAccountInfo, BinanceAccountTrade, BinanceCancelOrderResponse, BinanceDepth,
58 BinanceKlines, BinanceNewOrderResponse, BinanceOrderResponse, BinanceTrades,
59 },
60 parse,
61 query::{
62 AccountInfoParams, AccountTradesParams, AllOrdersParams, CancelOpenOrdersParams,
63 CancelOrderParams, CancelReplaceOrderParams, DepthParams, KlinesParams, NewOrderParams,
64 OpenOrdersParams, QueryOrderParams, TradesParams,
65 },
66};
67use crate::{
68 common::{
69 consts::{BINANCE_SPOT_RATE_LIMITS, BinanceRateLimitQuota},
70 credential::Credential,
71 enums::{
72 BinanceEnvironment, BinanceProductType, BinanceRateLimitInterval, BinanceRateLimitType,
73 BinanceSide, BinanceTimeInForce,
74 },
75 models::BinanceErrorResponse,
76 parse::{
77 get_currency, parse_fill_report_sbe, parse_klines_to_bars,
78 parse_new_order_response_sbe, parse_order_status_report_sbe, parse_spot_instrument_sbe,
79 parse_spot_trades_sbe,
80 },
81 sbe::spot::{
82 ReadBuf, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION,
83 error_response_codec::{self, ErrorResponseDecoder},
84 message_header_codec::MessageHeaderDecoder,
85 },
86 urls::get_http_base_url,
87 },
88 spot::enums::{
89 BinanceCancelReplaceMode, BinanceOrderResponseType, BinanceSpotOrderType,
90 order_type_to_binance_spot,
91 },
92};
93
94pub const SBE_SCHEMA_HEADER: &str = "3:2";
96
97const SPOT_API_PATH: &str = "/api/v3";
99
100const BINANCE_GLOBAL_RATE_KEY: &str = "binance:spot:global";
102
103const BINANCE_ORDERS_RATE_KEY: &str = "binance:spot:orders";
105
106struct RateLimitConfig {
107 default_quota: Option<Quota>,
108 keyed_quotas: Vec<(String, Quota)>,
109 order_keys: Vec<String>,
110}
111
112#[derive(Debug, Clone)]
123pub struct BinanceRawSpotHttpClient {
124 client: HttpClient,
125 base_url: String,
126 credential: Option<Credential>,
127 recv_window: Option<u64>,
128 order_rate_keys: Vec<String>,
129}
130
131impl BinanceRawSpotHttpClient {
132 pub fn new(
138 environment: BinanceEnvironment,
139 api_key: Option<String>,
140 api_secret: Option<String>,
141 base_url_override: Option<String>,
142 recv_window: Option<u64>,
143 timeout_secs: Option<u64>,
144 proxy_url: Option<String>,
145 ) -> BinanceSpotHttpResult<Self> {
146 let RateLimitConfig {
147 default_quota,
148 keyed_quotas,
149 order_keys,
150 } = Self::rate_limit_config();
151
152 let credential = match (api_key, api_secret) {
153 (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
154 (None, None) => None,
155 _ => return Err(BinanceSpotHttpError::MissingCredentials),
156 };
157
158 let base_url = base_url_override.unwrap_or_else(|| {
159 get_http_base_url(BinanceProductType::Spot, environment).to_string()
160 });
161
162 let headers = Self::default_headers(&credential);
163
164 let client = HttpClient::new(
165 headers,
166 vec!["X-MBX-APIKEY".to_string()],
167 keyed_quotas,
168 default_quota,
169 timeout_secs,
170 proxy_url,
171 )?;
172
173 Ok(Self {
174 client,
175 base_url,
176 credential,
177 recv_window,
178 order_rate_keys: order_keys,
179 })
180 }
181
182 #[must_use]
184 pub const fn schema_id() -> u16 {
185 SBE_SCHEMA_ID
186 }
187
188 #[must_use]
190 pub const fn schema_version() -> u16 {
191 SBE_SCHEMA_VERSION
192 }
193
194 pub async fn get<P>(&self, path: &str, params: Option<&P>) -> BinanceSpotHttpResult<Vec<u8>>
196 where
197 P: Serialize + ?Sized,
198 {
199 self.request(Method::GET, path, params, false, false).await
200 }
201
202 pub async fn get_signed<P>(
204 &self,
205 path: &str,
206 params: Option<&P>,
207 ) -> BinanceSpotHttpResult<Vec<u8>>
208 where
209 P: Serialize + ?Sized,
210 {
211 self.request(Method::GET, path, params, true, false).await
212 }
213
214 async fn request<P>(
215 &self,
216 method: Method,
217 path: &str,
218 params: Option<&P>,
219 signed: bool,
220 use_order_quota: bool,
221 ) -> BinanceSpotHttpResult<Vec<u8>>
222 where
223 P: Serialize + ?Sized,
224 {
225 let mut query = params
226 .map(serde_urlencoded::to_string)
227 .transpose()
228 .map_err(|e| BinanceSpotHttpError::ValidationError(e.to_string()))?
229 .unwrap_or_default();
230
231 let mut headers = HashMap::new();
232 if signed {
233 let cred = self
234 .credential
235 .as_ref()
236 .ok_or(BinanceSpotHttpError::MissingCredentials)?;
237
238 if !query.is_empty() {
239 query.push('&');
240 }
241
242 let timestamp = Utc::now().timestamp_millis();
243 query.push_str(&format!("timestamp={timestamp}"));
244
245 if let Some(recv_window) = self.recv_window {
246 query.push_str(&format!("&recvWindow={recv_window}"));
247 }
248
249 let signature = cred.sign(&query);
250 query.push_str(&format!("&signature={signature}"));
251 headers.insert("X-MBX-APIKEY".to_string(), cred.api_key().to_string());
252 }
253
254 let url = self.build_url(path, &query);
255 let keys = self.rate_limit_keys(use_order_quota);
256
257 let response = self
258 .client
259 .request(
260 method,
261 url,
262 None::<&HashMap<String, Vec<String>>>,
263 Some(headers),
264 None,
265 None,
266 Some(keys),
267 )
268 .await?;
269
270 if !response.status.is_success() {
271 return self.parse_error_response(response);
272 }
273
274 Ok(response.body.to_vec())
275 }
276
277 fn build_url(&self, path: &str, query: &str) -> String {
278 let normalized_path = if path.starts_with('/') {
279 path.to_string()
280 } else {
281 format!("/{path}")
282 };
283
284 let mut url = format!("{}{}{}", self.base_url, SPOT_API_PATH, normalized_path);
285 if !query.is_empty() {
286 url.push('?');
287 url.push_str(query);
288 }
289 url
290 }
291
292 fn rate_limit_keys(&self, use_orders: bool) -> Vec<String> {
293 if use_orders {
294 let mut keys = Vec::with_capacity(1 + self.order_rate_keys.len());
295 keys.push(BINANCE_GLOBAL_RATE_KEY.to_string());
296 keys.extend(self.order_rate_keys.iter().cloned());
297 keys
298 } else {
299 vec![BINANCE_GLOBAL_RATE_KEY.to_string()]
300 }
301 }
302
303 fn parse_error_response<T>(&self, response: HttpResponse) -> BinanceSpotHttpResult<T> {
304 let status = response.status.as_u16();
305 let body = &response.body;
306
307 if let Ok(body_str) = std::str::from_utf8(body)
309 && let Ok(err) = serde_json::from_str::<BinanceErrorResponse>(body_str)
310 {
311 return Err(BinanceSpotHttpError::BinanceError {
312 code: err.code,
313 message: err.msg,
314 });
315 }
316
317 if let Some((code, message)) = Self::try_decode_sbe_error(body) {
319 return Err(BinanceSpotHttpError::BinanceError {
320 code: code.into(),
321 message,
322 });
323 }
324
325 Err(BinanceSpotHttpError::UnexpectedStatus {
326 status,
327 body: hex::encode(body),
328 })
329 }
330
331 fn try_decode_sbe_error(body: &[u8]) -> Option<(i16, String)> {
335 const HEADER_LEN: usize = 8;
336 if body.len() < HEADER_LEN + error_response_codec::SBE_BLOCK_LENGTH as usize {
337 return None;
338 }
339
340 let buf = ReadBuf::new(body);
341
342 let header = MessageHeaderDecoder::default().wrap(buf, 0);
344 if header.template_id() != error_response_codec::SBE_TEMPLATE_ID {
345 return None;
346 }
347
348 let mut decoder = ErrorResponseDecoder::default().header(header, 0);
350 let code = decoder.code();
351
352 let msg_coords = decoder.msg_decoder();
354 let msg_bytes = decoder.msg_slice(msg_coords);
355 let message = String::from_utf8_lossy(msg_bytes).into_owned();
356
357 Some((code, message))
358 }
359
360 fn default_headers(credential: &Option<Credential>) -> HashMap<String, String> {
361 let mut headers = HashMap::new();
362 headers.insert("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string());
363 headers.insert("Accept".to_string(), "application/sbe".to_string());
364 headers.insert("X-MBX-SBE".to_string(), SBE_SCHEMA_HEADER.to_string());
365 if let Some(cred) = credential {
366 headers.insert("X-MBX-APIKEY".to_string(), cred.api_key().to_string());
367 }
368 headers
369 }
370
371 fn rate_limit_config() -> RateLimitConfig {
372 let quotas = BINANCE_SPOT_RATE_LIMITS;
373 let mut keyed = Vec::new();
374 let mut order_keys = Vec::new();
375 let mut default = None;
376
377 for quota in quotas {
378 if let Some(q) = Self::quota_from(quota) {
379 match quota.rate_limit_type {
380 BinanceRateLimitType::RequestWeight if default.is_none() => {
381 default = Some(q);
382 }
383 BinanceRateLimitType::Orders => {
384 let key = format!("{}:{:?}", BINANCE_ORDERS_RATE_KEY, quota.interval);
385 order_keys.push(key.clone());
386 keyed.push((key, q));
387 }
388 _ => {}
389 }
390 }
391 }
392
393 let default_quota =
394 default.unwrap_or_else(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
395
396 keyed.push((BINANCE_GLOBAL_RATE_KEY.to_string(), default_quota));
397
398 RateLimitConfig {
399 default_quota: Some(default_quota),
400 keyed_quotas: keyed,
401 order_keys,
402 }
403 }
404
405 fn quota_from(quota: &BinanceRateLimitQuota) -> Option<Quota> {
406 let burst = NonZeroU32::new(quota.limit)?;
407 match quota.interval {
408 BinanceRateLimitInterval::Second => Some(Quota::per_second(burst)),
409 BinanceRateLimitInterval::Minute => Some(Quota::per_minute(burst)),
410 BinanceRateLimitInterval::Day => {
411 Quota::with_period(std::time::Duration::from_secs(86_400))
412 .map(|q| q.allow_burst(burst))
413 }
414 }
415 }
416
417 pub async fn ping(&self) -> BinanceSpotHttpResult<()> {
423 let bytes = self.get("ping", None::<&()>).await?;
424 parse::decode_ping(&bytes)?;
425 Ok(())
426 }
427
428 pub async fn server_time(&self) -> BinanceSpotHttpResult<i64> {
436 let bytes = self.get("time", None::<&()>).await?;
437 let timestamp = parse::decode_server_time(&bytes)?;
438 Ok(timestamp)
439 }
440
441 pub async fn exchange_info(
447 &self,
448 ) -> BinanceSpotHttpResult<super::models::BinanceExchangeInfoSbe> {
449 let bytes = self.get("exchangeInfo", None::<&()>).await?;
450 let info = parse::decode_exchange_info(&bytes)?;
451 Ok(info)
452 }
453
454 pub async fn depth(&self, params: &DepthParams) -> BinanceSpotHttpResult<BinanceDepth> {
460 let bytes = self.get("depth", Some(params)).await?;
461 let depth = parse::decode_depth(&bytes)?;
462 Ok(depth)
463 }
464
465 pub async fn trades(
471 &self,
472 symbol: &str,
473 limit: Option<u32>,
474 ) -> BinanceSpotHttpResult<BinanceTrades> {
475 let params = TradesParams {
476 symbol: symbol.to_string(),
477 limit,
478 };
479 let bytes = self.get("trades", Some(¶ms)).await?;
480 let trades = parse::decode_trades(&bytes)?;
481 Ok(trades)
482 }
483
484 pub async fn klines(
490 &self,
491 symbol: &str,
492 interval: &str,
493 start_time: Option<i64>,
494 end_time: Option<i64>,
495 limit: Option<u32>,
496 ) -> BinanceSpotHttpResult<BinanceKlines> {
497 let params = KlinesParams {
498 symbol: symbol.to_string(),
499 interval: interval.to_string(),
500 start_time,
501 end_time,
502 time_zone: None,
503 limit,
504 };
505 let bytes = self.get("klines", Some(¶ms)).await?;
506 let klines = parse::decode_klines(&bytes)?;
507 Ok(klines)
508 }
509
510 pub async fn account(
516 &self,
517 params: &AccountInfoParams,
518 ) -> BinanceSpotHttpResult<BinanceAccountInfo> {
519 let bytes = self.get_signed("account", Some(params)).await?;
520 let response = parse::decode_account(&bytes)?;
521 Ok(response)
522 }
523
524 pub async fn account_trades(
530 &self,
531 symbol: &str,
532 order_id: Option<i64>,
533 start_time: Option<i64>,
534 end_time: Option<i64>,
535 limit: Option<u32>,
536 ) -> BinanceSpotHttpResult<Vec<BinanceAccountTrade>> {
537 let params = AccountTradesParams {
538 symbol: symbol.to_string(),
539 order_id,
540 start_time,
541 end_time,
542 from_id: None,
543 limit,
544 };
545 let bytes = self.get_signed("myTrades", Some(¶ms)).await?;
546 let response = parse::decode_account_trades(&bytes)?;
547 Ok(response)
548 }
549
550 pub async fn query_order(
558 &self,
559 symbol: &str,
560 order_id: Option<i64>,
561 client_order_id: Option<&str>,
562 ) -> BinanceSpotHttpResult<BinanceOrderResponse> {
563 let params = QueryOrderParams {
564 symbol: symbol.to_string(),
565 order_id,
566 orig_client_order_id: client_order_id.map(|s| s.to_string()),
567 };
568 let bytes = self.get_signed("order", Some(¶ms)).await?;
569 let response = parse::decode_order(&bytes)?;
570 Ok(response)
571 }
572
573 pub async fn open_orders(
579 &self,
580 symbol: Option<&str>,
581 ) -> BinanceSpotHttpResult<Vec<BinanceOrderResponse>> {
582 let params = OpenOrdersParams {
583 symbol: symbol.map(|s| s.to_string()),
584 };
585 let bytes = self.get_signed("openOrders", Some(¶ms)).await?;
586 let response = parse::decode_orders(&bytes)?;
587 Ok(response)
588 }
589
590 pub async fn all_orders(
596 &self,
597 symbol: &str,
598 start_time: Option<i64>,
599 end_time: Option<i64>,
600 limit: Option<u32>,
601 ) -> BinanceSpotHttpResult<Vec<BinanceOrderResponse>> {
602 let params = AllOrdersParams {
603 symbol: symbol.to_string(),
604 order_id: None,
605 start_time,
606 end_time,
607 limit,
608 };
609 let bytes = self.get_signed("allOrders", Some(¶ms)).await?;
610 let response = parse::decode_orders(&bytes)?;
611 Ok(response)
612 }
613
614 async fn post_order<P>(&self, path: &str, params: Option<&P>) -> BinanceSpotHttpResult<Vec<u8>>
616 where
617 P: Serialize + ?Sized,
618 {
619 self.request(Method::POST, path, params, true, true).await
620 }
621
622 async fn delete_order<P>(
624 &self,
625 path: &str,
626 params: Option<&P>,
627 ) -> BinanceSpotHttpResult<Vec<u8>>
628 where
629 P: Serialize + ?Sized,
630 {
631 self.request(Method::DELETE, path, params, true, true).await
632 }
633
634 #[allow(clippy::too_many_arguments)]
640 pub async fn new_order(
641 &self,
642 symbol: &str,
643 side: BinanceSide,
644 order_type: BinanceSpotOrderType,
645 time_in_force: Option<BinanceTimeInForce>,
646 quantity: Option<&str>,
647 price: Option<&str>,
648 client_order_id: Option<&str>,
649 stop_price: Option<&str>,
650 ) -> BinanceSpotHttpResult<BinanceNewOrderResponse> {
651 let params = NewOrderParams {
652 symbol: symbol.to_string(),
653 side,
654 order_type,
655 time_in_force,
656 quantity: quantity.map(|s| s.to_string()),
657 quote_order_qty: None,
658 price: price.map(|s| s.to_string()),
659 new_client_order_id: client_order_id.map(|s| s.to_string()),
660 stop_price: stop_price.map(|s| s.to_string()),
661 trailing_delta: None,
662 iceberg_qty: None,
663 new_order_resp_type: Some(BinanceOrderResponseType::Full),
664 self_trade_prevention_mode: None,
665 };
666 let bytes = self.post_order("order", Some(¶ms)).await?;
667 let response = parse::decode_new_order_full(&bytes)?;
668 Ok(response)
669 }
670
671 #[allow(clippy::too_many_arguments)]
677 pub async fn cancel_replace_order(
678 &self,
679 symbol: &str,
680 side: BinanceSide,
681 order_type: BinanceSpotOrderType,
682 time_in_force: Option<BinanceTimeInForce>,
683 quantity: Option<&str>,
684 price: Option<&str>,
685 cancel_order_id: Option<i64>,
686 cancel_client_order_id: Option<&str>,
687 new_client_order_id: Option<&str>,
688 ) -> BinanceSpotHttpResult<BinanceNewOrderResponse> {
689 let params = CancelReplaceOrderParams {
690 symbol: symbol.to_string(),
691 side,
692 order_type,
693 cancel_replace_mode: BinanceCancelReplaceMode::StopOnFailure,
694 time_in_force,
695 quantity: quantity.map(|s| s.to_string()),
696 quote_order_qty: None,
697 price: price.map(|s| s.to_string()),
698 cancel_order_id,
699 cancel_orig_client_order_id: cancel_client_order_id.map(|s| s.to_string()),
700 new_client_order_id: new_client_order_id.map(|s| s.to_string()),
701 stop_price: None,
702 trailing_delta: None,
703 iceberg_qty: None,
704 new_order_resp_type: Some(BinanceOrderResponseType::Full),
705 self_trade_prevention_mode: None,
706 };
707 let bytes = self
708 .post_order("order/cancelReplace", Some(¶ms))
709 .await?;
710 let response = parse::decode_new_order_full(&bytes)?;
711 Ok(response)
712 }
713
714 pub async fn cancel_order(
722 &self,
723 symbol: &str,
724 order_id: Option<i64>,
725 client_order_id: Option<&str>,
726 ) -> BinanceSpotHttpResult<BinanceCancelOrderResponse> {
727 let params = match (order_id, client_order_id) {
728 (Some(id), _) => CancelOrderParams::by_order_id(symbol, id),
729 (None, Some(id)) => CancelOrderParams::by_client_order_id(symbol, id.to_string()),
730 (None, None) => {
731 return Err(BinanceSpotHttpError::ValidationError(
732 "Either order_id or client_order_id must be provided".to_string(),
733 ));
734 }
735 };
736 let bytes = self.delete_order("order", Some(¶ms)).await?;
737 let response = parse::decode_cancel_order(&bytes)?;
738 Ok(response)
739 }
740
741 pub async fn cancel_open_orders(
747 &self,
748 symbol: &str,
749 ) -> BinanceSpotHttpResult<Vec<BinanceCancelOrderResponse>> {
750 let params = CancelOpenOrdersParams::new(symbol.to_string());
751 let bytes = self.delete_order("openOrders", Some(¶ms)).await?;
752 let response = parse::decode_cancel_open_orders(&bytes)?;
753 Ok(response)
754 }
755}
756
757#[cfg_attr(
763 feature = "python",
764 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.binance")
765)]
766pub struct BinanceSpotHttpClient {
767 inner: Arc<BinanceRawSpotHttpClient>,
768 instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
769}
770
771impl Clone for BinanceSpotHttpClient {
772 fn clone(&self) -> Self {
773 Self {
774 inner: self.inner.clone(),
775 instruments_cache: self.instruments_cache.clone(),
776 }
777 }
778}
779
780impl Debug for BinanceSpotHttpClient {
781 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
782 f.debug_struct(stringify!(BinanceSpotHttpClient))
783 .field("inner", &self.inner)
784 .field("instruments_cached", &self.instruments_cache.len())
785 .finish()
786 }
787}
788
789impl BinanceSpotHttpClient {
790 pub fn new(
796 environment: BinanceEnvironment,
797 api_key: Option<String>,
798 api_secret: Option<String>,
799 base_url_override: Option<String>,
800 recv_window: Option<u64>,
801 timeout_secs: Option<u64>,
802 proxy_url: Option<String>,
803 ) -> BinanceSpotHttpResult<Self> {
804 let inner = BinanceRawSpotHttpClient::new(
805 environment,
806 api_key,
807 api_secret,
808 base_url_override,
809 recv_window,
810 timeout_secs,
811 proxy_url,
812 )?;
813
814 Ok(Self {
815 inner: Arc::new(inner),
816 instruments_cache: Arc::new(DashMap::new()),
817 })
818 }
819
820 #[must_use]
822 pub fn inner(&self) -> &BinanceRawSpotHttpClient {
823 &self.inner
824 }
825
826 #[must_use]
828 pub const fn schema_id() -> u16 {
829 SBE_SCHEMA_ID
830 }
831
832 #[must_use]
834 pub const fn schema_version() -> u16 {
835 SBE_SCHEMA_VERSION
836 }
837
838 fn generate_ts_init(&self) -> UnixNanos {
840 UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64)
841 }
842
843 fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
845 self.instruments_cache
846 .get(&symbol)
847 .map(|entry| entry.value().clone())
848 .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
849 }
850
851 pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
853 for inst in instruments {
854 self.instruments_cache
855 .insert(inst.raw_symbol().inner(), inst);
856 }
857 }
858
859 pub fn cache_instrument(&self, instrument: InstrumentAny) {
861 self.instruments_cache
862 .insert(instrument.raw_symbol().inner(), instrument);
863 }
864
865 #[must_use]
867 pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
868 self.instruments_cache
869 .get(symbol)
870 .map(|entry| entry.value().clone())
871 }
872
873 pub async fn ping(&self) -> BinanceSpotHttpResult<()> {
879 self.inner.ping().await
880 }
881
882 pub async fn server_time(&self) -> BinanceSpotHttpResult<i64> {
890 self.inner.server_time().await
891 }
892
893 pub async fn exchange_info(
899 &self,
900 ) -> BinanceSpotHttpResult<super::models::BinanceExchangeInfoSbe> {
901 self.inner.exchange_info().await
902 }
903
904 pub async fn request_instruments(&self) -> BinanceSpotHttpResult<Vec<InstrumentAny>> {
913 let info = self.exchange_info().await?;
914 let ts_init = self.generate_ts_init();
915
916 let mut instruments = Vec::with_capacity(info.symbols.len());
917 for symbol in &info.symbols {
918 match parse_spot_instrument_sbe(symbol, ts_init, ts_init) {
919 Ok(instrument) => instruments.push(instrument),
920 Err(e) => {
921 log::debug!(
922 "Skipping symbol during instrument parsing: symbol={}, error={e}",
923 symbol.symbol
924 );
925 }
926 }
927 }
928
929 self.cache_instruments(instruments.clone());
931
932 log::info!("Loaded spot instruments: count={}", instruments.len());
933 Ok(instruments)
934 }
935
936 pub async fn request_trades(
943 &self,
944 instrument_id: InstrumentId,
945 limit: Option<u32>,
946 ) -> anyhow::Result<Vec<TradeTick>> {
947 let symbol = instrument_id.symbol.inner();
948 let instrument = self.instrument_from_cache(symbol)?;
949 let ts_init = self.generate_ts_init();
950
951 let trades = self
952 .inner
953 .trades(symbol.as_str(), limit)
954 .await
955 .map_err(|e| anyhow::anyhow!(e))?;
956
957 parse_spot_trades_sbe(&trades, &instrument, ts_init)
958 }
959
960 pub async fn request_bars(
967 &self,
968 bar_type: BarType,
969 start: Option<DateTime<Utc>>,
970 end: Option<DateTime<Utc>>,
971 limit: Option<u32>,
972 ) -> anyhow::Result<Vec<Bar>> {
973 anyhow::ensure!(
974 bar_type.aggregation_source() == AggregationSource::External,
975 "Only EXTERNAL aggregation is supported"
976 );
977
978 let spec = bar_type.spec();
979 let step = spec.step.get();
980 let interval = match spec.aggregation {
981 BarAggregation::Second => {
982 anyhow::bail!("Binance Spot does not support second-level kline intervals")
983 }
984 BarAggregation::Minute => format!("{step}m"),
985 BarAggregation::Hour => format!("{step}h"),
986 BarAggregation::Day => format!("{step}d"),
987 BarAggregation::Week => format!("{step}w"),
988 BarAggregation::Month => format!("{step}M"),
989 a => anyhow::bail!("Binance does not support {a:?} aggregation"),
990 };
991
992 let symbol = bar_type.instrument_id().symbol;
993 let instrument = self.instrument_from_cache(symbol.inner())?;
994 let ts_init = self.generate_ts_init();
995
996 let klines = self
997 .inner
998 .klines(
999 symbol.as_str(),
1000 &interval,
1001 start.map(|dt| dt.timestamp_millis()),
1002 end.map(|dt| dt.timestamp_millis()),
1003 limit,
1004 )
1005 .await
1006 .map_err(|e| anyhow::anyhow!(e))?;
1007
1008 parse_klines_to_bars(&klines, bar_type, &instrument, ts_init)
1009 }
1010
1011 pub async fn request_account_state(
1017 &self,
1018 params: &AccountInfoParams,
1019 ) -> BinanceSpotHttpResult<BinanceAccountInfo> {
1020 self.inner.account(params).await
1021 }
1022
1023 pub async fn request_order_status(
1032 &self,
1033 account_id: AccountId,
1034 instrument_id: InstrumentId,
1035 venue_order_id: Option<VenueOrderId>,
1036 client_order_id: Option<ClientOrderId>,
1037 ) -> anyhow::Result<OrderStatusReport> {
1038 anyhow::ensure!(
1039 venue_order_id.is_some() || client_order_id.is_some(),
1040 "Either venue_order_id or client_order_id must be provided"
1041 );
1042
1043 let symbol = instrument_id.symbol.inner();
1044 let instrument = self.instrument_from_cache(symbol)?;
1045 let ts_init = self.generate_ts_init();
1046
1047 let order_id = venue_order_id
1048 .map(|id| id.inner().parse::<i64>())
1049 .transpose()
1050 .map_err(|_| anyhow::anyhow!("Invalid venue order ID"))?;
1051
1052 let client_id_str = client_order_id.map(|id| id.to_string());
1053
1054 let order = self
1055 .inner
1056 .query_order(symbol.as_str(), order_id, client_id_str.as_deref())
1057 .await
1058 .map_err(|e| anyhow::anyhow!(e))?;
1059
1060 parse_order_status_report_sbe(&order, account_id, &instrument, ts_init)
1061 }
1062
1063 #[allow(clippy::too_many_arguments)]
1073 pub async fn request_order_status_reports(
1074 &self,
1075 account_id: AccountId,
1076 instrument_id: Option<InstrumentId>,
1077 start: Option<DateTime<Utc>>,
1078 end: Option<DateTime<Utc>>,
1079 open_only: bool,
1080 limit: Option<u32>,
1081 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1082 let ts_init = self.generate_ts_init();
1083 let symbol = instrument_id.map(|id| id.symbol.to_string());
1084
1085 let orders = if open_only {
1086 self.inner
1087 .open_orders(symbol.as_deref())
1088 .await
1089 .map_err(|e| anyhow::anyhow!(e))?
1090 } else {
1091 let symbol = symbol
1092 .ok_or_else(|| anyhow::anyhow!("instrument_id is required when open_only=false"))?;
1093 self.inner
1094 .all_orders(
1095 &symbol,
1096 start.map(|dt| dt.timestamp_millis()),
1097 end.map(|dt| dt.timestamp_millis()),
1098 limit,
1099 )
1100 .await
1101 .map_err(|e| anyhow::anyhow!(e))?
1102 };
1103
1104 orders
1105 .iter()
1106 .map(|order| {
1107 let symbol = Ustr::from(&order.symbol);
1108 let instrument = self.instrument_from_cache(symbol)?;
1109 parse_order_status_report_sbe(order, account_id, &instrument, ts_init)
1110 })
1111 .collect()
1112 }
1113
1114 #[allow(clippy::too_many_arguments)]
1121 pub async fn request_fill_reports(
1122 &self,
1123 account_id: AccountId,
1124 instrument_id: InstrumentId,
1125 venue_order_id: Option<VenueOrderId>,
1126 start: Option<DateTime<Utc>>,
1127 end: Option<DateTime<Utc>>,
1128 limit: Option<u32>,
1129 ) -> anyhow::Result<Vec<FillReport>> {
1130 let ts_init = self.generate_ts_init();
1131 let symbol = instrument_id.symbol.inner();
1132
1133 let order_id = venue_order_id
1134 .map(|id| id.inner().parse::<i64>())
1135 .transpose()
1136 .map_err(|_| anyhow::anyhow!("Invalid venue order ID"))?;
1137
1138 let trades = self
1139 .inner
1140 .account_trades(
1141 symbol.as_str(),
1142 order_id,
1143 start.map(|dt| dt.timestamp_millis()),
1144 end.map(|dt| dt.timestamp_millis()),
1145 limit,
1146 )
1147 .await
1148 .map_err(|e| anyhow::anyhow!(e))?;
1149
1150 trades
1151 .iter()
1152 .map(|trade| {
1153 let symbol = Ustr::from(&trade.symbol);
1154 let instrument = self.instrument_from_cache(symbol)?;
1155 let commission_currency = get_currency(&trade.commission_asset);
1156 parse_fill_report_sbe(trade, account_id, &instrument, commission_currency, ts_init)
1157 })
1158 .collect()
1159 }
1160
1161 #[allow(clippy::too_many_arguments)]
1174 pub async fn submit_order(
1175 &self,
1176 account_id: AccountId,
1177 instrument_id: InstrumentId,
1178 client_order_id: ClientOrderId,
1179 order_side: OrderSide,
1180 order_type: OrderType,
1181 quantity: Quantity,
1182 time_in_force: TimeInForce,
1183 price: Option<Price>,
1184 trigger_price: Option<Price>,
1185 post_only: bool,
1186 ) -> anyhow::Result<OrderStatusReport> {
1187 let symbol = instrument_id.symbol.inner();
1188 let instrument = self.instrument_from_cache(symbol)?;
1189 let ts_init = self.generate_ts_init();
1190
1191 let binance_side = BinanceSide::try_from(order_side)?;
1192 let binance_order_type = order_type_to_binance_spot(order_type, post_only)?;
1193
1194 let is_stop_order = matches!(order_type, OrderType::StopMarket | OrderType::StopLimit);
1196 if is_stop_order && trigger_price.is_none() {
1197 anyhow::bail!("Stop orders require a trigger price");
1198 }
1199
1200 let requires_price = matches!(
1202 binance_order_type,
1203 BinanceSpotOrderType::Limit
1204 | BinanceSpotOrderType::StopLossLimit
1205 | BinanceSpotOrderType::TakeProfitLimit
1206 | BinanceSpotOrderType::LimitMaker
1207 );
1208 if requires_price && price.is_none() {
1209 anyhow::bail!("{binance_order_type:?} orders require a price");
1210 }
1211
1212 let supports_tif = matches!(
1214 binance_order_type,
1215 BinanceSpotOrderType::Limit
1216 | BinanceSpotOrderType::StopLossLimit
1217 | BinanceSpotOrderType::TakeProfitLimit
1218 );
1219 let binance_tif = if supports_tif {
1220 Some(BinanceTimeInForce::try_from(time_in_force)?)
1221 } else {
1222 None
1223 };
1224
1225 let qty_str = quantity.to_string();
1226 let price_str = price.map(|p| p.to_string());
1227 let stop_price_str = trigger_price.map(|p| p.to_string());
1228 let client_id_str = client_order_id.to_string();
1229
1230 let response = self
1231 .inner
1232 .new_order(
1233 symbol.as_str(),
1234 binance_side,
1235 binance_order_type,
1236 binance_tif,
1237 Some(&qty_str),
1238 price_str.as_deref(),
1239 Some(&client_id_str),
1240 stop_price_str.as_deref(),
1241 )
1242 .await
1243 .map_err(|e| anyhow::anyhow!(e))?;
1244
1245 parse_new_order_response_sbe(&response, account_id, &instrument, ts_init)
1246 }
1247
1248 #[allow(clippy::too_many_arguments)]
1257 pub async fn modify_order(
1258 &self,
1259 account_id: AccountId,
1260 instrument_id: InstrumentId,
1261 venue_order_id: VenueOrderId,
1262 client_order_id: ClientOrderId,
1263 order_side: OrderSide,
1264 order_type: OrderType,
1265 quantity: Quantity,
1266 time_in_force: TimeInForce,
1267 price: Option<Price>,
1268 ) -> anyhow::Result<OrderStatusReport> {
1269 let symbol = instrument_id.symbol.inner();
1270 let instrument = self.instrument_from_cache(symbol)?;
1271 let ts_init = self.generate_ts_init();
1272
1273 let binance_side = BinanceSide::try_from(order_side)?;
1274 let binance_order_type = order_type_to_binance_spot(order_type, false)?;
1275 let binance_tif = BinanceTimeInForce::try_from(time_in_force)?;
1276
1277 let cancel_order_id: i64 = venue_order_id
1278 .inner()
1279 .parse()
1280 .map_err(|_| anyhow::anyhow!("Invalid venue order ID: {venue_order_id}"))?;
1281
1282 let qty_str = quantity.to_string();
1283 let price_str = price.map(|p| p.to_string());
1284 let client_id_str = client_order_id.to_string();
1285
1286 let response = self
1287 .inner
1288 .cancel_replace_order(
1289 symbol.as_str(),
1290 binance_side,
1291 binance_order_type,
1292 Some(binance_tif),
1293 Some(&qty_str),
1294 price_str.as_deref(),
1295 Some(cancel_order_id),
1296 None,
1297 Some(&client_id_str),
1298 )
1299 .await
1300 .map_err(|e| anyhow::anyhow!(e))?;
1301
1302 parse_new_order_response_sbe(&response, account_id, &instrument, ts_init)
1303 }
1304
1305 pub async fn cancel_order(
1313 &self,
1314 instrument_id: InstrumentId,
1315 venue_order_id: Option<VenueOrderId>,
1316 client_order_id: Option<ClientOrderId>,
1317 ) -> anyhow::Result<VenueOrderId> {
1318 let symbol = instrument_id.symbol.inner();
1319
1320 let order_id = venue_order_id
1321 .map(|id| id.inner().parse::<i64>())
1322 .transpose()
1323 .map_err(|_| anyhow::anyhow!("Invalid venue order ID"))?;
1324
1325 let client_id_str = client_order_id.map(|id| id.to_string());
1326
1327 let response = self
1328 .inner
1329 .cancel_order(symbol.as_str(), order_id, client_id_str.as_deref())
1330 .await
1331 .map_err(|e| anyhow::anyhow!(e))?;
1332
1333 Ok(VenueOrderId::new(response.order_id.to_string()))
1334 }
1335
1336 pub async fn cancel_all_orders(
1344 &self,
1345 instrument_id: InstrumentId,
1346 ) -> anyhow::Result<Vec<(VenueOrderId, ClientOrderId)>> {
1347 let symbol = instrument_id.symbol.inner();
1348
1349 let responses = self
1350 .inner
1351 .cancel_open_orders(symbol.as_str())
1352 .await
1353 .map_err(|e| anyhow::anyhow!(e))?;
1354
1355 Ok(responses
1356 .into_iter()
1357 .map(|r| {
1358 (
1359 VenueOrderId::new(r.order_id.to_string()),
1360 ClientOrderId::new(&r.orig_client_order_id),
1361 )
1362 })
1363 .collect())
1364 }
1365}
1366
1367#[cfg(test)]
1368mod tests {
1369 use rstest::rstest;
1370
1371 use super::*;
1372
1373 #[rstest]
1374 fn test_schema_constants() {
1375 assert_eq!(BinanceRawSpotHttpClient::schema_id(), 3);
1376 assert_eq!(BinanceRawSpotHttpClient::schema_version(), 2);
1377 assert_eq!(BinanceSpotHttpClient::schema_id(), 3);
1378 assert_eq!(BinanceSpotHttpClient::schema_version(), 2);
1379 }
1380
1381 #[rstest]
1382 fn test_sbe_schema_header() {
1383 assert_eq!(SBE_SCHEMA_HEADER, "3:2");
1384 }
1385
1386 #[rstest]
1387 fn test_default_headers_include_sbe() {
1388 let headers = BinanceRawSpotHttpClient::default_headers(&None);
1389
1390 assert_eq!(headers.get("Accept"), Some(&"application/sbe".to_string()));
1391 assert_eq!(headers.get("X-MBX-SBE"), Some(&"3:2".to_string()));
1392 }
1393
1394 #[rstest]
1395 fn test_rate_limit_config() {
1396 let config = BinanceRawSpotHttpClient::rate_limit_config();
1397
1398 assert!(config.default_quota.is_some());
1399 assert_eq!(config.order_keys.len(), 2);
1401 }
1402}