nautilus_binance/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Binance Spot adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Mutex,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::{Duration, Instant},
25};
26
27use anyhow::Context;
28use async_trait::async_trait;
29use nautilus_common::{
30    clients::ExecutionClient,
31    live::{runner::get_exec_event_sender, runtime::get_runtime},
32    messages::{
33        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
34        execution::{
35            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
36            GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
37            ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
38        },
39    },
40};
41use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos, time::get_atomic_clock_realtime};
42use nautilus_live::ExecutionClientCore;
43use nautilus_model::{
44    accounts::AccountAny,
45    enums::OmsType,
46    events::{
47        AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny,
48        OrderModifyRejected, OrderRejected, OrderSubmitted, OrderUpdated,
49    },
50    identifiers::{AccountId, ClientId, Venue, VenueOrderId},
51    orders::Order,
52    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
53    types::{AccountBalance, Currency, MarginBalance, Money},
54};
55use tokio::task::JoinHandle;
56
57use crate::{
58    common::consts::BINANCE_VENUE,
59    config::BinanceExecClientConfig,
60    spot::http::{client::BinanceSpotHttpClient, query::AccountInfoParams},
61};
62
63/// Live execution client for Binance Spot trading.
64///
65/// Implements the [`ExecutionClient`] trait for order management on Binance Spot
66/// and Spot Margin markets. Uses HTTP API for all order operations with SBE encoding.
67#[derive(Debug)]
68pub struct BinanceSpotExecutionClient {
69    core: ExecutionClientCore,
70    config: BinanceExecClientConfig,
71    http_client: BinanceSpotHttpClient,
72    exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
73    started: bool,
74    connected: AtomicBool,
75    instruments_initialized: AtomicBool,
76    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
77}
78
79impl BinanceSpotExecutionClient {
80    /// Creates a new [`BinanceSpotExecutionClient`].
81    ///
82    /// # Errors
83    ///
84    /// Returns an error if the HTTP client fails to initialize or credentials are missing.
85    pub fn new(core: ExecutionClientCore, config: BinanceExecClientConfig) -> anyhow::Result<Self> {
86        // Get credentials from config or environment variables
87        let api_key = config
88            .api_key
89            .clone()
90            .or_else(|| std::env::var("BINANCE_API_KEY").ok())
91            .ok_or_else(|| anyhow::anyhow!("BINANCE_API_KEY not found in config or environment"))?;
92
93        let api_secret = config
94            .api_secret
95            .clone()
96            .or_else(|| std::env::var("BINANCE_API_SECRET").ok())
97            .ok_or_else(|| {
98                anyhow::anyhow!("BINANCE_API_SECRET not found in config or environment")
99            })?;
100
101        let http_client = BinanceSpotHttpClient::new(
102            config.environment,
103            Some(api_key),
104            Some(api_secret),
105            config.base_url_http.clone(),
106            None, // recv_window
107            None, // timeout_secs
108            None, // proxy_url
109        )
110        .context("failed to construct Binance Spot HTTP client")?;
111
112        Ok(Self {
113            core,
114            config,
115            http_client,
116            exec_event_sender: None,
117            started: false,
118            connected: AtomicBool::new(false),
119            instruments_initialized: AtomicBool::new(false),
120            pending_tasks: Mutex::new(Vec::new()),
121        })
122    }
123
124    /// Converts mantissa and exponent to f64 value.
125    fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
126        mantissa as f64 * 10f64.powi(exponent as i32)
127    }
128
129    /// Converts Binance account info to Nautilus account state.
130    fn create_account_state(
131        &self,
132        account_info: &crate::spot::http::models::BinanceAccountInfo,
133    ) -> AccountState {
134        let ts_now = get_atomic_clock_realtime().get_time_ns();
135
136        // Convert balances to AccountBalance using mantissa/exponent format
137        let balances: Vec<AccountBalance> = account_info
138            .balances
139            .iter()
140            .filter_map(|b| {
141                let free = Self::mantissa_to_f64(b.free_mantissa, b.exponent);
142                let locked = Self::mantissa_to_f64(b.locked_mantissa, b.exponent);
143                let total = free + locked;
144
145                // Only include non-zero balances
146                if total == 0.0 {
147                    return None;
148                }
149
150                let currency = Currency::from(&b.asset);
151                Some(AccountBalance::new(
152                    Money::new(total, currency),
153                    Money::new(locked, currency),
154                    Money::new(free, currency),
155                ))
156            })
157            .collect();
158
159        AccountState::new(
160            self.core.account_id,
161            self.core.account_type,
162            balances,
163            Vec::new(), // No margin balances for spot
164            true,       // reported
165            UUID4::new(),
166            ts_now,
167            ts_now,
168            None, // base currency
169        )
170    }
171
172    async fn refresh_account_state(&self) -> anyhow::Result<AccountState> {
173        let params = AccountInfoParams::default();
174        let account_info = match self.http_client.request_account_state(&params).await {
175            Ok(info) => info,
176            Err(e) => {
177                log::error!("Binance account state request failed: {e}");
178                anyhow::bail!("Binance account state request failed: {e}");
179            }
180        };
181
182        Ok(self.create_account_state(&account_info))
183    }
184
185    fn update_account_state(&self) -> anyhow::Result<()> {
186        let runtime = get_runtime();
187        let account_state = runtime.block_on(self.refresh_account_state())?;
188
189        self.core.generate_account_state(
190            account_state.balances.clone(),
191            account_state.margins.clone(),
192            account_state.is_reported,
193            account_state.ts_event,
194        )
195    }
196
197    fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
198        let order = cmd.order.clone();
199        let http_client = self.http_client.clone();
200
201        let exec_event_sender = self.exec_event_sender.clone();
202        let trader_id = self.core.trader_id;
203        let account_id = self.core.account_id;
204        let ts_init = cmd.ts_init;
205        let client_order_id = order.client_order_id();
206        let strategy_id = order.strategy_id();
207        let instrument_id = order.instrument_id();
208        let order_side = order.order_side();
209        let order_type = order.order_type();
210        let quantity = order.quantity();
211        let time_in_force = order.time_in_force();
212        let price = order.price();
213        let trigger_price = order.trigger_price();
214        let is_post_only = order.is_post_only();
215
216        self.spawn_task("submit_order", async move {
217            let result = http_client
218                .submit_order(
219                    account_id,
220                    instrument_id,
221                    client_order_id,
222                    order_side,
223                    order_type,
224                    quantity,
225                    time_in_force,
226                    price,
227                    trigger_price,
228                    is_post_only,
229                )
230                .await
231                .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"));
232
233            match result {
234                Ok(report) => {
235                    // Order accepted - dispatch OrderAccepted event
236                    let accepted_event = OrderAccepted::new(
237                        trader_id,
238                        strategy_id,
239                        instrument_id,
240                        client_order_id,
241                        report.venue_order_id,
242                        account_id,
243                        UUID4::new(),
244                        ts_init,
245                        get_atomic_clock_realtime().get_time_ns(),
246                        false,
247                    );
248
249                    if let Some(sender) = &exec_event_sender
250                        && let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Accepted(
251                            accepted_event,
252                        )))
253                    {
254                        log::warn!("Failed to send OrderAccepted event: {e}");
255                    }
256                }
257                Err(e) => {
258                    let rejected_event = OrderRejected::new(
259                        trader_id,
260                        strategy_id,
261                        instrument_id,
262                        client_order_id,
263                        account_id,
264                        format!("submit-order-error: {e}").into(),
265                        UUID4::new(),
266                        ts_init,
267                        get_atomic_clock_realtime().get_time_ns(),
268                        false,
269                        false,
270                    );
271
272                    if let Some(sender) = &exec_event_sender
273                        && let Err(send_err) = sender.send(ExecutionEvent::Order(
274                            OrderEventAny::Rejected(rejected_event),
275                        ))
276                    {
277                        log::warn!("Failed to send OrderRejected event: {send_err}");
278                    }
279
280                    return Err(e);
281                }
282            }
283
284            Ok(())
285        });
286
287        Ok(())
288    }
289
290    fn cancel_order_internal(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
291        let http_client = self.http_client.clone();
292        let command = cmd.clone();
293
294        let exec_event_sender = self.exec_event_sender.clone();
295        let trader_id = self.core.trader_id;
296        let account_id = self.core.account_id;
297        let ts_init = cmd.ts_init;
298
299        self.spawn_task("cancel_order", async move {
300            let result = http_client
301                .cancel_order(
302                    command.instrument_id,
303                    command.venue_order_id,
304                    Some(command.client_order_id),
305                )
306                .await
307                .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
308
309            match result {
310                Ok(venue_order_id) => {
311                    // Order canceled - dispatch OrderCanceled event
312                    let canceled_event = OrderCanceled::new(
313                        trader_id,
314                        command.strategy_id,
315                        command.instrument_id,
316                        command.client_order_id,
317                        UUID4::new(),
318                        ts_init,
319                        get_atomic_clock_realtime().get_time_ns(),
320                        false,
321                        Some(venue_order_id),
322                        Some(account_id),
323                    );
324
325                    if let Some(sender) = &exec_event_sender
326                        && let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Canceled(
327                            canceled_event,
328                        )))
329                    {
330                        log::warn!("Failed to send OrderCanceled event: {e}");
331                    }
332                }
333                Err(e) => {
334                    let rejected_event = OrderCancelRejected::new(
335                        trader_id,
336                        command.strategy_id,
337                        command.instrument_id,
338                        command.client_order_id,
339                        format!("cancel-order-error: {e}").into(),
340                        UUID4::new(),
341                        get_atomic_clock_realtime().get_time_ns(),
342                        ts_init,
343                        false,
344                        command.venue_order_id,
345                        Some(account_id),
346                    );
347
348                    if let Some(sender) = &exec_event_sender
349                        && let Err(send_err) = sender.send(ExecutionEvent::Order(
350                            OrderEventAny::CancelRejected(rejected_event),
351                        ))
352                    {
353                        log::warn!("Failed to send OrderCancelRejected event: {send_err}");
354                    }
355
356                    return Err(e);
357                }
358            }
359
360            Ok(())
361        });
362
363        Ok(())
364    }
365
366    fn spawn_task<F>(&self, description: &'static str, fut: F)
367    where
368        F: Future<Output = anyhow::Result<()>> + Send + 'static,
369    {
370        let runtime = get_runtime();
371        let handle = runtime.spawn(async move {
372            if let Err(e) = fut.await {
373                log::warn!("{description} failed: {e}");
374            }
375        });
376
377        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
378        tasks.retain(|handle| !handle.is_finished());
379        tasks.push(handle);
380    }
381
382    fn abort_pending_tasks(&self) {
383        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
384        for handle in tasks.drain(..) {
385            handle.abort();
386        }
387    }
388
389    /// Polls the cache until the account is registered or timeout is reached.
390    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
391        let account_id = self.core.account_id;
392
393        if self.core.cache().borrow().account(&account_id).is_some() {
394            log::info!("Account {account_id} registered");
395            return Ok(());
396        }
397
398        let start = Instant::now();
399        let timeout = Duration::from_secs_f64(timeout_secs);
400        let interval = Duration::from_millis(10);
401
402        loop {
403            tokio::time::sleep(interval).await;
404
405            if self.core.cache().borrow().account(&account_id).is_some() {
406                log::info!("Account {account_id} registered");
407                return Ok(());
408            }
409
410            if start.elapsed() >= timeout {
411                anyhow::bail!(
412                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
413                );
414            }
415        }
416    }
417}
418
419#[async_trait(?Send)]
420impl ExecutionClient for BinanceSpotExecutionClient {
421    fn is_connected(&self) -> bool {
422        self.connected.load(Ordering::Acquire)
423    }
424
425    fn client_id(&self) -> ClientId {
426        self.core.client_id
427    }
428
429    fn account_id(&self) -> AccountId {
430        self.core.account_id
431    }
432
433    fn venue(&self) -> Venue {
434        *BINANCE_VENUE
435    }
436
437    fn oms_type(&self) -> OmsType {
438        self.core.oms_type
439    }
440
441    fn get_account(&self) -> Option<AccountAny> {
442        self.core.get_account()
443    }
444
445    async fn connect(&mut self) -> anyhow::Result<()> {
446        if self.connected.load(Ordering::Acquire) {
447            return Ok(());
448        }
449
450        // Initialize exec event sender (must be done in async context after runner is set up)
451        if self.exec_event_sender.is_none() {
452            self.exec_event_sender = Some(get_exec_event_sender());
453        }
454
455        // Load instruments if not already done
456        if !self.instruments_initialized.load(Ordering::Acquire) {
457            let instruments = self
458                .http_client
459                .request_instruments()
460                .await
461                .context("failed to request Binance Spot instruments")?;
462
463            if instruments.is_empty() {
464                log::warn!("No instruments returned for Binance Spot");
465            } else {
466                log::info!("Loaded {} Spot instruments", instruments.len());
467                self.http_client.cache_instruments(instruments.clone());
468
469                // Add instruments to Nautilus Cache for reconciliation
470                {
471                    let mut cache = self.core.cache().borrow_mut();
472                    for instrument in &instruments {
473                        if let Err(e) = cache.add_instrument(instrument.clone()) {
474                            log::debug!("Instrument already in cache: {e}");
475                        }
476                    }
477                }
478            }
479
480            self.instruments_initialized.store(true, Ordering::Release);
481        }
482
483        let Some(sender) = self.exec_event_sender.as_ref() else {
484            log::error!("Execution event sender not initialized");
485            anyhow::bail!("Execution event sender not initialized");
486        };
487
488        // Request initial account state
489        let account_state = self
490            .refresh_account_state()
491            .await
492            .context("failed to request Binance account state")?;
493
494        if !account_state.balances.is_empty() {
495            log::info!(
496                "Received account state with {} balance(s)",
497                account_state.balances.len()
498            );
499        }
500
501        if let Err(e) = sender.send(ExecutionEvent::Account(account_state)) {
502            log::warn!("Failed to send account state: {e}");
503        }
504
505        // Wait for account to be registered in cache before completing connect
506        self.await_account_registered(30.0).await?;
507
508        self.connected.store(true, Ordering::Release);
509        log::info!("Connected: client_id={}", self.core.client_id);
510        Ok(())
511    }
512
513    async fn disconnect(&mut self) -> anyhow::Result<()> {
514        if !self.connected.load(Ordering::Acquire) {
515            return Ok(());
516        }
517
518        self.abort_pending_tasks();
519
520        self.connected.store(false, Ordering::Release);
521        log::info!("Disconnected: client_id={}", self.core.client_id);
522        Ok(())
523    }
524
525    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
526        self.update_account_state()
527    }
528
529    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
530        log::debug!("query_order: client_order_id={}", cmd.client_order_id);
531
532        let http_client = self.http_client.clone();
533        let command = cmd.clone();
534        let exec_event_sender = self.exec_event_sender.clone();
535        let account_id = self.core.account_id;
536
537        self.spawn_task("query_order", async move {
538            let result = http_client
539                .request_order_status(
540                    account_id,
541                    command.instrument_id,
542                    command.venue_order_id,
543                    Some(command.client_order_id),
544                )
545                .await;
546
547            match result {
548                Ok(report) => {
549                    if let Some(sender) = &exec_event_sender {
550                        let exec_report = NautilusExecutionReport::Order(Box::new(report));
551                        if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
552                            log::warn!("Failed to send order status report: {e}");
553                        }
554                    }
555                }
556                Err(e) => log::warn!("Failed to query order status: {e}"),
557            }
558
559            Ok(())
560        });
561
562        Ok(())
563    }
564
565    fn generate_account_state(
566        &self,
567        balances: Vec<AccountBalance>,
568        margins: Vec<MarginBalance>,
569        reported: bool,
570        ts_event: UnixNanos,
571    ) -> anyhow::Result<()> {
572        self.core
573            .generate_account_state(balances, margins, reported, ts_event)
574    }
575
576    fn start(&mut self) -> anyhow::Result<()> {
577        if self.started {
578            return Ok(());
579        }
580
581        self.started = true;
582
583        // Spawn instrument bootstrap task
584        let http_client = self.http_client.clone();
585
586        get_runtime().spawn(async move {
587            match http_client.request_instruments().await {
588                Ok(instruments) => {
589                    if instruments.is_empty() {
590                        log::warn!("No instruments returned for Binance Spot");
591                    } else {
592                        http_client.cache_instruments(instruments);
593                        log::info!("Instruments initialized");
594                    }
595                }
596                Err(e) => {
597                    log::error!("Failed to request Binance Spot instruments: {e}");
598                }
599            }
600        });
601
602        log::info!(
603            "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}, product_types={:?}",
604            self.core.client_id,
605            self.core.account_id,
606            self.core.account_type,
607            self.config.environment,
608            self.config.product_types,
609        );
610        Ok(())
611    }
612
613    fn stop(&mut self) -> anyhow::Result<()> {
614        if !self.started {
615            return Ok(());
616        }
617
618        self.started = false;
619        self.connected.store(false, Ordering::Release);
620        self.abort_pending_tasks();
621        log::info!("Stopped: client_id={}", self.core.client_id);
622        Ok(())
623    }
624
625    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
626        let order = &cmd.order;
627
628        if order.is_closed() {
629            let client_order_id = order.client_order_id();
630            log::warn!("Cannot submit closed order {client_order_id}");
631            return Ok(());
632        }
633
634        let event = OrderSubmitted::new(
635            self.core.trader_id,
636            order.strategy_id(),
637            order.instrument_id(),
638            order.client_order_id(),
639            self.core.account_id,
640            UUID4::new(),
641            cmd.ts_init,
642            get_atomic_clock_realtime().get_time_ns(),
643        );
644        if let Some(sender) = &self.exec_event_sender {
645            log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
646            if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Submitted(event))) {
647                log::warn!("Failed to send OrderSubmitted event: {e}");
648            }
649        } else {
650            log::warn!("Cannot send OrderSubmitted: exec_event_sender not initialized");
651        }
652
653        self.submit_order_internal(cmd)
654    }
655
656    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
657        log::warn!(
658            "submit_order_list not yet implemented for Binance Spot execution client (got {} orders)",
659            cmd.order_list.orders.len()
660        );
661        Ok(())
662    }
663
664    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
665        // Binance Spot uses cancel-replace for order modification, which requires
666        // the full order specification (side, type, time_in_force). Since ModifyOrder
667        // doesn't include these fields, we need to look up the original order from cache.
668        let order = {
669            let cache = self.core.cache().borrow();
670            cache.order(&cmd.client_order_id).cloned()
671        };
672
673        let Some(order) = order else {
674            log::warn!(
675                "Cannot modify order {}: not found in cache",
676                cmd.client_order_id
677            );
678            let rejected_event = OrderModifyRejected::new(
679                self.core.trader_id,
680                cmd.strategy_id,
681                cmd.instrument_id,
682                cmd.client_order_id,
683                "Order not found in cache for modify".into(),
684                UUID4::new(),
685                get_atomic_clock_realtime().get_time_ns(),
686                cmd.ts_init,
687                false,
688                cmd.venue_order_id,
689                Some(self.core.account_id),
690            );
691
692            if let Some(sender) = &self.exec_event_sender
693                && let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::ModifyRejected(
694                    rejected_event,
695                )))
696            {
697                log::warn!("Failed to send OrderModifyRejected event: {e}");
698            }
699            return Ok(());
700        };
701
702        let http_client = self.http_client.clone();
703        let command = cmd.clone();
704
705        let exec_event_sender = self.exec_event_sender.clone();
706        let trader_id = self.core.trader_id;
707        let account_id = self.core.account_id;
708        let ts_init = cmd.ts_init;
709
710        // Get order properties from cached order
711        let order_side = order.order_side();
712        let order_type = order.order_type();
713        let time_in_force = order.time_in_force();
714        let quantity = cmd.quantity.unwrap_or_else(|| order.quantity());
715
716        self.spawn_task("modify_order", async move {
717            // Binance uses cancel-replace for order modification
718            let result = http_client
719                .modify_order(
720                    account_id,
721                    command.instrument_id,
722                    command
723                        .venue_order_id
724                        .ok_or_else(|| anyhow::anyhow!("venue_order_id required for modify"))?,
725                    command.client_order_id,
726                    order_side,
727                    order_type,
728                    quantity,
729                    time_in_force,
730                    command.price,
731                )
732                .await
733                .map_err(|e| anyhow::anyhow!("Modify order failed: {e}"));
734
735            match result {
736                Ok(report) => {
737                    // Order modified - dispatch OrderUpdated event
738                    let updated_event = OrderUpdated::new(
739                        trader_id,
740                        command.strategy_id,
741                        command.instrument_id,
742                        command.client_order_id,
743                        report.quantity,
744                        UUID4::new(),
745                        ts_init,
746                        get_atomic_clock_realtime().get_time_ns(),
747                        false,
748                        Some(report.venue_order_id),
749                        Some(account_id),
750                        report.price,
751                        None, // trigger_price
752                        None, // protection_price
753                    );
754
755                    if let Some(sender) = &exec_event_sender
756                        && let Err(e) = sender
757                            .send(ExecutionEvent::Order(OrderEventAny::Updated(updated_event)))
758                    {
759                        log::warn!("Failed to send OrderUpdated event: {e}");
760                    }
761                }
762                Err(e) => {
763                    let rejected_event = OrderModifyRejected::new(
764                        trader_id,
765                        command.strategy_id,
766                        command.instrument_id,
767                        command.client_order_id,
768                        format!("modify-order-error: {e}").into(),
769                        UUID4::new(),
770                        get_atomic_clock_realtime().get_time_ns(),
771                        ts_init,
772                        false,
773                        command.venue_order_id,
774                        Some(account_id),
775                    );
776
777                    if let Some(sender) = &exec_event_sender
778                        && let Err(send_err) = sender.send(ExecutionEvent::Order(
779                            OrderEventAny::ModifyRejected(rejected_event),
780                        ))
781                    {
782                        log::warn!("Failed to send OrderModifyRejected event: {send_err}");
783                    }
784
785                    return Err(e);
786                }
787            }
788
789            Ok(())
790        });
791
792        Ok(())
793    }
794
795    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
796        self.cancel_order_internal(cmd)
797    }
798
799    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
800        let http_client = self.http_client.clone();
801        let command = cmd.clone();
802
803        let exec_event_sender = self.exec_event_sender.clone();
804        let trader_id = self.core.trader_id;
805        let account_id = self.core.account_id;
806
807        self.spawn_task("cancel_all_orders", async move {
808            let canceled_orders = http_client.cancel_all_orders(command.instrument_id).await?;
809
810            // Generate OrderCanceled events for each canceled order
811            for (venue_order_id, client_order_id) in canceled_orders {
812                let canceled_event = OrderCanceled::new(
813                    trader_id,
814                    command.strategy_id,
815                    command.instrument_id,
816                    client_order_id,
817                    UUID4::new(),
818                    command.ts_init,
819                    get_atomic_clock_realtime().get_time_ns(),
820                    false,
821                    Some(venue_order_id),
822                    Some(account_id),
823                );
824
825                if let Some(sender) = &exec_event_sender
826                    && let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Canceled(
827                        canceled_event,
828                    )))
829                {
830                    log::warn!("Failed to send OrderCanceled event: {e}");
831                }
832            }
833
834            Ok(())
835        });
836
837        Ok(())
838    }
839
840    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
841        // Binance doesn't have a true batch cancel endpoint for spot
842        // Cancel orders individually
843        for cancel in &cmd.cancels {
844            self.cancel_order_internal(cancel)?;
845        }
846
847        Ok(())
848    }
849
850    async fn generate_order_status_report(
851        &self,
852        cmd: &GenerateOrderStatusReport,
853    ) -> anyhow::Result<Option<OrderStatusReport>> {
854        let Some(instrument_id) = cmd.instrument_id else {
855            log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
856            return Ok(None);
857        };
858
859        // Convert ClientOrderId to VenueOrderId if provided (API naming quirk)
860        let venue_order_id = cmd
861            .venue_order_id
862            .as_ref()
863            .map(|id| VenueOrderId::new(id.inner()));
864
865        let report = self
866            .http_client
867            .request_order_status(
868                self.core.account_id,
869                instrument_id,
870                venue_order_id,
871                cmd.client_order_id,
872            )
873            .await?;
874
875        Ok(Some(report))
876    }
877
878    async fn generate_order_status_reports(
879        &self,
880        cmd: &GenerateOrderStatusReports,
881    ) -> anyhow::Result<Vec<OrderStatusReport>> {
882        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
883        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
884
885        let reports = self
886            .http_client
887            .request_order_status_reports(
888                self.core.account_id,
889                cmd.instrument_id,
890                start_dt,
891                end_dt,
892                cmd.open_only,
893                None, // limit
894            )
895            .await?;
896
897        Ok(reports)
898    }
899
900    async fn generate_fill_reports(
901        &self,
902        cmd: GenerateFillReports,
903    ) -> anyhow::Result<Vec<FillReport>> {
904        let Some(instrument_id) = cmd.instrument_id else {
905            log::warn!("generate_fill_reports requires instrument_id for Binance Spot");
906            return Ok(Vec::new());
907        };
908
909        // Convert ClientOrderId to VenueOrderId if provided (API naming quirk)
910        let venue_order_id = cmd
911            .venue_order_id
912            .as_ref()
913            .map(|id| VenueOrderId::new(id.inner()));
914
915        let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
916        let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
917
918        let reports = self
919            .http_client
920            .request_fill_reports(
921                self.core.account_id,
922                instrument_id,
923                venue_order_id,
924                start_dt,
925                end_dt,
926                None, // limit
927            )
928            .await?;
929
930        Ok(reports)
931    }
932
933    async fn generate_position_status_reports(
934        &self,
935        _cmd: &GeneratePositionStatusReports,
936    ) -> anyhow::Result<Vec<PositionStatusReport>> {
937        // Spot trading doesn't have positions in the traditional sense
938        // Returns empty for spot, could be extended for margin positions
939        Ok(Vec::new())
940    }
941
942    async fn generate_mass_status(
943        &self,
944        lookback_mins: Option<u64>,
945    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
946        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
947
948        let ts_now = get_atomic_clock_realtime().get_time_ns();
949
950        let start = lookback_mins.map(|mins| {
951            let lookback_ns = mins * 60 * 1_000_000_000;
952            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
953        });
954
955        // Binance requires instrument_id for historical orders (open_only=false).
956        // Use open_only=true for mass status to get all open orders across instruments.
957        let order_cmd = GenerateOrderStatusReports::new(
958            UUID4::new(),
959            ts_now,
960            true, // open_only - Binance requires instrument_id for historical orders
961            None, // instrument_id
962            start,
963            None, // end
964            None, // params
965            None, // correlation_id
966        );
967
968        let position_cmd = GeneratePositionStatusReports::new(
969            UUID4::new(),
970            ts_now,
971            None, // instrument_id
972            start,
973            None, // end
974            None, // params
975            None, // correlation_id
976        );
977
978        let (order_reports, position_reports) = tokio::try_join!(
979            self.generate_order_status_reports(&order_cmd),
980            self.generate_position_status_reports(&position_cmd),
981        )?;
982
983        // Note: Fill reports require instrument_id for Binance, so we skip them in mass status
984        // They would need to be fetched per-instrument if needed
985
986        log::info!("Received {} OrderStatusReports", order_reports.len());
987        log::info!("Received {} PositionReports", position_reports.len());
988
989        let mut mass_status = ExecutionMassStatus::new(
990            self.core.client_id,
991            self.core.account_id,
992            *BINANCE_VENUE,
993            ts_now,
994            None,
995        );
996
997        mass_status.add_order_reports(order_reports);
998        mass_status.add_position_reports(position_reports);
999
1000        Ok(Some(mass_status))
1001    }
1002}