nautilus_binance/spot/http/
client.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Binance Spot HTTP client with SBE encoding.
17//!
18//! This client communicates with Binance Spot REST API using SBE (Simple Binary
19//! Encoding) for all request/response payloads, providing microsecond timestamp
20//! precision and reduced latency compared to JSON.
21//!
22//! ## Architecture
23//!
24//! Two-layer client pattern:
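//! - [`BinanceRawSpotHttpClient`]: Low-level API methods returning venue-specific
//!   types decoded from SBE (plus generic helpers returning raw bytes).
//! - [`BinanceSpotHttpClient`]: High-level methods that cache instruments and
//!   convert responses to Nautilus domain types.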
27//!
28//! ## SBE Headers
29//!
30//! All requests include:
31//! - `Accept: application/sbe`
32//! - `X-MBX-SBE: 3:2` (schema ID:version)
33
34use std::{collections::HashMap, fmt::Debug, num::NonZeroU32, sync::Arc};
35
36use chrono::{DateTime, Utc};
37use dashmap::DashMap;
38use nautilus_core::{consts::NAUTILUS_USER_AGENT, nanos::UnixNanos};
39use nautilus_model::{
40    data::{Bar, BarType, TradeTick},
41    enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TimeInForce},
42    identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
43    instruments::{Instrument, any::InstrumentAny},
44    reports::{FillReport, OrderStatusReport},
45    types::{Price, Quantity},
46};
47use nautilus_network::{
48    http::{HttpClient, HttpResponse, Method},
49    ratelimiter::quota::Quota,
50};
51use serde::Serialize;
52use ustr::Ustr;
53
54use super::{
55    error::{BinanceSpotHttpError, BinanceSpotHttpResult},
56    models::{
57        BinanceAccountInfo, BinanceAccountTrade, BinanceCancelOrderResponse, BinanceDepth,
58        BinanceKlines, BinanceNewOrderResponse, BinanceOrderResponse, BinanceTrades,
59    },
60    parse,
61    query::{
62        AccountInfoParams, AccountTradesParams, AllOrdersParams, CancelOpenOrdersParams,
63        CancelOrderParams, CancelReplaceOrderParams, DepthParams, KlinesParams, NewOrderParams,
64        OpenOrdersParams, QueryOrderParams, TradesParams,
65    },
66};
67use crate::{
68    common::{
69        consts::{BINANCE_SPOT_RATE_LIMITS, BinanceRateLimitQuota},
70        credential::Credential,
71        enums::{
72            BinanceEnvironment, BinanceProductType, BinanceRateLimitInterval, BinanceRateLimitType,
73            BinanceSide, BinanceTimeInForce,
74        },
75        models::BinanceErrorResponse,
76        parse::{
77            get_currency, parse_fill_report_sbe, parse_klines_to_bars,
78            parse_new_order_response_sbe, parse_order_status_report_sbe, parse_spot_instrument_sbe,
79            parse_spot_trades_sbe,
80        },
81        sbe::spot::{
82            ReadBuf, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION,
83            error_response_codec::{self, ErrorResponseDecoder},
84            message_header_codec::MessageHeaderDecoder,
85        },
86        urls::get_http_base_url,
87    },
88    spot::enums::{
89        BinanceCancelReplaceMode, BinanceOrderResponseType, BinanceSpotOrderType,
90        order_type_to_binance_spot,
91    },
92};
93
/// SBE schema header value for Spot API.
///
/// Sent as the `X-MBX-SBE` request header in `<schema ID>:<version>` form.
pub const SBE_SCHEMA_HEADER: &str = "3:2";

/// Binance Spot API path.
///
/// Inserted between the base URL and the endpoint path when building request URLs.
const SPOT_API_PATH: &str = "/api/v3";

/// Global rate limit key.
///
/// Attached to every request issued by the raw client.
const BINANCE_GLOBAL_RATE_KEY: &str = "binance:spot:global";

/// Orders rate limit key prefix.
///
/// The full key is this prefix followed by `:` and the `Debug` form of the quota interval.
const BINANCE_ORDERS_RATE_KEY: &str = "binance:spot:orders";
105
/// Rate limiter settings derived from the Spot API quota table.
struct RateLimitConfig {
    /// Quota handed to the HTTP client as its default quota.
    default_quota: Option<Quota>,
    /// `(key, quota)` pairs registered with the HTTP client.
    keyed_quotas: Vec<(String, Quota)>,
    /// Keys of the order-specific quotas, applied on top of the global key for order requests.
    order_keys: Vec<String>,
}
111
/// Low-level HTTP client for Binance Spot REST API with SBE encoding.
///
/// Handles:
/// - Base URL resolution by environment.
/// - Optional HMAC SHA256 signing for private endpoints.
/// - Rate limiting using Spot API quotas.
/// - SBE decoding to Binance-specific response types.
///
/// Methods are named to match Binance API endpoints and return
/// venue-specific types (decoded from SBE).
#[derive(Debug, Clone)]
pub struct BinanceRawSpotHttpClient {
    /// Underlying HTTP client through which every request is sent.
    client: HttpClient,
    /// Base URL: the caller-supplied override, or the default for the chosen environment.
    base_url: String,
    /// API credential; when `None`, signed requests fail with `MissingCredentials`.
    credential: Option<Credential>,
    /// Optional value appended as `recvWindow` to signed requests.
    recv_window: Option<u64>,
    /// Rate limit keys added to the global key for order requests.
    order_rate_keys: Vec<String>,
}
130
131impl BinanceRawSpotHttpClient {
132    /// Creates a new Binance Spot raw HTTP client.
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if the underlying [`HttpClient`] fails to build.
137    pub fn new(
138        environment: BinanceEnvironment,
139        api_key: Option<String>,
140        api_secret: Option<String>,
141        base_url_override: Option<String>,
142        recv_window: Option<u64>,
143        timeout_secs: Option<u64>,
144        proxy_url: Option<String>,
145    ) -> BinanceSpotHttpResult<Self> {
146        let RateLimitConfig {
147            default_quota,
148            keyed_quotas,
149            order_keys,
150        } = Self::rate_limit_config();
151
152        let credential = match (api_key, api_secret) {
153            (Some(key), Some(secret)) => Some(Credential::new(key, secret)),
154            (None, None) => None,
155            _ => return Err(BinanceSpotHttpError::MissingCredentials),
156        };
157
158        let base_url = base_url_override.unwrap_or_else(|| {
159            get_http_base_url(BinanceProductType::Spot, environment).to_string()
160        });
161
162        let headers = Self::default_headers(&credential);
163
164        let client = HttpClient::new(
165            headers,
166            vec!["X-MBX-APIKEY".to_string()],
167            keyed_quotas,
168            default_quota,
169            timeout_secs,
170            proxy_url,
171        )?;
172
173        Ok(Self {
174            client,
175            base_url,
176            credential,
177            recv_window,
178            order_rate_keys: order_keys,
179        })
180    }
181
    /// Returns the SBE schema ID ([`SBE_SCHEMA_ID`]) this client is built against.
    #[must_use]
    pub const fn schema_id() -> u16 {
        SBE_SCHEMA_ID
    }
187
    /// Returns the SBE schema version ([`SBE_SCHEMA_VERSION`]) this client is built against.
    #[must_use]
    pub const fn schema_version() -> u16 {
        SBE_SCHEMA_VERSION
    }
193
194    /// Performs a GET request and returns raw response bytes.
195    pub async fn get<P>(&self, path: &str, params: Option<&P>) -> BinanceSpotHttpResult<Vec<u8>>
196    where
197        P: Serialize + ?Sized,
198    {
199        self.request(Method::GET, path, params, false, false).await
200    }
201
202    /// Performs a signed GET request and returns raw response bytes.
203    pub async fn get_signed<P>(
204        &self,
205        path: &str,
206        params: Option<&P>,
207    ) -> BinanceSpotHttpResult<Vec<u8>>
208    where
209        P: Serialize + ?Sized,
210    {
211        self.request(Method::GET, path, params, true, false).await
212    }
213
214    async fn request<P>(
215        &self,
216        method: Method,
217        path: &str,
218        params: Option<&P>,
219        signed: bool,
220        use_order_quota: bool,
221    ) -> BinanceSpotHttpResult<Vec<u8>>
222    where
223        P: Serialize + ?Sized,
224    {
225        let mut query = params
226            .map(serde_urlencoded::to_string)
227            .transpose()
228            .map_err(|e| BinanceSpotHttpError::ValidationError(e.to_string()))?
229            .unwrap_or_default();
230
231        let mut headers = HashMap::new();
232        if signed {
233            let cred = self
234                .credential
235                .as_ref()
236                .ok_or(BinanceSpotHttpError::MissingCredentials)?;
237
238            if !query.is_empty() {
239                query.push('&');
240            }
241
242            let timestamp = Utc::now().timestamp_millis();
243            query.push_str(&format!("timestamp={timestamp}"));
244
245            if let Some(recv_window) = self.recv_window {
246                query.push_str(&format!("&recvWindow={recv_window}"));
247            }
248
249            let signature = cred.sign(&query);
250            query.push_str(&format!("&signature={signature}"));
251            headers.insert("X-MBX-APIKEY".to_string(), cred.api_key().to_string());
252        }
253
254        let url = self.build_url(path, &query);
255        let keys = self.rate_limit_keys(use_order_quota);
256
257        let response = self
258            .client
259            .request(
260                method,
261                url,
262                None::<&HashMap<String, Vec<String>>>,
263                Some(headers),
264                None,
265                None,
266                Some(keys),
267            )
268            .await?;
269
270        if !response.status.is_success() {
271            return self.parse_error_response(response);
272        }
273
274        Ok(response.body.to_vec())
275    }
276
277    fn build_url(&self, path: &str, query: &str) -> String {
278        let normalized_path = if path.starts_with('/') {
279            path.to_string()
280        } else {
281            format!("/{path}")
282        };
283
284        let mut url = format!("{}{}{}", self.base_url, SPOT_API_PATH, normalized_path);
285        if !query.is_empty() {
286            url.push('?');
287            url.push_str(query);
288        }
289        url
290    }
291
292    fn rate_limit_keys(&self, use_orders: bool) -> Vec<String> {
293        if use_orders {
294            let mut keys = Vec::with_capacity(1 + self.order_rate_keys.len());
295            keys.push(BINANCE_GLOBAL_RATE_KEY.to_string());
296            keys.extend(self.order_rate_keys.iter().cloned());
297            keys
298        } else {
299            vec![BINANCE_GLOBAL_RATE_KEY.to_string()]
300        }
301    }
302
303    fn parse_error_response<T>(&self, response: HttpResponse) -> BinanceSpotHttpResult<T> {
304        let status = response.status.as_u16();
305        let body = &response.body;
306
307        // Binance may return JSON errors even when SBE was requested
308        if let Ok(body_str) = std::str::from_utf8(body)
309            && let Ok(err) = serde_json::from_str::<BinanceErrorResponse>(body_str)
310        {
311            return Err(BinanceSpotHttpError::BinanceError {
312                code: err.code,
313                message: err.msg,
314            });
315        }
316
317        // Try to decode SBE error response
318        if let Some((code, message)) = Self::try_decode_sbe_error(body) {
319            return Err(BinanceSpotHttpError::BinanceError {
320                code: code.into(),
321                message,
322            });
323        }
324
325        Err(BinanceSpotHttpError::UnexpectedStatus {
326            status,
327            body: hex::encode(body),
328        })
329    }
330
    /// Attempts to decode an SBE error response.
    ///
    /// Returns `Some((code, message))` if successfully decoded, `None` when the
    /// body is too short or its template ID is not the error response template.
    /// The message bytes are converted lossily, so invalid UTF-8 does not fail.
    fn try_decode_sbe_error(body: &[u8]) -> Option<(i16, String)> {
        // Presumably the encoded size of the SBE message header — confirm against the codec.
        const HEADER_LEN: usize = 8;
        // Require at least the header plus the fixed-size error block.
        // NOTE(review): this does not cover the variable-length message that follows;
        // verify the decoder tolerates a truncated var-data section.
        if body.len() < HEADER_LEN + error_response_codec::SBE_BLOCK_LENGTH as usize {
            return None;
        }

        let buf = ReadBuf::new(body);

        // Decode message header
        let header = MessageHeaderDecoder::default().wrap(buf, 0);
        // Any other template is not an error response.
        if header.template_id() != error_response_codec::SBE_TEMPLATE_ID {
            return None;
        }

        // Decode error response
        let mut decoder = ErrorResponseDecoder::default().header(header, 0);
        let code = decoder.code();

        // Decode the message string (VAR_DATA with 2-byte length prefix)
        let msg_coords = decoder.msg_decoder();
        let msg_bytes = decoder.msg_slice(msg_coords);
        let message = String::from_utf8_lossy(msg_bytes).into_owned();

        Some((code, message))
    }
359
360    fn default_headers(credential: &Option<Credential>) -> HashMap<String, String> {
361        let mut headers = HashMap::new();
362        headers.insert("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string());
363        headers.insert("Accept".to_string(), "application/sbe".to_string());
364        headers.insert("X-MBX-SBE".to_string(), SBE_SCHEMA_HEADER.to_string());
365        if let Some(cred) = credential {
366            headers.insert("X-MBX-APIKEY".to_string(), cred.api_key().to_string());
367        }
368        headers
369    }
370
371    fn rate_limit_config() -> RateLimitConfig {
372        let quotas = BINANCE_SPOT_RATE_LIMITS;
373        let mut keyed = Vec::new();
374        let mut order_keys = Vec::new();
375        let mut default = None;
376
377        for quota in quotas {
378            if let Some(q) = Self::quota_from(quota) {
379                match quota.rate_limit_type {
380                    BinanceRateLimitType::RequestWeight if default.is_none() => {
381                        default = Some(q);
382                    }
383                    BinanceRateLimitType::Orders => {
384                        let key = format!("{}:{:?}", BINANCE_ORDERS_RATE_KEY, quota.interval);
385                        order_keys.push(key.clone());
386                        keyed.push((key, q));
387                    }
388                    _ => {}
389                }
390            }
391        }
392
393        let default_quota =
394            default.unwrap_or_else(|| Quota::per_second(NonZeroU32::new(10).unwrap()));
395
396        keyed.push((BINANCE_GLOBAL_RATE_KEY.to_string(), default_quota));
397
398        RateLimitConfig {
399            default_quota: Some(default_quota),
400            keyed_quotas: keyed,
401            order_keys,
402        }
403    }
404
405    fn quota_from(quota: &BinanceRateLimitQuota) -> Option<Quota> {
406        let burst = NonZeroU32::new(quota.limit)?;
407        match quota.interval {
408            BinanceRateLimitInterval::Second => Some(Quota::per_second(burst)),
409            BinanceRateLimitInterval::Minute => Some(Quota::per_minute(burst)),
410            BinanceRateLimitInterval::Day => {
411                Quota::with_period(std::time::Duration::from_secs(86_400))
412                    .map(|q| q.allow_burst(burst))
413            }
414        }
415    }
416
417    /// Tests connectivity to the API.
418    ///
419    /// # Errors
420    ///
421    /// Returns an error if the request fails or SBE decoding fails.
422    pub async fn ping(&self) -> BinanceSpotHttpResult<()> {
423        let bytes = self.get("ping", None::<&()>).await?;
424        parse::decode_ping(&bytes)?;
425        Ok(())
426    }
427
428    /// Returns the server time in **microseconds** since epoch.
429    ///
430    /// Note: SBE provides microsecond precision vs JSON's milliseconds.
431    ///
432    /// # Errors
433    ///
434    /// Returns an error if the request fails or SBE decoding fails.
435    pub async fn server_time(&self) -> BinanceSpotHttpResult<i64> {
436        let bytes = self.get("time", None::<&()>).await?;
437        let timestamp = parse::decode_server_time(&bytes)?;
438        Ok(timestamp)
439    }
440
441    /// Returns exchange information including trading symbols.
442    ///
443    /// # Errors
444    ///
445    /// Returns an error if the request fails or SBE decoding fails.
446    pub async fn exchange_info(
447        &self,
448    ) -> BinanceSpotHttpResult<super::models::BinanceExchangeInfoSbe> {
449        let bytes = self.get("exchangeInfo", None::<&()>).await?;
450        let info = parse::decode_exchange_info(&bytes)?;
451        Ok(info)
452    }
453
454    /// Returns order book depth for a symbol.
455    ///
456    /// # Errors
457    ///
458    /// Returns an error if the request fails or SBE decoding fails.
459    pub async fn depth(&self, params: &DepthParams) -> BinanceSpotHttpResult<BinanceDepth> {
460        let bytes = self.get("depth", Some(params)).await?;
461        let depth = parse::decode_depth(&bytes)?;
462        Ok(depth)
463    }
464
465    /// Returns recent trades for a symbol.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the request fails or SBE decoding fails.
470    pub async fn trades(
471        &self,
472        symbol: &str,
473        limit: Option<u32>,
474    ) -> BinanceSpotHttpResult<BinanceTrades> {
475        let params = TradesParams {
476            symbol: symbol.to_string(),
477            limit,
478        };
479        let bytes = self.get("trades", Some(&params)).await?;
480        let trades = parse::decode_trades(&bytes)?;
481        Ok(trades)
482    }
483
484    /// Returns kline (candlestick) data for a symbol.
485    ///
486    /// # Errors
487    ///
488    /// Returns an error if the request fails or SBE decoding fails.
489    pub async fn klines(
490        &self,
491        symbol: &str,
492        interval: &str,
493        start_time: Option<i64>,
494        end_time: Option<i64>,
495        limit: Option<u32>,
496    ) -> BinanceSpotHttpResult<BinanceKlines> {
497        let params = KlinesParams {
498            symbol: symbol.to_string(),
499            interval: interval.to_string(),
500            start_time,
501            end_time,
502            time_zone: None,
503            limit,
504        };
505        let bytes = self.get("klines", Some(&params)).await?;
506        let klines = parse::decode_klines(&bytes)?;
507        Ok(klines)
508    }
509
510    /// Returns account information including balances.
511    ///
512    /// # Errors
513    ///
514    /// Returns an error if the request fails or SBE decoding fails.
515    pub async fn account(
516        &self,
517        params: &AccountInfoParams,
518    ) -> BinanceSpotHttpResult<BinanceAccountInfo> {
519        let bytes = self.get_signed("account", Some(params)).await?;
520        let response = parse::decode_account(&bytes)?;
521        Ok(response)
522    }
523
524    /// Returns account trade history for a symbol.
525    ///
526    /// # Errors
527    ///
528    /// Returns an error if the request fails or SBE decoding fails.
529    pub async fn account_trades(
530        &self,
531        symbol: &str,
532        order_id: Option<i64>,
533        start_time: Option<i64>,
534        end_time: Option<i64>,
535        limit: Option<u32>,
536    ) -> BinanceSpotHttpResult<Vec<BinanceAccountTrade>> {
537        let params = AccountTradesParams {
538            symbol: symbol.to_string(),
539            order_id,
540            start_time,
541            end_time,
542            from_id: None,
543            limit,
544        };
545        let bytes = self.get_signed("myTrades", Some(&params)).await?;
546        let response = parse::decode_account_trades(&bytes)?;
547        Ok(response)
548    }
549
550    /// Queries an order's status.
551    ///
552    /// Either `order_id` or `client_order_id` must be provided.
553    ///
554    /// # Errors
555    ///
556    /// Returns an error if the request fails or SBE decoding fails.
557    pub async fn query_order(
558        &self,
559        symbol: &str,
560        order_id: Option<i64>,
561        client_order_id: Option<&str>,
562    ) -> BinanceSpotHttpResult<BinanceOrderResponse> {
563        let params = QueryOrderParams {
564            symbol: symbol.to_string(),
565            order_id,
566            orig_client_order_id: client_order_id.map(|s| s.to_string()),
567        };
568        let bytes = self.get_signed("order", Some(&params)).await?;
569        let response = parse::decode_order(&bytes)?;
570        Ok(response)
571    }
572
573    /// Returns all open orders for a symbol or all symbols.
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if the request fails or SBE decoding fails.
578    pub async fn open_orders(
579        &self,
580        symbol: Option<&str>,
581    ) -> BinanceSpotHttpResult<Vec<BinanceOrderResponse>> {
582        let params = OpenOrdersParams {
583            symbol: symbol.map(|s| s.to_string()),
584        };
585        let bytes = self.get_signed("openOrders", Some(&params)).await?;
586        let response = parse::decode_orders(&bytes)?;
587        Ok(response)
588    }
589
590    /// Returns all orders (including closed) for a symbol.
591    ///
592    /// # Errors
593    ///
594    /// Returns an error if the request fails or SBE decoding fails.
595    pub async fn all_orders(
596        &self,
597        symbol: &str,
598        start_time: Option<i64>,
599        end_time: Option<i64>,
600        limit: Option<u32>,
601    ) -> BinanceSpotHttpResult<Vec<BinanceOrderResponse>> {
602        let params = AllOrdersParams {
603            symbol: symbol.to_string(),
604            order_id: None,
605            start_time,
606            end_time,
607            limit,
608        };
609        let bytes = self.get_signed("allOrders", Some(&params)).await?;
610        let response = parse::decode_orders(&bytes)?;
611        Ok(response)
612    }
613
614    /// Performs a signed POST request for order operations.
615    async fn post_order<P>(&self, path: &str, params: Option<&P>) -> BinanceSpotHttpResult<Vec<u8>>
616    where
617        P: Serialize + ?Sized,
618    {
619        self.request(Method::POST, path, params, true, true).await
620    }
621
622    /// Performs a signed DELETE request for cancel operations.
623    async fn delete_order<P>(
624        &self,
625        path: &str,
626        params: Option<&P>,
627    ) -> BinanceSpotHttpResult<Vec<u8>>
628    where
629        P: Serialize + ?Sized,
630    {
631        self.request(Method::DELETE, path, params, true, true).await
632    }
633
634    /// Creates a new order.
635    ///
636    /// # Errors
637    ///
638    /// Returns an error if the request fails or SBE decoding fails.
639    #[allow(clippy::too_many_arguments)]
640    pub async fn new_order(
641        &self,
642        symbol: &str,
643        side: BinanceSide,
644        order_type: BinanceSpotOrderType,
645        time_in_force: Option<BinanceTimeInForce>,
646        quantity: Option<&str>,
647        price: Option<&str>,
648        client_order_id: Option<&str>,
649        stop_price: Option<&str>,
650    ) -> BinanceSpotHttpResult<BinanceNewOrderResponse> {
651        let params = NewOrderParams {
652            symbol: symbol.to_string(),
653            side,
654            order_type,
655            time_in_force,
656            quantity: quantity.map(|s| s.to_string()),
657            quote_order_qty: None,
658            price: price.map(|s| s.to_string()),
659            new_client_order_id: client_order_id.map(|s| s.to_string()),
660            stop_price: stop_price.map(|s| s.to_string()),
661            trailing_delta: None,
662            iceberg_qty: None,
663            new_order_resp_type: Some(BinanceOrderResponseType::Full),
664            self_trade_prevention_mode: None,
665        };
666        let bytes = self.post_order("order", Some(&params)).await?;
667        let response = parse::decode_new_order_full(&bytes)?;
668        Ok(response)
669    }
670
671    /// Cancels an existing order and places a new order atomically.
672    ///
673    /// # Errors
674    ///
675    /// Returns an error if the request fails or SBE decoding fails.
676    #[allow(clippy::too_many_arguments)]
677    pub async fn cancel_replace_order(
678        &self,
679        symbol: &str,
680        side: BinanceSide,
681        order_type: BinanceSpotOrderType,
682        time_in_force: Option<BinanceTimeInForce>,
683        quantity: Option<&str>,
684        price: Option<&str>,
685        cancel_order_id: Option<i64>,
686        cancel_client_order_id: Option<&str>,
687        new_client_order_id: Option<&str>,
688    ) -> BinanceSpotHttpResult<BinanceNewOrderResponse> {
689        let params = CancelReplaceOrderParams {
690            symbol: symbol.to_string(),
691            side,
692            order_type,
693            cancel_replace_mode: BinanceCancelReplaceMode::StopOnFailure,
694            time_in_force,
695            quantity: quantity.map(|s| s.to_string()),
696            quote_order_qty: None,
697            price: price.map(|s| s.to_string()),
698            cancel_order_id,
699            cancel_orig_client_order_id: cancel_client_order_id.map(|s| s.to_string()),
700            new_client_order_id: new_client_order_id.map(|s| s.to_string()),
701            stop_price: None,
702            trailing_delta: None,
703            iceberg_qty: None,
704            new_order_resp_type: Some(BinanceOrderResponseType::Full),
705            self_trade_prevention_mode: None,
706        };
707        let bytes = self
708            .post_order("order/cancelReplace", Some(&params))
709            .await?;
710        let response = parse::decode_new_order_full(&bytes)?;
711        Ok(response)
712    }
713
714    /// Cancels an existing order.
715    ///
716    /// Either `order_id` or `client_order_id` must be provided.
717    ///
718    /// # Errors
719    ///
720    /// Returns an error if the request fails or SBE decoding fails.
721    pub async fn cancel_order(
722        &self,
723        symbol: &str,
724        order_id: Option<i64>,
725        client_order_id: Option<&str>,
726    ) -> BinanceSpotHttpResult<BinanceCancelOrderResponse> {
727        let params = match (order_id, client_order_id) {
728            (Some(id), _) => CancelOrderParams::by_order_id(symbol, id),
729            (None, Some(id)) => CancelOrderParams::by_client_order_id(symbol, id.to_string()),
730            (None, None) => {
731                return Err(BinanceSpotHttpError::ValidationError(
732                    "Either order_id or client_order_id must be provided".to_string(),
733                ));
734            }
735        };
736        let bytes = self.delete_order("order", Some(&params)).await?;
737        let response = parse::decode_cancel_order(&bytes)?;
738        Ok(response)
739    }
740
741    /// Cancels all open orders for a symbol.
742    ///
743    /// # Errors
744    ///
745    /// Returns an error if the request fails or SBE decoding fails.
746    pub async fn cancel_open_orders(
747        &self,
748        symbol: &str,
749    ) -> BinanceSpotHttpResult<Vec<BinanceCancelOrderResponse>> {
750        let params = CancelOpenOrdersParams::new(symbol.to_string());
751        let bytes = self.delete_order("openOrders", Some(&params)).await?;
752        let response = parse::decode_cancel_open_orders(&bytes)?;
753        Ok(response)
754    }
755}
756
/// High-level HTTP client for Binance Spot API.
///
/// Wraps [`BinanceRawSpotHttpClient`] and provides domain-level methods:
/// - Simple types (ping, server_time): Pass through from raw client.
/// - Complex types (instruments, orders): Transform to Nautilus domain types.
///
/// Both fields are reference-counted, so clones share the raw client and the
/// instrument cache.
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.binance")
)]
pub struct BinanceSpotHttpClient {
    /// Shared raw client that performs the HTTP requests.
    inner: Arc<BinanceRawSpotHttpClient>,
    /// Instruments keyed by their raw symbol.
    instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
}
770
771impl Clone for BinanceSpotHttpClient {
772    fn clone(&self) -> Self {
773        Self {
774            inner: self.inner.clone(),
775            instruments_cache: self.instruments_cache.clone(),
776        }
777    }
778}
779
780impl Debug for BinanceSpotHttpClient {
781    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
782        f.debug_struct(stringify!(BinanceSpotHttpClient))
783            .field("inner", &self.inner)
784            .field("instruments_cached", &self.instruments_cache.len())
785            .finish()
786    }
787}
788
789impl BinanceSpotHttpClient {
790    /// Creates a new Binance Spot HTTP client.
791    ///
792    /// # Errors
793    ///
794    /// Returns an error if the underlying HTTP client cannot be created.
795    pub fn new(
796        environment: BinanceEnvironment,
797        api_key: Option<String>,
798        api_secret: Option<String>,
799        base_url_override: Option<String>,
800        recv_window: Option<u64>,
801        timeout_secs: Option<u64>,
802        proxy_url: Option<String>,
803    ) -> BinanceSpotHttpResult<Self> {
804        let inner = BinanceRawSpotHttpClient::new(
805            environment,
806            api_key,
807            api_secret,
808            base_url_override,
809            recv_window,
810            timeout_secs,
811            proxy_url,
812        )?;
813
814        Ok(Self {
815            inner: Arc::new(inner),
816            instruments_cache: Arc::new(DashMap::new()),
817        })
818    }
819
    /// Returns a reference to the inner raw client, for direct access to the
    /// venue-level endpoint methods.
    #[must_use]
    pub fn inner(&self) -> &BinanceRawSpotHttpClient {
        &self.inner
    }
825
    /// Returns the SBE schema ID ([`SBE_SCHEMA_ID`]) this client is built against.
    #[must_use]
    pub const fn schema_id() -> u16 {
        SBE_SCHEMA_ID
    }
831
    /// Returns the SBE schema version ([`SBE_SCHEMA_VERSION`]) this client is built against.
    #[must_use]
    pub const fn schema_version() -> u16 {
        SBE_SCHEMA_VERSION
    }
837
838    /// Generates a timestamp for initialization.
839    fn generate_ts_init(&self) -> UnixNanos {
840        UnixNanos::from(chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0) as u64)
841    }
842
843    /// Retrieves an instrument from the cache.
844    fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
845        self.instruments_cache
846            .get(&symbol)
847            .map(|entry| entry.value().clone())
848            .ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
849    }
850
851    /// Caches multiple instruments.
852    pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
853        for inst in instruments {
854            self.instruments_cache
855                .insert(inst.raw_symbol().inner(), inst);
856        }
857    }
858
859    /// Caches a single instrument.
860    pub fn cache_instrument(&self, instrument: InstrumentAny) {
861        self.instruments_cache
862            .insert(instrument.raw_symbol().inner(), instrument);
863    }
864
865    /// Gets an instrument from the cache by symbol.
866    #[must_use]
867    pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
868        self.instruments_cache
869            .get(symbol)
870            .map(|entry| entry.value().clone())
871    }
872
    /// Tests connectivity to the API.
    ///
    /// Delegates directly to the raw client.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or SBE decoding fails.
    pub async fn ping(&self) -> BinanceSpotHttpResult<()> {
        self.inner.ping().await
    }
881
    /// Returns the server time in **microseconds** since epoch.
    ///
    /// Note: SBE provides microsecond precision vs JSON's milliseconds.
    ///
    /// Delegates directly to the raw client.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or SBE decoding fails.
    pub async fn server_time(&self) -> BinanceSpotHttpResult<i64> {
        self.inner.server_time().await
    }
892
    /// Returns exchange information including trading symbols.
    ///
    /// Delegates directly to the raw client; nothing is cached by this call.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or SBE decoding fails.
    pub async fn exchange_info(
        &self,
    ) -> BinanceSpotHttpResult<super::models::BinanceExchangeInfoSbe> {
        self.inner.exchange_info().await
    }
903
904    /// Requests Nautilus instruments for all trading symbols.
905    ///
906    /// Fetches exchange info via SBE and parses each symbol into a CurrencyPair.
907    /// Non-trading symbols are skipped with a debug log.
908    ///
909    /// # Errors
910    ///
911    /// Returns an error if the request fails or SBE decoding fails.
912    pub async fn request_instruments(&self) -> BinanceSpotHttpResult<Vec<InstrumentAny>> {
913        let info = self.exchange_info().await?;
914        let ts_init = self.generate_ts_init();
915
916        let mut instruments = Vec::with_capacity(info.symbols.len());
917        for symbol in &info.symbols {
918            match parse_spot_instrument_sbe(symbol, ts_init, ts_init) {
919                Ok(instrument) => instruments.push(instrument),
920                Err(e) => {
921                    log::debug!(
922                        "Skipping symbol during instrument parsing: symbol={}, error={e}",
923                        symbol.symbol
924                    );
925                }
926            }
927        }
928
929        // Cache instruments for use by other domain methods
930        self.cache_instruments(instruments.clone());
931
932        log::info!("Loaded spot instruments: count={}", instruments.len());
933        Ok(instruments)
934    }
935
936    /// Requests recent trades for an instrument.
937    ///
938    /// # Errors
939    ///
940    /// Returns an error if the request fails, the instrument is not cached,
941    /// or trade parsing fails.
942    pub async fn request_trades(
943        &self,
944        instrument_id: InstrumentId,
945        limit: Option<u32>,
946    ) -> anyhow::Result<Vec<TradeTick>> {
947        let symbol = instrument_id.symbol.inner();
948        let instrument = self.instrument_from_cache(symbol)?;
949        let ts_init = self.generate_ts_init();
950
951        let trades = self
952            .inner
953            .trades(symbol.as_str(), limit)
954            .await
955            .map_err(|e| anyhow::anyhow!(e))?;
956
957        parse_spot_trades_sbe(&trades, &instrument, ts_init)
958    }
959
960    /// Requests bar (kline/candlestick) data.
961    ///
962    /// # Errors
963    ///
964    /// Returns an error if the bar type is not supported, instrument is not cached,
965    /// or the request fails.
966    pub async fn request_bars(
967        &self,
968        bar_type: BarType,
969        start: Option<DateTime<Utc>>,
970        end: Option<DateTime<Utc>>,
971        limit: Option<u32>,
972    ) -> anyhow::Result<Vec<Bar>> {
973        anyhow::ensure!(
974            bar_type.aggregation_source() == AggregationSource::External,
975            "Only EXTERNAL aggregation is supported"
976        );
977
978        let spec = bar_type.spec();
979        let step = spec.step.get();
980        let interval = match spec.aggregation {
981            BarAggregation::Second => {
982                anyhow::bail!("Binance Spot does not support second-level kline intervals")
983            }
984            BarAggregation::Minute => format!("{step}m"),
985            BarAggregation::Hour => format!("{step}h"),
986            BarAggregation::Day => format!("{step}d"),
987            BarAggregation::Week => format!("{step}w"),
988            BarAggregation::Month => format!("{step}M"),
989            a => anyhow::bail!("Binance does not support {a:?} aggregation"),
990        };
991
992        let symbol = bar_type.instrument_id().symbol;
993        let instrument = self.instrument_from_cache(symbol.inner())?;
994        let ts_init = self.generate_ts_init();
995
996        let klines = self
997            .inner
998            .klines(
999                symbol.as_str(),
1000                &interval,
1001                start.map(|dt| dt.timestamp_millis()),
1002                end.map(|dt| dt.timestamp_millis()),
1003                limit,
1004            )
1005            .await
1006            .map_err(|e| anyhow::anyhow!(e))?;
1007
1008        parse_klines_to_bars(&klines, bar_type, &instrument, ts_init)
1009    }
1010
1011    /// Requests account state including balances.
1012    ///
1013    /// # Errors
1014    ///
1015    /// Returns an error if the request fails or SBE decoding fails.
1016    pub async fn request_account_state(
1017        &self,
1018        params: &AccountInfoParams,
1019    ) -> BinanceSpotHttpResult<BinanceAccountInfo> {
1020        self.inner.account(params).await
1021    }
1022
1023    /// Requests the status of a specific order.
1024    ///
1025    /// Either `venue_order_id` or `client_order_id` must be provided.
1026    ///
1027    /// # Errors
1028    ///
1029    /// Returns an error if neither identifier is provided, the request fails,
1030    /// instrument is not cached, or parsing fails.
1031    pub async fn request_order_status(
1032        &self,
1033        account_id: AccountId,
1034        instrument_id: InstrumentId,
1035        venue_order_id: Option<VenueOrderId>,
1036        client_order_id: Option<ClientOrderId>,
1037    ) -> anyhow::Result<OrderStatusReport> {
1038        anyhow::ensure!(
1039            venue_order_id.is_some() || client_order_id.is_some(),
1040            "Either venue_order_id or client_order_id must be provided"
1041        );
1042
1043        let symbol = instrument_id.symbol.inner();
1044        let instrument = self.instrument_from_cache(symbol)?;
1045        let ts_init = self.generate_ts_init();
1046
1047        let order_id = venue_order_id
1048            .map(|id| id.inner().parse::<i64>())
1049            .transpose()
1050            .map_err(|_| anyhow::anyhow!("Invalid venue order ID"))?;
1051
1052        let client_id_str = client_order_id.map(|id| id.to_string());
1053
1054        let order = self
1055            .inner
1056            .query_order(symbol.as_str(), order_id, client_id_str.as_deref())
1057            .await
1058            .map_err(|e| anyhow::anyhow!(e))?;
1059
1060        parse_order_status_report_sbe(&order, account_id, &instrument, ts_init)
1061    }
1062
1063    /// Requests order status reports.
1064    ///
1065    /// When `open_only` is true, returns only open orders (instrument_id optional).
1066    /// When `open_only` is false, returns order history (instrument_id required).
1067    ///
1068    /// # Errors
1069    ///
1070    /// Returns an error if the request fails, any order's instrument is not cached,
1071    /// or parsing fails.
1072    #[allow(clippy::too_many_arguments)]
1073    pub async fn request_order_status_reports(
1074        &self,
1075        account_id: AccountId,
1076        instrument_id: Option<InstrumentId>,
1077        start: Option<DateTime<Utc>>,
1078        end: Option<DateTime<Utc>>,
1079        open_only: bool,
1080        limit: Option<u32>,
1081    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1082        let ts_init = self.generate_ts_init();
1083        let symbol = instrument_id.map(|id| id.symbol.to_string());
1084
1085        let orders = if open_only {
1086            self.inner
1087                .open_orders(symbol.as_deref())
1088                .await
1089                .map_err(|e| anyhow::anyhow!(e))?
1090        } else {
1091            let symbol = symbol
1092                .ok_or_else(|| anyhow::anyhow!("instrument_id is required when open_only=false"))?;
1093            self.inner
1094                .all_orders(
1095                    &symbol,
1096                    start.map(|dt| dt.timestamp_millis()),
1097                    end.map(|dt| dt.timestamp_millis()),
1098                    limit,
1099                )
1100                .await
1101                .map_err(|e| anyhow::anyhow!(e))?
1102        };
1103
1104        orders
1105            .iter()
1106            .map(|order| {
1107                let symbol = Ustr::from(&order.symbol);
1108                let instrument = self.instrument_from_cache(symbol)?;
1109                parse_order_status_report_sbe(order, account_id, &instrument, ts_init)
1110            })
1111            .collect()
1112    }
1113
1114    /// Requests fill reports (trade history) for an instrument.
1115    ///
1116    /// # Errors
1117    ///
1118    /// Returns an error if the request fails, any trade's instrument is not cached,
1119    /// or parsing fails.
1120    #[allow(clippy::too_many_arguments)]
1121    pub async fn request_fill_reports(
1122        &self,
1123        account_id: AccountId,
1124        instrument_id: InstrumentId,
1125        venue_order_id: Option<VenueOrderId>,
1126        start: Option<DateTime<Utc>>,
1127        end: Option<DateTime<Utc>>,
1128        limit: Option<u32>,
1129    ) -> anyhow::Result<Vec<FillReport>> {
1130        let ts_init = self.generate_ts_init();
1131        let symbol = instrument_id.symbol.inner();
1132
1133        let order_id = venue_order_id
1134            .map(|id| id.inner().parse::<i64>())
1135            .transpose()
1136            .map_err(|_| anyhow::anyhow!("Invalid venue order ID"))?;
1137
1138        let trades = self
1139            .inner
1140            .account_trades(
1141                symbol.as_str(),
1142                order_id,
1143                start.map(|dt| dt.timestamp_millis()),
1144                end.map(|dt| dt.timestamp_millis()),
1145                limit,
1146            )
1147            .await
1148            .map_err(|e| anyhow::anyhow!(e))?;
1149
1150        trades
1151            .iter()
1152            .map(|trade| {
1153                let symbol = Ustr::from(&trade.symbol);
1154                let instrument = self.instrument_from_cache(symbol)?;
1155                let commission_currency = get_currency(&trade.commission_asset);
1156                parse_fill_report_sbe(trade, account_id, &instrument, commission_currency, ts_init)
1157            })
1158            .collect()
1159    }
1160
1161    /// Submits a new order to the venue.
1162    ///
1163    /// Converts Nautilus domain types to Binance-specific parameters
1164    /// and returns an `OrderStatusReport`.
1165    ///
1166    /// # Errors
1167    ///
1168    /// Returns an error if:
1169    /// - The instrument is not cached.
1170    /// - The order type or time-in-force is unsupported.
1171    /// - Stop orders are submitted without a trigger price.
1172    /// - The request fails or SBE decoding fails.
1173    #[allow(clippy::too_many_arguments)]
1174    pub async fn submit_order(
1175        &self,
1176        account_id: AccountId,
1177        instrument_id: InstrumentId,
1178        client_order_id: ClientOrderId,
1179        order_side: OrderSide,
1180        order_type: OrderType,
1181        quantity: Quantity,
1182        time_in_force: TimeInForce,
1183        price: Option<Price>,
1184        trigger_price: Option<Price>,
1185        post_only: bool,
1186    ) -> anyhow::Result<OrderStatusReport> {
1187        let symbol = instrument_id.symbol.inner();
1188        let instrument = self.instrument_from_cache(symbol)?;
1189        let ts_init = self.generate_ts_init();
1190
1191        let binance_side = BinanceSide::try_from(order_side)?;
1192        let binance_order_type = order_type_to_binance_spot(order_type, post_only)?;
1193
1194        // Validate trigger price for stop orders
1195        let is_stop_order = matches!(order_type, OrderType::StopMarket | OrderType::StopLimit);
1196        if is_stop_order && trigger_price.is_none() {
1197            anyhow::bail!("Stop orders require a trigger price");
1198        }
1199
1200        // Validate price for order types that require it
1201        let requires_price = matches!(
1202            binance_order_type,
1203            BinanceSpotOrderType::Limit
1204                | BinanceSpotOrderType::StopLossLimit
1205                | BinanceSpotOrderType::TakeProfitLimit
1206                | BinanceSpotOrderType::LimitMaker
1207        );
1208        if requires_price && price.is_none() {
1209            anyhow::bail!("{binance_order_type:?} orders require a price");
1210        }
1211
1212        // Only send TIF for order types that support it
1213        let supports_tif = matches!(
1214            binance_order_type,
1215            BinanceSpotOrderType::Limit
1216                | BinanceSpotOrderType::StopLossLimit
1217                | BinanceSpotOrderType::TakeProfitLimit
1218        );
1219        let binance_tif = if supports_tif {
1220            Some(BinanceTimeInForce::try_from(time_in_force)?)
1221        } else {
1222            None
1223        };
1224
1225        let qty_str = quantity.to_string();
1226        let price_str = price.map(|p| p.to_string());
1227        let stop_price_str = trigger_price.map(|p| p.to_string());
1228        let client_id_str = client_order_id.to_string();
1229
1230        let response = self
1231            .inner
1232            .new_order(
1233                symbol.as_str(),
1234                binance_side,
1235                binance_order_type,
1236                binance_tif,
1237                Some(&qty_str),
1238                price_str.as_deref(),
1239                Some(&client_id_str),
1240                stop_price_str.as_deref(),
1241            )
1242            .await
1243            .map_err(|e| anyhow::anyhow!(e))?;
1244
1245        parse_new_order_response_sbe(&response, account_id, &instrument, ts_init)
1246    }
1247
1248    /// Modifies an existing order (cancel and replace atomically).
1249    ///
1250    /// # Errors
1251    ///
1252    /// Returns an error if:
1253    /// - The instrument is not cached.
1254    /// - The order type or time-in-force is unsupported.
1255    /// - The request fails or SBE decoding fails.
1256    #[allow(clippy::too_many_arguments)]
1257    pub async fn modify_order(
1258        &self,
1259        account_id: AccountId,
1260        instrument_id: InstrumentId,
1261        venue_order_id: VenueOrderId,
1262        client_order_id: ClientOrderId,
1263        order_side: OrderSide,
1264        order_type: OrderType,
1265        quantity: Quantity,
1266        time_in_force: TimeInForce,
1267        price: Option<Price>,
1268    ) -> anyhow::Result<OrderStatusReport> {
1269        let symbol = instrument_id.symbol.inner();
1270        let instrument = self.instrument_from_cache(symbol)?;
1271        let ts_init = self.generate_ts_init();
1272
1273        let binance_side = BinanceSide::try_from(order_side)?;
1274        let binance_order_type = order_type_to_binance_spot(order_type, false)?;
1275        let binance_tif = BinanceTimeInForce::try_from(time_in_force)?;
1276
1277        let cancel_order_id: i64 = venue_order_id
1278            .inner()
1279            .parse()
1280            .map_err(|_| anyhow::anyhow!("Invalid venue order ID: {venue_order_id}"))?;
1281
1282        let qty_str = quantity.to_string();
1283        let price_str = price.map(|p| p.to_string());
1284        let client_id_str = client_order_id.to_string();
1285
1286        let response = self
1287            .inner
1288            .cancel_replace_order(
1289                symbol.as_str(),
1290                binance_side,
1291                binance_order_type,
1292                Some(binance_tif),
1293                Some(&qty_str),
1294                price_str.as_deref(),
1295                Some(cancel_order_id),
1296                None,
1297                Some(&client_id_str),
1298            )
1299            .await
1300            .map_err(|e| anyhow::anyhow!(e))?;
1301
1302        parse_new_order_response_sbe(&response, account_id, &instrument, ts_init)
1303    }
1304
1305    /// Cancels an existing order on the venue.
1306    ///
1307    /// Either `venue_order_id` or `client_order_id` must be provided.
1308    ///
1309    /// # Errors
1310    ///
1311    /// Returns an error if the request fails or SBE decoding fails.
1312    pub async fn cancel_order(
1313        &self,
1314        instrument_id: InstrumentId,
1315        venue_order_id: Option<VenueOrderId>,
1316        client_order_id: Option<ClientOrderId>,
1317    ) -> anyhow::Result<VenueOrderId> {
1318        let symbol = instrument_id.symbol.inner();
1319
1320        let order_id = venue_order_id
1321            .map(|id| id.inner().parse::<i64>())
1322            .transpose()
1323            .map_err(|_| anyhow::anyhow!("Invalid venue order ID"))?;
1324
1325        let client_id_str = client_order_id.map(|id| id.to_string());
1326
1327        let response = self
1328            .inner
1329            .cancel_order(symbol.as_str(), order_id, client_id_str.as_deref())
1330            .await
1331            .map_err(|e| anyhow::anyhow!(e))?;
1332
1333        Ok(VenueOrderId::new(response.order_id.to_string()))
1334    }
1335
1336    /// Cancels all open orders for a symbol.
1337    ///
1338    /// Returns the venue order IDs of all canceled orders.
1339    ///
1340    /// # Errors
1341    ///
1342    /// Returns an error if the request fails or SBE decoding fails.
1343    pub async fn cancel_all_orders(
1344        &self,
1345        instrument_id: InstrumentId,
1346    ) -> anyhow::Result<Vec<(VenueOrderId, ClientOrderId)>> {
1347        let symbol = instrument_id.symbol.inner();
1348
1349        let responses = self
1350            .inner
1351            .cancel_open_orders(symbol.as_str())
1352            .await
1353            .map_err(|e| anyhow::anyhow!(e))?;
1354
1355        Ok(responses
1356            .into_iter()
1357            .map(|r| {
1358                (
1359                    VenueOrderId::new(r.order_id.to_string()),
1360                    ClientOrderId::new(&r.orig_client_order_id),
1361                )
1362            })
1363            .collect())
1364    }
1365}
1366
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;

    /// Both client layers must report SBE schema ID 3, version 2.
    #[rstest]
    fn test_schema_constants() {
        let raw_id = BinanceRawSpotHttpClient::schema_id();
        let raw_version = BinanceRawSpotHttpClient::schema_version();
        assert_eq!(raw_id, 3);
        assert_eq!(raw_version, 2);

        let domain_id = BinanceSpotHttpClient::schema_id();
        let domain_version = BinanceSpotHttpClient::schema_version();
        assert_eq!(domain_id, 3);
        assert_eq!(domain_version, 2);
    }

    /// The header constant is the `id:version` pair.
    #[rstest]
    fn test_sbe_schema_header() {
        let expected = "3:2";
        assert_eq!(SBE_SCHEMA_HEADER, expected);
    }

    /// Default headers built without an API key still carry the SBE headers.
    #[rstest]
    fn test_default_headers_include_sbe() {
        let headers = BinanceRawSpotHttpClient::default_headers(&None);

        let expected_accept = "application/sbe".to_string();
        let expected_schema = "3:2".to_string();
        assert_eq!(headers.get("Accept"), Some(&expected_accept));
        assert_eq!(headers.get("X-MBX-SBE"), Some(&expected_schema));
    }

    /// The rate limit config has a default quota and one key per ORDERS quota.
    #[rstest]
    fn test_rate_limit_config() {
        let config = BinanceRawSpotHttpClient::rate_limit_config();

        let has_default_quota = config.default_quota.is_some();
        assert!(has_default_quota);
        // Spot has 2 ORDERS quotas (SECOND and DAY)
        let order_key_count = config.order_keys.len();
        assert_eq!(order_key_count, 2);
    }
}