Skip to main content

nautilus_binance/spot/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Binance Spot adapter.
17
18use std::{
19    future::Future,
20    sync::Mutex,
21    time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use nautilus_common::{
27    clients::ExecutionClient,
28    live::{get_runtime, runner::get_exec_event_sender},
29    messages::execution::{
30        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31        GenerateOrderStatusReport, GenerateOrderStatusReports, GenerateOrderStatusReportsBuilder,
32        GeneratePositionStatusReports, GeneratePositionStatusReportsBuilder, ModifyOrder,
33        QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34    },
35};
36use nautilus_core::{
37    MUTEX_POISONED, UUID4, UnixNanos,
38    time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42    accounts::AccountAny,
43    enums::OmsType,
44    events::{
45        AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny,
46        OrderModifyRejected, OrderRejected, OrderUpdated,
47    },
48    identifiers::{AccountId, ClientId, Venue, VenueOrderId},
49    orders::Order,
50    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
51    types::{AccountBalance, MarginBalance},
52};
53use tokio::task::JoinHandle;
54
55use crate::{
56    common::{consts::BINANCE_VENUE, credential::resolve_credentials, enums::BinanceProductType},
57    config::BinanceExecClientConfig,
58    spot::http::{
59        client::BinanceSpotHttpClient, models::BatchCancelResult, query::BatchCancelItem,
60    },
61};
62
63/// Live execution client for Binance Spot trading.
64///
65/// Implements the [`ExecutionClient`] trait for order management on Binance Spot
66/// and Spot Margin markets. Uses HTTP API for all order operations with SBE encoding.
67#[derive(Debug)]
68pub struct BinanceSpotExecutionClient {
69    core: ExecutionClientCore,
70    clock: &'static AtomicTime,
71    config: BinanceExecClientConfig,
72    emitter: ExecutionEventEmitter,
73    http_client: BinanceSpotHttpClient,
74    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
75}
76
77impl BinanceSpotExecutionClient {
78    /// Creates a new [`BinanceSpotExecutionClient`].
79    ///
80    /// # Errors
81    ///
82    /// Returns an error if the HTTP client fails to initialize or credentials are missing.
83    pub fn new(core: ExecutionClientCore, config: BinanceExecClientConfig) -> anyhow::Result<Self> {
84        let product_type = config
85            .product_types
86            .first()
87            .copied()
88            .unwrap_or(BinanceProductType::Spot);
89
90        let (api_key, api_secret) = resolve_credentials(
91            config.api_key.clone(),
92            config.api_secret.clone(),
93            config.environment,
94            product_type,
95        )?;
96
97        let http_client = BinanceSpotHttpClient::new(
98            config.environment,
99            Some(api_key),
100            Some(api_secret),
101            config.base_url_http.clone(),
102            None, // recv_window
103            None, // timeout_secs
104            None, // proxy_url
105        )
106        .context("failed to construct Binance Spot HTTP client")?;
107
108        let clock = get_atomic_clock_realtime();
109        let emitter = ExecutionEventEmitter::new(
110            clock,
111            core.trader_id,
112            core.account_id,
113            core.account_type,
114            core.base_currency,
115        );
116
117        Ok(Self {
118            core,
119            clock,
120            config,
121            emitter,
122            http_client,
123            pending_tasks: Mutex::new(Vec::new()),
124        })
125    }
126
127    async fn refresh_account_state(&self) -> anyhow::Result<AccountState> {
128        self.http_client
129            .request_account_state(self.core.account_id)
130            .await
131    }
132
133    fn update_account_state(&self) -> anyhow::Result<()> {
134        let runtime = get_runtime();
135        let account_state = runtime.block_on(self.refresh_account_state())?;
136
137        let ts_now = self.clock.get_time_ns();
138        self.emitter.emit_account_state(
139            account_state.balances.clone(),
140            account_state.margins.clone(),
141            account_state.is_reported,
142            ts_now,
143        );
144
145        Ok(())
146    }
147
148    fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
149        let order = self
150            .core
151            .cache()
152            .order(&cmd.client_order_id)
153            .cloned()
154            .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
155        let http_client = self.http_client.clone();
156
157        let event_emitter = self.emitter.clone();
158        let trader_id = self.core.trader_id;
159        let account_id = self.core.account_id;
160        let client_order_id = order.client_order_id();
161        let strategy_id = order.strategy_id();
162        let instrument_id = order.instrument_id();
163        let order_side = order.order_side();
164        let order_type = order.order_type();
165        let quantity = order.quantity();
166        let time_in_force = order.time_in_force();
167        let price = order.price();
168        let trigger_price = order.trigger_price();
169        let is_post_only = order.is_post_only();
170        let clock = self.clock;
171        let ts_init = self.clock.get_time_ns();
172
173        self.spawn_task("submit_order", async move {
174            let result = http_client
175                .submit_order(
176                    account_id,
177                    instrument_id,
178                    client_order_id,
179                    order_side,
180                    order_type,
181                    quantity,
182                    time_in_force,
183                    price,
184                    trigger_price,
185                    is_post_only,
186                )
187                .await
188                .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"));
189
190            match result {
191                Ok(report) => {
192                    let accepted = OrderAccepted::new(
193                        trader_id,
194                        strategy_id,
195                        instrument_id,
196                        client_order_id,
197                        report.venue_order_id,
198                        account_id,
199                        UUID4::new(),
200                        ts_init, // TODO: Use proper event timestamp
201                        ts_init,
202                        false,
203                    );
204
205                    event_emitter.send_order_event(OrderEventAny::Accepted(accepted));
206                }
207                Err(e) => {
208                    let rejected = OrderRejected::new(
209                        trader_id,
210                        strategy_id,
211                        instrument_id,
212                        client_order_id,
213                        account_id,
214                        format!("submit-order-error: {e}").into(),
215                        UUID4::new(),
216                        ts_init,
217                        clock.get_time_ns(),
218                        false,
219                        false,
220                    );
221
222                    event_emitter.send_order_event(OrderEventAny::Rejected(rejected));
223
224                    return Err(e);
225                }
226            }
227
228            Ok(())
229        });
230
231        Ok(())
232    }
233
234    fn cancel_order_internal(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
235        let http_client = self.http_client.clone();
236        let command = cmd.clone();
237
238        let event_emitter = self.emitter.clone();
239        let trader_id = self.core.trader_id;
240        let account_id = self.core.account_id;
241        let clock = self.clock;
242
243        self.spawn_task("cancel_order", async move {
244            let result = http_client
245                .cancel_order(
246                    command.instrument_id,
247                    command.venue_order_id,
248                    Some(command.client_order_id),
249                )
250                .await
251                .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
252
253            match result {
254                Ok(venue_order_id) => {
255                    // Order canceled - dispatch OrderCanceled event
256                    let ts_now = clock.get_time_ns();
257                    let canceled_event = OrderCanceled::new(
258                        trader_id,
259                        command.strategy_id,
260                        command.instrument_id,
261                        command.client_order_id,
262                        UUID4::new(),
263                        ts_now,
264                        ts_now,
265                        false,
266                        Some(venue_order_id),
267                        Some(account_id),
268                    );
269
270                    event_emitter.send_order_event(OrderEventAny::Canceled(canceled_event));
271                }
272                Err(e) => {
273                    let ts_now = clock.get_time_ns();
274                    let rejected_event = OrderCancelRejected::new(
275                        trader_id,
276                        command.strategy_id,
277                        command.instrument_id,
278                        command.client_order_id,
279                        format!("cancel-order-error: {e}").into(),
280                        UUID4::new(),
281                        ts_now,
282                        ts_now,
283                        false,
284                        command.venue_order_id,
285                        Some(account_id),
286                    );
287
288                    event_emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
289
290                    return Err(e);
291                }
292            }
293
294            Ok(())
295        });
296
297        Ok(())
298    }
299
300    fn spawn_task<F>(&self, description: &'static str, fut: F)
301    where
302        F: Future<Output = anyhow::Result<()>> + Send + 'static,
303    {
304        let runtime = get_runtime();
305        let handle = runtime.spawn(async move {
306            if let Err(e) = fut.await {
307                log::warn!("{description} failed: {e}");
308            }
309        });
310
311        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
312        tasks.retain(|handle| !handle.is_finished());
313        tasks.push(handle);
314    }
315
316    fn abort_pending_tasks(&self) {
317        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
318        for handle in tasks.drain(..) {
319            handle.abort();
320        }
321    }
322
323    /// Polls the cache until the account is registered or timeout is reached.
324    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
325        let account_id = self.core.account_id;
326
327        if self.core.cache().account(&account_id).is_some() {
328            log::info!("Account {account_id} registered");
329            return Ok(());
330        }
331
332        let start = Instant::now();
333        let timeout = Duration::from_secs_f64(timeout_secs);
334        let interval = Duration::from_millis(10);
335
336        loop {
337            tokio::time::sleep(interval).await;
338
339            if self.core.cache().account(&account_id).is_some() {
340                log::info!("Account {account_id} registered");
341                return Ok(());
342            }
343
344            if start.elapsed() >= timeout {
345                anyhow::bail!(
346                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
347                );
348            }
349        }
350    }
351}
352
353#[async_trait(?Send)]
354impl ExecutionClient for BinanceSpotExecutionClient {
355    fn is_connected(&self) -> bool {
356        self.core.is_connected()
357    }
358
359    fn client_id(&self) -> ClientId {
360        self.core.client_id
361    }
362
363    fn account_id(&self) -> AccountId {
364        self.core.account_id
365    }
366
367    fn venue(&self) -> Venue {
368        *BINANCE_VENUE
369    }
370
371    fn oms_type(&self) -> OmsType {
372        self.core.oms_type
373    }
374
375    fn get_account(&self) -> Option<AccountAny> {
376        self.core.cache().account(&self.core.account_id).cloned()
377    }
378
379    async fn connect(&mut self) -> anyhow::Result<()> {
380        if self.core.is_connected() {
381            return Ok(());
382        }
383
384        // Load instruments if not already done
385        if !self.core.instruments_initialized() {
386            let instruments = self
387                .http_client
388                .request_instruments()
389                .await
390                .context("failed to request Binance Spot instruments")?;
391
392            if instruments.is_empty() {
393                log::warn!("No instruments returned for Binance Spot");
394            } else {
395                log::info!("Loaded {} Spot instruments", instruments.len());
396                self.http_client.cache_instruments(instruments);
397            }
398
399            self.core.set_instruments_initialized();
400        }
401
402        // Request initial account state
403        let account_state = self
404            .refresh_account_state()
405            .await
406            .context("failed to request Binance account state")?;
407
408        if !account_state.balances.is_empty() {
409            log::info!(
410                "Received account state with {} balance(s)",
411                account_state.balances.len()
412            );
413        }
414
415        self.emitter.send_account_state(account_state);
416
417        // Wait for account to be registered in cache before completing connect
418        self.await_account_registered(30.0).await?;
419
420        self.core.set_connected();
421        log::info!("Connected: client_id={}", self.core.client_id);
422        Ok(())
423    }
424
425    async fn disconnect(&mut self) -> anyhow::Result<()> {
426        if self.core.is_disconnected() {
427            return Ok(());
428        }
429
430        self.abort_pending_tasks();
431
432        self.core.set_disconnected();
433        log::info!("Disconnected: client_id={}", self.core.client_id);
434        Ok(())
435    }
436
437    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
438        self.update_account_state()
439    }
440
441    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
442        log::debug!("query_order: client_order_id={}", cmd.client_order_id);
443
444        let http_client = self.http_client.clone();
445        let command = cmd.clone();
446        let event_emitter = self.emitter.clone();
447        let account_id = self.core.account_id;
448
449        self.spawn_task("query_order", async move {
450            let result = http_client
451                .request_order_status_report(
452                    account_id,
453                    command.instrument_id,
454                    command.venue_order_id,
455                    Some(command.client_order_id),
456                )
457                .await;
458
459            match result {
460                Ok(report) => {
461                    event_emitter.send_order_status_report(report);
462                }
463                Err(e) => log::warn!("Failed to query order status: {e}"),
464            }
465
466            Ok(())
467        });
468
469        Ok(())
470    }
471
472    fn generate_account_state(
473        &self,
474        balances: Vec<AccountBalance>,
475        margins: Vec<MarginBalance>,
476        reported: bool,
477        ts_event: UnixNanos,
478    ) -> anyhow::Result<()> {
479        self.emitter
480            .emit_account_state(balances, margins, reported, ts_event);
481        Ok(())
482    }
483
484    fn start(&mut self) -> anyhow::Result<()> {
485        if self.core.is_started() {
486            return Ok(());
487        }
488
489        self.emitter.set_sender(get_exec_event_sender());
490        self.core.set_started();
491
492        // Spawn instrument bootstrap task
493        let http_client = self.http_client.clone();
494
495        get_runtime().spawn(async move {
496            match http_client.request_instruments().await {
497                Ok(instruments) => {
498                    if instruments.is_empty() {
499                        log::warn!("No instruments returned for Binance Spot");
500                    } else {
501                        http_client.cache_instruments(instruments);
502                        log::info!("Instruments initialized");
503                    }
504                }
505                Err(e) => {
506                    log::error!("Failed to request Binance Spot instruments: {e}");
507                }
508            }
509        });
510
511        log::info!(
512            "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}, product_types={:?}",
513            self.core.client_id,
514            self.core.account_id,
515            self.core.account_type,
516            self.config.environment,
517            self.config.product_types,
518        );
519        Ok(())
520    }
521
522    fn stop(&mut self) -> anyhow::Result<()> {
523        if self.core.is_stopped() {
524            return Ok(());
525        }
526
527        self.core.set_stopped();
528        self.core.set_disconnected();
529        self.abort_pending_tasks();
530        log::info!("Stopped: client_id={}", self.core.client_id);
531        Ok(())
532    }
533
534    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
535        let order = self
536            .core
537            .cache()
538            .order(&cmd.client_order_id)
539            .cloned()
540            .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
541
542        if order.is_closed() {
543            let client_order_id = order.client_order_id();
544            log::warn!("Cannot submit closed order {client_order_id}");
545            return Ok(());
546        }
547
548        log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
549        self.emitter.emit_order_submitted(&order);
550
551        self.submit_order_internal(cmd)
552    }
553
554    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
555        log::warn!(
556            "submit_order_list not yet implemented for Binance Spot execution client (got {} orders)",
557            cmd.order_list.client_order_ids.len()
558        );
559        Ok(())
560    }
561
562    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
563        // Binance Spot uses cancel-replace for order modification, which requires
564        // the full order specification (side, type, time_in_force). Since ModifyOrder
565        // doesn't include these fields, we need to look up the original order from cache.
566        let order = self.core.cache().order(&cmd.client_order_id).cloned();
567
568        let Some(order) = order else {
569            log::warn!(
570                "Cannot modify order {}: not found in cache",
571                cmd.client_order_id
572            );
573            let ts_init = self.clock.get_time_ns();
574            let rejected_event = OrderModifyRejected::new(
575                self.core.trader_id,
576                cmd.strategy_id,
577                cmd.instrument_id,
578                cmd.client_order_id,
579                "Order not found in cache for modify".into(),
580                UUID4::new(),
581                ts_init, // TODO: Use proper event timestamp
582                ts_init,
583                false,
584                cmd.venue_order_id,
585                Some(self.core.account_id),
586            );
587
588            self.emitter
589                .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
590            return Ok(());
591        };
592
593        let http_client = self.http_client.clone();
594        let command = cmd.clone();
595
596        let event_emitter = self.emitter.clone();
597        let trader_id = self.core.trader_id;
598        let account_id = self.core.account_id;
599        let clock = self.clock;
600
601        // Get order properties from cached order
602        let order_side = order.order_side();
603        let order_type = order.order_type();
604        let time_in_force = order.time_in_force();
605        let quantity = cmd.quantity.unwrap_or_else(|| order.quantity());
606
607        self.spawn_task("modify_order", async move {
608            // Binance uses cancel-replace for order modification
609            let result = http_client
610                .modify_order(
611                    account_id,
612                    command.instrument_id,
613                    command
614                        .venue_order_id
615                        .ok_or_else(|| anyhow::anyhow!("venue_order_id required for modify"))?,
616                    command.client_order_id,
617                    order_side,
618                    order_type,
619                    quantity,
620                    time_in_force,
621                    command.price,
622                )
623                .await
624                .map_err(|e| anyhow::anyhow!("Modify order failed: {e}"));
625
626            match result {
627                Ok(report) => {
628                    // Order modified - dispatch OrderUpdated event
629                    let ts_now = clock.get_time_ns();
630                    let updated_event = OrderUpdated::new(
631                        trader_id,
632                        command.strategy_id,
633                        command.instrument_id,
634                        command.client_order_id,
635                        report.quantity,
636                        UUID4::new(),
637                        ts_now,
638                        ts_now,
639                        false,
640                        Some(report.venue_order_id),
641                        Some(account_id),
642                        report.price,
643                        None, // trigger_price
644                        None, // protection_price
645                    );
646
647                    event_emitter.send_order_event(OrderEventAny::Updated(updated_event));
648                }
649                Err(e) => {
650                    let ts_now = clock.get_time_ns();
651                    let rejected_event = OrderModifyRejected::new(
652                        trader_id,
653                        command.strategy_id,
654                        command.instrument_id,
655                        command.client_order_id,
656                        format!("modify-order-error: {e}").into(),
657                        UUID4::new(),
658                        ts_now,
659                        ts_now,
660                        false,
661                        command.venue_order_id,
662                        Some(account_id),
663                    );
664
665                    event_emitter.send_order_event(OrderEventAny::ModifyRejected(rejected_event));
666
667                    return Err(e);
668                }
669            }
670
671            Ok(())
672        });
673
674        Ok(())
675    }
676
677    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
678        self.cancel_order_internal(cmd)
679    }
680
681    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
682        let http_client = self.http_client.clone();
683        let command = cmd.clone();
684
685        let event_emitter = self.emitter.clone();
686        let trader_id = self.core.trader_id;
687        let account_id = self.core.account_id;
688        let clock = self.clock;
689
690        self.spawn_task("cancel_all_orders", async move {
691            let canceled_orders = http_client.cancel_all_orders(command.instrument_id).await?;
692
693            // Generate OrderCanceled events for each canceled order
694            for (venue_order_id, client_order_id) in canceled_orders {
695                let canceled_event = OrderCanceled::new(
696                    trader_id,
697                    command.strategy_id,
698                    command.instrument_id,
699                    client_order_id,
700                    UUID4::new(),
701                    command.ts_init,
702                    clock.get_time_ns(),
703                    false,
704                    Some(venue_order_id),
705                    Some(account_id),
706                );
707
708                event_emitter.send_order_event(OrderEventAny::Canceled(canceled_event));
709            }
710
711            Ok(())
712        });
713
714        Ok(())
715    }
716
717    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
718        const BATCH_SIZE: usize = 5;
719
720        if cmd.cancels.is_empty() {
721            return Ok(());
722        }
723
724        let http_client = self.http_client.clone();
725        let command = cmd.clone();
726
727        let event_emitter = self.emitter.clone();
728        let trader_id = self.core.trader_id;
729        let account_id = self.core.account_id;
730        let clock = self.clock;
731
732        self.spawn_task("batch_cancel_orders", async move {
733            for chunk in command.cancels.chunks(BATCH_SIZE) {
734                let batch_items: Vec<BatchCancelItem> = chunk
735                    .iter()
736                    .map(|cancel| {
737                        if let Some(venue_order_id) = cancel.venue_order_id {
738                            let order_id = venue_order_id.inner().parse::<i64>().unwrap_or(0);
739                            if order_id != 0 {
740                                BatchCancelItem::by_order_id(
741                                    command.instrument_id.symbol.to_string(),
742                                    order_id,
743                                )
744                            } else {
745                                BatchCancelItem::by_client_order_id(
746                                    command.instrument_id.symbol.to_string(),
747                                    cancel.client_order_id.to_string(),
748                                )
749                            }
750                        } else {
751                            BatchCancelItem::by_client_order_id(
752                                command.instrument_id.symbol.to_string(),
753                                cancel.client_order_id.to_string(),
754                            )
755                        }
756                    })
757                    .collect();
758
759                match http_client.batch_cancel_orders(&batch_items).await {
760                    Ok(results) => {
761                        for (i, result) in results.iter().enumerate() {
762                            let cancel = &chunk[i];
763                            match result {
764                                BatchCancelResult::Success(success) => {
765                                    let venue_order_id =
766                                        VenueOrderId::new(success.order_id.to_string());
767                                    let canceled_event = OrderCanceled::new(
768                                        trader_id,
769                                        cancel.strategy_id,
770                                        cancel.instrument_id,
771                                        cancel.client_order_id,
772                                        UUID4::new(),
773                                        cancel.ts_init,
774                                        clock.get_time_ns(),
775                                        false,
776                                        Some(venue_order_id),
777                                        Some(account_id),
778                                    );
779
780                                    event_emitter
781                                        .send_order_event(OrderEventAny::Canceled(canceled_event));
782                                }
783                                BatchCancelResult::Error(error) => {
784                                    let rejected_event = OrderCancelRejected::new(
785                                        trader_id,
786                                        cancel.strategy_id,
787                                        cancel.instrument_id,
788                                        cancel.client_order_id,
789                                        format!(
790                                            "batch-cancel-error: code={}, msg={}",
791                                            error.code, error.msg
792                                        )
793                                        .into(),
794                                        UUID4::new(),
795                                        clock.get_time_ns(),
796                                        cancel.ts_init,
797                                        false,
798                                        cancel.venue_order_id,
799                                        Some(account_id),
800                                    );
801
802                                    event_emitter.send_order_event(OrderEventAny::CancelRejected(
803                                        rejected_event,
804                                    ));
805                                }
806                            }
807                        }
808                    }
809                    Err(e) => {
810                        for cancel in chunk {
811                            let rejected_event = OrderCancelRejected::new(
812                                trader_id,
813                                cancel.strategy_id,
814                                cancel.instrument_id,
815                                cancel.client_order_id,
816                                format!("batch-cancel-request-failed: {e}").into(),
817                                UUID4::new(),
818                                clock.get_time_ns(),
819                                cancel.ts_init,
820                                false,
821                                cancel.venue_order_id,
822                                Some(account_id),
823                            );
824
825                            event_emitter
826                                .send_order_event(OrderEventAny::CancelRejected(rejected_event));
827                        }
828                    }
829                }
830            }
831
832            Ok(())
833        });
834
835        Ok(())
836    }
837
838    async fn generate_order_status_report(
839        &self,
840        cmd: &GenerateOrderStatusReport,
841    ) -> anyhow::Result<Option<OrderStatusReport>> {
842        let Some(instrument_id) = cmd.instrument_id else {
843            log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
844            return Ok(None);
845        };
846
847        // Convert ClientOrderId to VenueOrderId if provided (API naming quirk)
848        let venue_order_id = cmd
849            .venue_order_id
850            .as_ref()
851            .map(|id| VenueOrderId::new(id.inner()));
852
853        let report = self
854            .http_client
855            .request_order_status_report(
856                self.core.account_id,
857                instrument_id,
858                venue_order_id,
859                cmd.client_order_id,
860            )
861            .await?;
862
863        Ok(Some(report))
864    }
865
866    async fn generate_order_status_reports(
867        &self,
868        cmd: &GenerateOrderStatusReports,
869    ) -> anyhow::Result<Vec<OrderStatusReport>> {
870        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
871        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
872
873        let reports = self
874            .http_client
875            .request_order_status_reports(
876                self.core.account_id,
877                cmd.instrument_id,
878                start_dt,
879                end_dt,
880                cmd.open_only,
881                None, // limit
882            )
883            .await?;
884
885        Ok(reports)
886    }
887
888    async fn generate_fill_reports(
889        &self,
890        cmd: GenerateFillReports,
891    ) -> anyhow::Result<Vec<FillReport>> {
892        let Some(instrument_id) = cmd.instrument_id else {
893            log::warn!("generate_fill_reports requires instrument_id for Binance Spot");
894            return Ok(Vec::new());
895        };
896
897        // Convert ClientOrderId to VenueOrderId if provided (API naming quirk)
898        let venue_order_id = cmd
899            .venue_order_id
900            .as_ref()
901            .map(|id| VenueOrderId::new(id.inner()));
902
903        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
904        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
905
906        let reports = self
907            .http_client
908            .request_fill_reports(
909                self.core.account_id,
910                instrument_id,
911                venue_order_id,
912                start_dt,
913                end_dt,
914                None, // limit
915            )
916            .await?;
917
918        Ok(reports)
919    }
920
921    async fn generate_position_status_reports(
922        &self,
923        _cmd: &GeneratePositionStatusReports,
924    ) -> anyhow::Result<Vec<PositionStatusReport>> {
925        // Spot trading doesn't have positions in the traditional sense
926        // Returns empty for spot, could be extended for margin positions
927        Ok(Vec::new())
928    }
929
930    async fn generate_mass_status(
931        &self,
932        lookback_mins: Option<u64>,
933    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
934        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
935
936        let ts_now = self.clock.get_time_ns();
937
938        let start = lookback_mins.map(|mins| {
939            let lookback_ns = mins * 60 * 1_000_000_000;
940            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
941        });
942
943        // Binance requires instrument_id for historical orders (open_only=false).
944        // Use open_only=true for mass status to get all open orders across instruments.
945        let order_cmd = GenerateOrderStatusReportsBuilder::default()
946            .ts_init(ts_now)
947            .open_only(true)
948            .start(start)
949            .build()
950            .map_err(|e| anyhow::anyhow!("{e}"))?;
951
952        let position_cmd = GeneratePositionStatusReportsBuilder::default()
953            .ts_init(ts_now)
954            .start(start)
955            .build()
956            .map_err(|e| anyhow::anyhow!("{e}"))?;
957
958        let (order_reports, position_reports) = tokio::try_join!(
959            self.generate_order_status_reports(&order_cmd),
960            self.generate_position_status_reports(&position_cmd),
961        )?;
962
963        // Note: Fill reports require instrument_id for Binance, so we skip them in mass status
964        // They would need to be fetched per-instrument if needed
965
966        log::info!("Received {} OrderStatusReports", order_reports.len());
967        log::info!("Received {} PositionReports", position_reports.len());
968
969        let mut mass_status = ExecutionMassStatus::new(
970            self.core.client_id,
971            self.core.account_id,
972            *BINANCE_VENUE,
973            ts_now,
974            None,
975        );
976
977        mass_status.add_order_reports(order_reports);
978        mass_status.add_position_reports(position_reports);
979
980        Ok(Some(mass_status))
981    }
982}