Skip to main content

nautilus_binance/futures/
execution.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the Binance Futures adapter.
17
18use std::{
19    future::Future,
20    sync::{
21        Arc, Mutex, RwLock,
22        atomic::{AtomicBool, Ordering},
23    },
24    time::{Duration, Instant},
25};
26
27use ahash::AHashSet;
28use anyhow::Context;
29use async_trait::async_trait;
30use futures_util::{StreamExt, pin_mut};
31use nautilus_common::{
32    clients::ExecutionClient,
33    live::{get_runtime, runner::get_exec_event_sender},
34    messages::execution::{
35        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
36        GenerateOrderStatusReport, GenerateOrderStatusReports, GenerateOrderStatusReportsBuilder,
37        GeneratePositionStatusReports, GeneratePositionStatusReportsBuilder, ModifyOrder,
38        QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
39    },
40};
41use nautilus_core::{
42    MUTEX_POISONED, UUID4, UnixNanos,
43    time::{AtomicTime, get_atomic_clock_realtime},
44};
45use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
46use nautilus_model::{
47    accounts::AccountAny,
48    enums::{OmsType, OrderSide, PositionSideSpecified},
49    events::{
50        AccountState, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderModifyRejected,
51        OrderRejected, OrderUpdated,
52    },
53    identifiers::{
54        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, Venue, VenueOrderId,
55    },
56    instruments::Instrument,
57    orders::{Order, OrderAny},
58    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
59    types::{AccountBalance, Currency, MarginBalance, Money, Quantity},
60};
61use rust_decimal::Decimal;
62use tokio::task::JoinHandle;
63use tokio_util::sync::CancellationToken;
64
65use super::{
66    http::{
67        BinanceFuturesHttpError,
68        client::{BinanceFuturesHttpClient, BinanceFuturesInstrument, is_algo_order_type},
69        models::{BatchOrderResult, BinancePositionRisk},
70        query::{
71            BatchCancelItem, BinanceAllOrdersParamsBuilder, BinanceOpenOrdersParamsBuilder,
72            BinanceOrderQueryParamsBuilder, BinancePositionRiskParamsBuilder,
73            BinanceUserTradesParamsBuilder,
74        },
75    },
76    websocket::{
77        client::BinanceFuturesWebSocketClient,
78        handler_exec::BinanceFuturesExecWsFeedHandler,
79        messages::{ExecHandlerCommand, NautilusExecWsMessage},
80    },
81};
82use crate::{
83    common::{
84        consts::BINANCE_VENUE,
85        credential::resolve_credentials,
86        enums::{BinancePositionSide, BinanceProductType},
87    },
88    config::BinanceExecClientConfig,
89    futures::http::models::BinanceFuturesAccountInfo,
90};
91
/// Listen key keepalive interval in seconds (30 minutes).
///
/// Used as the tick period of the keepalive task started in `connect`.
const LISTEN_KEY_KEEPALIVE_SECS: u64 = 30 * 60;
94
/// Live execution client for Binance Futures trading.
///
/// Implements the [`ExecutionClient`] trait for order management on Binance
/// USD-M and COIN-M Futures markets. Uses HTTP API for order operations and
/// WebSocket for real-time order updates via user data stream.
///
/// Uses a two-tier architecture with an execution handler that maintains
/// pending order maps for correlating WebSocket updates with order context.
#[derive(Debug)]
pub struct BinanceFuturesExecutionClient {
    /// Shared execution client core: identifiers, cache access and connection state.
    core: ExecutionClientCore,
    /// Clock used to timestamp generated events and reports.
    clock: &'static AtomicTime,
    /// Adapter configuration supplied at construction.
    config: BinanceExecClientConfig,
    /// Emitter used to publish order events and account states.
    emitter: ExecutionEventEmitter,
    /// Futures product type selected from the config (USD-M when none is configured).
    product_type: BinanceProductType,
    /// HTTP client used for account queries and order submission/cancellation.
    http_client: BinanceFuturesHttpClient,
    /// WebSocket client for the user data stream.
    ws_client: Option<BinanceFuturesWebSocketClient>,
    /// Command channel to the execution handler; `None` until `connect` sets it.
    exec_cmd_tx: Option<tokio::sync::mpsc::UnboundedSender<ExecHandlerCommand>>,
    /// Listen key for the user data stream; `None` until `connect` creates one.
    listen_key: Arc<RwLock<Option<String>>>,
    /// Token used to cancel background tasks; replaced on every `connect`.
    cancellation_token: CancellationToken,
    /// Flag shared with the execution handler.
    // NOTE(review): exact signalling semantics are defined by the handler — confirm there.
    handler_signal: Arc<AtomicBool>,
    /// Client order IDs of triggered algo orders, shared with the execution handler
    /// and consulted when choosing the cancel endpoint.
    triggered_algo_order_ids: Arc<RwLock<AHashSet<ClientOrderId>>>,
    /// Handle of the task that drains handler events into the emitter.
    ws_task: Mutex<Option<JoinHandle<()>>>,
    /// Handle of the listen key keepalive task.
    keepalive_task: Mutex<Option<JoinHandle<()>>>,
    /// Handles of in-flight tasks started through `spawn_task`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
    /// Whether the account is in hedge mode; refreshed on `connect`.
    is_hedge_mode: AtomicBool,
}
122
123impl BinanceFuturesExecutionClient {
124    /// Creates a new [`BinanceFuturesExecutionClient`].
125    ///
126    /// # Errors
127    ///
128    /// Returns an error if the HTTP client fails to initialize or credentials are missing.
129    pub fn new(core: ExecutionClientCore, config: BinanceExecClientConfig) -> anyhow::Result<Self> {
130        let product_type = config
131            .product_types
132            .iter()
133            .find(|pt| matches!(pt, BinanceProductType::UsdM | BinanceProductType::CoinM))
134            .copied()
135            .unwrap_or(BinanceProductType::UsdM);
136
137        let (api_key, api_secret) = resolve_credentials(
138            config.api_key.clone(),
139            config.api_secret.clone(),
140            config.environment,
141            product_type,
142        )?;
143
144        let http_client = BinanceFuturesHttpClient::new(
145            product_type,
146            config.environment,
147            Some(api_key.clone()),
148            Some(api_secret.clone()),
149            config.base_url_http.clone(),
150            None, // recv_window
151            None, // timeout_secs
152            None, // proxy_url
153        )
154        .context("failed to construct Binance Futures HTTP client")?;
155
156        let ws_client = BinanceFuturesWebSocketClient::new(
157            product_type,
158            config.environment,
159            Some(api_key),
160            Some(api_secret),
161            config.base_url_ws.clone(),
162            Some(20), // Heartbeat interval
163        )
164        .context("failed to construct Binance Futures WebSocket client")?;
165
166        let clock = get_atomic_clock_realtime();
167        let emitter = ExecutionEventEmitter::new(
168            clock,
169            core.trader_id,
170            core.account_id,
171            core.account_type,
172            core.base_currency,
173        );
174
175        Ok(Self {
176            core,
177            clock,
178            config,
179            emitter,
180            product_type,
181            http_client,
182            ws_client: Some(ws_client),
183            exec_cmd_tx: None,
184            listen_key: Arc::new(RwLock::new(None)),
185            cancellation_token: CancellationToken::new(),
186            handler_signal: Arc::new(AtomicBool::new(false)),
187            triggered_algo_order_ids: Arc::new(RwLock::new(AHashSet::new())),
188            ws_task: Mutex::new(None),
189            keepalive_task: Mutex::new(None),
190            pending_tasks: Mutex::new(Vec::new()),
191            is_hedge_mode: AtomicBool::new(false),
192        })
193    }
194
    /// Returns whether the account is in hedge mode (dual side position).
    ///
    /// The flag starts as `false` and is overwritten with the value queried
    /// from the venue each time `connect` runs.
    #[must_use]
    pub fn is_hedge_mode(&self) -> bool {
        self.is_hedge_mode.load(Ordering::Acquire)
    }
200
201    /// Determines the position side for hedge mode based on order direction.
202    fn determine_position_side(
203        &self,
204        order_side: OrderSide,
205        reduce_only: bool,
206    ) -> Option<BinancePositionSide> {
207        if !self.is_hedge_mode() {
208            return None;
209        }
210
211        // In hedge mode, position side depends on whether we're opening or closing
212        Some(if reduce_only {
213            // Closing: Buy closes Short, Sell closes Long
214            match order_side {
215                OrderSide::Buy => BinancePositionSide::Short,
216                OrderSide::Sell => BinancePositionSide::Long,
217                _ => BinancePositionSide::Both,
218            }
219        } else {
220            // Opening: Buy opens Long, Sell opens Short
221            match order_side {
222                OrderSide::Buy => BinancePositionSide::Long,
223                OrderSide::Sell => BinancePositionSide::Short,
224                _ => BinancePositionSide::Both,
225            }
226        })
227    }
228
229    /// Converts Binance futures account info to Nautilus account state.
230    fn create_account_state(&self, account_info: &BinanceFuturesAccountInfo) -> AccountState {
231        let ts_now = self.clock.get_time_ns();
232
233        let balances: Vec<AccountBalance> = account_info
234            .assets
235            .iter()
236            .filter_map(|b| {
237                let wallet_balance: f64 = b.wallet_balance.parse().unwrap_or(0.0);
238                let available_balance: f64 = b.available_balance.parse().unwrap_or(0.0);
239                let locked = wallet_balance - available_balance;
240
241                if wallet_balance == 0.0 {
242                    return None;
243                }
244
245                let currency = Currency::from(&b.asset);
246                Some(AccountBalance::new(
247                    Money::new(wallet_balance, currency),
248                    Money::new(locked.max(0.0), currency),
249                    Money::new(available_balance, currency),
250                ))
251            })
252            .collect();
253
254        // Margin balances in futures are position-specific, not account-level,
255        // so we don't create MarginBalance entries here as they require instrument_id.
256        let margins: Vec<MarginBalance> = Vec::new();
257
258        AccountState::new(
259            self.core.account_id,
260            self.core.account_type,
261            balances,
262            margins,
263            true, // reported
264            UUID4::new(),
265            ts_now,
266            ts_now,
267            None, // base currency
268        )
269    }
270
271    async fn refresh_account_state(&self) -> anyhow::Result<AccountState> {
272        let account_info = match self.http_client.query_account().await {
273            Ok(info) => info,
274            Err(e) => {
275                log::error!("Binance Futures account state request failed: {e}");
276                anyhow::bail!("Binance Futures account state request failed: {e}");
277            }
278        };
279
280        Ok(self.create_account_state(&account_info))
281    }
282
283    fn update_account_state(&self) -> anyhow::Result<()> {
284        let runtime = get_runtime();
285        let account_state = runtime.block_on(self.refresh_account_state())?;
286
287        let ts_now = self.clock.get_time_ns();
288        self.emitter.emit_account_state(
289            account_state.balances.clone(),
290            account_state.margins.clone(),
291            account_state.is_reported,
292            ts_now,
293        );
294        Ok(())
295    }
296
297    async fn init_hedge_mode(&self) -> anyhow::Result<bool> {
298        let response = self.http_client.query_hedge_mode().await?;
299        Ok(response.dual_side_position)
300    }
301
302    /// Handles execution events from the handler.
303    ///
304    /// The handler has already correlated WebSocket updates with order context
305    /// (strategy_id, etc.) and emits normalized Nautilus events.
306    fn handle_exec_event(message: NautilusExecWsMessage, emitter: &ExecutionEventEmitter) {
307        match message {
308            NautilusExecWsMessage::OrderAccepted(event) => {
309                emitter.send_order_event(OrderEventAny::Accepted(event));
310            }
311            NautilusExecWsMessage::OrderCanceled(event) => {
312                emitter.send_order_event(OrderEventAny::Canceled(event));
313            }
314            NautilusExecWsMessage::OrderRejected(event) => {
315                emitter.send_order_event(OrderEventAny::Rejected(event));
316            }
317            NautilusExecWsMessage::OrderFilled(event) => {
318                emitter.send_order_event(OrderEventAny::Filled(event));
319            }
320            NautilusExecWsMessage::OrderUpdated(event) => {
321                emitter.send_order_event(OrderEventAny::Updated(event));
322            }
323            NautilusExecWsMessage::AccountUpdate(event) => {
324                emitter.send_account_state(event);
325            }
326            NautilusExecWsMessage::ListenKeyExpired => {
327                log::warn!("Listen key expired - reconnection required");
328            }
329            NautilusExecWsMessage::Reconnected => {
330                log::info!("User data stream WebSocket reconnected");
331            }
332        }
333    }
334
335    /// Registers an order with the execution handler for context tracking.
336    fn register_order(&self, order: &OrderAny) {
337        if let Some(ref cmd_tx) = self.exec_cmd_tx {
338            let cmd = ExecHandlerCommand::RegisterOrder {
339                client_order_id: order.client_order_id(),
340                trader_id: order.trader_id(),
341                strategy_id: order.strategy_id(),
342                instrument_id: order.instrument_id(),
343            };
344            if let Err(e) = cmd_tx.send(cmd) {
345                log::error!("Failed to register order with handler: {e}");
346            }
347        }
348    }
349
350    /// Registers a cancel request with the execution handler for context tracking.
351    fn register_cancel(
352        &self,
353        client_order_id: ClientOrderId,
354        trader_id: TraderId,
355        strategy_id: StrategyId,
356        instrument_id: InstrumentId,
357        venue_order_id: Option<VenueOrderId>,
358    ) {
359        if let Some(ref cmd_tx) = self.exec_cmd_tx {
360            let cmd = ExecHandlerCommand::RegisterCancel {
361                client_order_id,
362                trader_id,
363                strategy_id,
364                instrument_id,
365                venue_order_id,
366            };
367            if let Err(e) = cmd_tx.send(cmd) {
368                log::error!("Failed to register cancel with handler: {e}");
369            }
370        }
371    }
372
373    fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
374        let http_client = self.http_client.clone();
375
376        let order = self
377            .core
378            .cache()
379            .order(&cmd.client_order_id)
380            .cloned()
381            .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
382
383        // Register order with handler for context tracking before HTTP request
384        self.register_order(&order);
385
386        let emitter = self.emitter.clone();
387        let trader_id = self.core.trader_id;
388        let account_id = self.core.account_id;
389        let clock = self.clock;
390        let client_order_id = order.client_order_id();
391        let strategy_id = order.strategy_id();
392        let instrument_id = order.instrument_id();
393        let order_side = order.order_side();
394        let order_type = order.order_type();
395        let quantity = order.quantity();
396        let time_in_force = order.time_in_force();
397        let price = order.price();
398        let trigger_price = order.trigger_price();
399        let reduce_only = order.is_reduce_only();
400        let position_side = self.determine_position_side(order_side, reduce_only);
401
402        // HTTP only generates OrderRejected on failure.
403        // OrderAccepted comes from WebSocket (ORDER_TRADE_UPDATE or ALGO_UPDATE).
404        let use_algo_api = is_algo_order_type(order_type);
405
406        self.spawn_task("submit_order", async move {
407            let result = if use_algo_api {
408                http_client
409                    .submit_algo_order(
410                        account_id,
411                        instrument_id,
412                        client_order_id,
413                        order_side,
414                        order_type,
415                        quantity,
416                        time_in_force,
417                        price,
418                        trigger_price,
419                        reduce_only,
420                        position_side,
421                    )
422                    .await
423            } else {
424                http_client
425                    .submit_order(
426                        account_id,
427                        instrument_id,
428                        client_order_id,
429                        order_side,
430                        order_type,
431                        quantity,
432                        time_in_force,
433                        price,
434                        trigger_price,
435                        reduce_only,
436                        position_side,
437                    )
438                    .await
439            };
440
441            match result {
442                Ok(report) => {
443                    log::debug!(
444                        "Order submit accepted: client_order_id={}, venue_order_id={}",
445                        client_order_id,
446                        report.venue_order_id
447                    );
448                }
449                Err(e) => {
450                    // Keep order registered - if HTTP failed due to timeout but order
451                    // reached Binance, WebSocket updates will still arrive. The order
452                    // will be cleaned up via WebSocket rejection or reconciliation.
453                    let ts_now = clock.get_time_ns();
454                    let rejected_event = OrderRejected::new(
455                        trader_id,
456                        strategy_id,
457                        instrument_id,
458                        client_order_id,
459                        account_id,
460                        format!("submit-order-error: {e}").into(),
461                        UUID4::new(),
462                        ts_now,
463                        ts_now,
464                        false,
465                        false,
466                    );
467
468                    emitter.send_order_event(OrderEventAny::Rejected(rejected_event));
469
470                    return Err(e);
471                }
472            }
473
474            Ok(())
475        });
476
477        Ok(())
478    }
479
480    fn cancel_order_internal(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
481        let http_client = self.http_client.clone();
482        let command = cmd.clone();
483
484        // Register cancel with handler for context tracking before HTTP request
485        self.register_cancel(
486            command.client_order_id,
487            self.core.trader_id,
488            command.strategy_id,
489            command.instrument_id,
490            command.venue_order_id,
491        );
492
493        // Non-triggered algo orders use algo cancel endpoint, triggered use regular
494        let is_algo = self
495            .core
496            .cache()
497            .order(&command.client_order_id)
498            .is_some_and(|order| is_algo_order_type(order.order_type()));
499        let is_triggered = self
500            .triggered_algo_order_ids
501            .read()
502            .expect("triggered_algo_order_ids lock poisoned")
503            .contains(&command.client_order_id);
504        let use_algo_cancel = is_algo && !is_triggered;
505
506        let emitter = self.emitter.clone();
507        let trader_id = self.core.trader_id;
508        let account_id = self.core.account_id;
509        let clock = self.clock;
510        let instrument_id = command.instrument_id;
511        let venue_order_id = command.venue_order_id;
512        let client_order_id = command.client_order_id;
513
514        // HTTP only generates OrderCancelRejected on failure.
515        // OrderCanceled comes from WebSocket (ORDER_TRADE_UPDATE or ALGO_UPDATE).
516        self.spawn_task("cancel_order", async move {
517            let result = if use_algo_cancel {
518                // Try algo cancel first; if it fails, the order may have been triggered
519                // before this session started, so fall back to regular cancel
520                match http_client.cancel_algo_order(client_order_id).await {
521                    Ok(()) => Ok(()),
522                    Err(algo_err) => {
523                        log::debug!("Algo cancel failed, trying regular cancel: {algo_err}");
524                        http_client
525                            .cancel_order(instrument_id, venue_order_id, Some(client_order_id))
526                            .await
527                            .map(|_| ())
528                    }
529                }
530            } else {
531                http_client
532                    .cancel_order(instrument_id, venue_order_id, Some(client_order_id))
533                    .await
534                    .map(|_| ())
535            };
536
537            match result {
538                Ok(()) => {
539                    log::debug!("Cancel request accepted: client_order_id={client_order_id}");
540                }
541                Err(e) => {
542                    let ts_now = clock.get_time_ns();
543                    let rejected_event = OrderCancelRejected::new(
544                        trader_id,
545                        command.strategy_id,
546                        command.instrument_id,
547                        client_order_id,
548                        format!("cancel-order-error: {e}").into(),
549                        UUID4::new(),
550                        ts_now,
551                        ts_now,
552                        false,
553                        command.venue_order_id,
554                        Some(account_id),
555                    );
556
557                    emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
558
559                    return Err(e);
560                }
561            }
562
563            Ok(())
564        });
565
566        Ok(())
567    }
568
569    fn spawn_task<F>(&self, description: &'static str, fut: F)
570    where
571        F: Future<Output = anyhow::Result<()>> + Send + 'static,
572    {
573        let runtime = get_runtime();
574        let handle = runtime.spawn(async move {
575            if let Err(e) = fut.await {
576                log::warn!("{description} failed: {e}");
577            }
578        });
579
580        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
581        tasks.retain(|handle| !handle.is_finished());
582        tasks.push(handle);
583    }
584
585    fn abort_pending_tasks(&self) {
586        let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
587        for handle in tasks.drain(..) {
588            handle.abort();
589        }
590    }
591
592    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
593        let account_id = self.core.account_id;
594
595        if self.core.cache().account(&account_id).is_some() {
596            log::info!("Account {account_id} registered");
597            return Ok(());
598        }
599
600        let start = Instant::now();
601        let timeout = Duration::from_secs_f64(timeout_secs);
602        let interval = Duration::from_millis(10);
603
604        loop {
605            tokio::time::sleep(interval).await;
606
607            if self.core.cache().account(&account_id).is_some() {
608                log::info!("Account {account_id} registered");
609                return Ok(());
610            }
611
612            if start.elapsed() >= timeout {
613                anyhow::bail!(
614                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
615                );
616            }
617        }
618    }
619
620    /// Returns the (price_precision, size_precision) for an instrument.
621    fn get_instrument_precision(&self, instrument_id: InstrumentId) -> (u8, u8) {
622        let cache = self.core.cache();
623        cache
624            .instrument(&instrument_id)
625            .map_or((8, 8), |i| (i.price_precision(), i.size_precision()))
626    }
627
628    /// Creates a position status report from Binance position risk data.
629    fn create_position_report(
630        &self,
631        position: &BinancePositionRisk,
632        instrument_id: InstrumentId,
633        size_precision: u8,
634    ) -> anyhow::Result<PositionStatusReport> {
635        let position_amount: Decimal = position
636            .position_amt
637            .parse()
638            .context("invalid position_amt")?;
639
640        if position_amount.is_zero() {
641            anyhow::bail!("Position is flat");
642        }
643
644        let entry_price: Decimal = position
645            .entry_price
646            .parse()
647            .context("invalid entry_price")?;
648
649        let position_side = if position_amount > Decimal::ZERO {
650            PositionSideSpecified::Long
651        } else {
652            PositionSideSpecified::Short
653        };
654
655        let ts_now = self.clock.get_time_ns();
656
657        Ok(PositionStatusReport::new(
658            self.core.account_id,
659            instrument_id,
660            position_side,
661            Quantity::new(position_amount.abs().to_string().parse()?, size_precision),
662            ts_now,
663            ts_now,
664            Some(UUID4::new()),
665            None, // venue_position_id
666            Some(entry_price),
667        ))
668    }
669}
670
671#[async_trait(?Send)]
672impl ExecutionClient for BinanceFuturesExecutionClient {
    /// Returns the connection state tracked by the execution client core.
    fn is_connected(&self) -> bool {
        self.core.is_connected()
    }
676
    /// Returns the client ID held by the execution client core.
    fn client_id(&self) -> ClientId {
        self.core.client_id
    }
680
    /// Returns the account ID held by the execution client core.
    fn account_id(&self) -> AccountId {
        self.core.account_id
    }
684
    /// Returns the Binance venue identifier (a copy of `BINANCE_VENUE`).
    fn venue(&self) -> Venue {
        *BINANCE_VENUE
    }
688
    /// Returns the order management system type configured on the core.
    fn oms_type(&self) -> OmsType {
        self.core.oms_type
    }
692
693    fn get_account(&self) -> Option<AccountAny> {
694        self.core.cache().account(&self.core.account_id).cloned()
695    }
696
697    async fn connect(&mut self) -> anyhow::Result<()> {
698        if self.core.is_connected() {
699            return Ok(());
700        }
701
702        // Reinitialize cancellation token in case of reconnection
703        self.cancellation_token = CancellationToken::new();
704
705        // Check hedge mode
706        let is_hedge_mode = self
707            .init_hedge_mode()
708            .await
709            .context("failed to query hedge mode")?;
710        self.is_hedge_mode.store(is_hedge_mode, Ordering::Release);
711        log::info!("Hedge mode (dual side position): {is_hedge_mode}");
712
713        // Load instruments if not already done
714        let _instruments = if self.core.instruments_initialized() {
715            Vec::new()
716        } else {
717            let instruments = self
718                .http_client
719                .request_instruments()
720                .await
721                .context("failed to request Binance Futures instruments")?;
722
723            if instruments.is_empty() {
724                log::warn!("No instruments returned for Binance Futures");
725            } else {
726                log::info!("Loaded {} Futures instruments", instruments.len());
727            }
728
729            self.core.set_instruments_initialized();
730            instruments
731        };
732
733        // Create listen key for user data stream
734        log::info!("Creating listen key for user data stream...");
735        let listen_key_response = self
736            .http_client
737            .create_listen_key()
738            .await
739            .context("failed to create listen key")?;
740        let listen_key = listen_key_response.listen_key;
741        log::info!("Listen key created successfully");
742
743        {
744            let mut key_guard = self.listen_key.write().expect(MUTEX_POISONED);
745            *key_guard = Some(listen_key.clone());
746        }
747
748        // Connect WebSocket and set up execution handler
749        if let Some(ref mut ws_client) = self.ws_client {
750            log::info!("Connecting to Binance Futures user data stream WebSocket...");
751            ws_client.connect().await.map_err(|e| {
752                log::error!("Binance Futures WebSocket connection failed: {e:?}");
753                anyhow::anyhow!("failed to connect Binance Futures WebSocket: {e}")
754            })?;
755            log::info!("Binance Futures WebSocket connected");
756
757            // Subscribe to user data stream using listen key
758            log::info!("Subscribing to user data stream...");
759            ws_client
760                .subscribe(vec![listen_key.clone()])
761                .await
762                .map_err(|e| anyhow::anyhow!("failed to subscribe to user data stream: {e}"))?;
763            log::info!("Subscribed to user data stream");
764
765            // Create channels for the execution handler
766            let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
767            let (raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
768
769            // Store command channel for order registration
770            self.exec_cmd_tx = Some(cmd_tx.clone());
771
772            // Create and initialize the execution handler
773            let mut handler = BinanceFuturesExecWsFeedHandler::new(
774                self.clock,
775                self.core.trader_id,
776                self.core.account_id,
777                self.core.account_type,
778                self.product_type,
779                self.handler_signal.clone(),
780                self.triggered_algo_order_ids.clone(),
781                cmd_rx,
782                raw_rx,
783            );
784
785            // Initialize handler with instruments
786            let instruments_for_handler: Vec<BinanceFuturesInstrument> = self
787                .http_client
788                .instruments_cache()
789                .iter()
790                .map(|r| r.value().clone())
791                .collect();
792            if let Err(e) = cmd_tx.send(ExecHandlerCommand::InitializeInstruments(
793                instruments_for_handler,
794            )) {
795                log::error!("Failed to send instruments to handler: {e}");
796            }
797
798            // Set up raw message forwarding from WebSocket to handler
799            let stream = ws_client.stream();
800            let cancel = self.cancellation_token.clone();
801            let raw_forward_task = get_runtime().spawn(async move {
802                pin_mut!(stream);
803                loop {
804                    tokio::select! {
805                        Some(message) = stream.next() => {
806                            if let Err(e) = raw_tx.send(message) {
807                                log::error!("Failed to forward raw message to handler: {e}");
808                                break;
809                            }
810                        }
811                        () = cancel.cancelled() => {
812                            log::debug!("Raw message forwarding task cancelled");
813                            break;
814                        }
815                    }
816                }
817            });
818
819            let emitter = self.emitter.clone();
820            let handler_cancel = self.cancellation_token.clone();
821
822            let ws_task = get_runtime().spawn(async move {
823                loop {
824                    tokio::select! {
825                        msg = handler.next() => {
826                            match msg {
827                                Some(event) => {
828                                    Self::handle_exec_event(event, &emitter);
829                                }
830                                None => break,
831                            }
832                        }
833                        () = handler_cancel.cancelled() => {
834                            log::debug!("Handler task cancelled");
835                            break;
836                        }
837                    }
838                }
839
840                raw_forward_task.abort();
841            });
842            *self.ws_task.lock().expect(MUTEX_POISONED) = Some(ws_task);
843
844            // Start listen key keepalive task
845            let http_client = self.http_client.clone();
846            let listen_key_ref = self.listen_key.clone();
847            let cancel = self.cancellation_token.clone();
848
849            let keepalive_task = get_runtime().spawn(async move {
850                let mut interval =
851                    tokio::time::interval(Duration::from_secs(LISTEN_KEY_KEEPALIVE_SECS));
852                loop {
853                    tokio::select! {
854                        _ = interval.tick() => {
855                            let key = {
856                                let guard = listen_key_ref.read().expect(MUTEX_POISONED);
857                                guard.clone()
858                            };
859                            if let Some(ref key) = key {
860                                match http_client.keepalive_listen_key(key).await {
861                                    Ok(()) => {
862                                        log::debug!("Listen key keepalive sent successfully");
863                                    }
864                                    Err(e) => {
865                                        log::warn!("Listen key keepalive failed: {e}");
866                                    }
867                                }
868                            }
869                        }
870                        () = cancel.cancelled() => {
871                            log::debug!("Listen key keepalive task cancelled");
872                            break;
873                        }
874                    }
875                }
876            });
877            *self.keepalive_task.lock().expect(MUTEX_POISONED) = Some(keepalive_task);
878        }
879
880        // Request initial account state
881        let account_state = self
882            .refresh_account_state()
883            .await
884            .context("failed to request Binance Futures account state")?;
885
886        if !account_state.balances.is_empty() {
887            log::info!(
888                "Received account state with {} balance(s) and {} margin(s)",
889                account_state.balances.len(),
890                account_state.margins.len()
891            );
892        }
893
894        self.emitter.send_account_state(account_state);
895
896        self.await_account_registered(30.0).await?;
897
898        self.core.set_connected();
899        log::info!("Connected: client_id={}", self.core.client_id);
900        Ok(())
901    }
902
903    async fn disconnect(&mut self) -> anyhow::Result<()> {
904        if self.core.is_disconnected() {
905            return Ok(());
906        }
907
908        // Cancel all background tasks
909        self.cancellation_token.cancel();
910
911        // Wait for WebSocket task to complete
912        let ws_task = self.ws_task.lock().expect(MUTEX_POISONED).take();
913        if let Some(task) = ws_task {
914            let _ = task.await;
915        }
916
917        // Wait for keepalive task to complete
918        let keepalive_task = self.keepalive_task.lock().expect(MUTEX_POISONED).take();
919        if let Some(task) = keepalive_task {
920            let _ = task.await;
921        }
922
923        // Close WebSocket
924        if let Some(ref mut ws_client) = self.ws_client {
925            let _ = ws_client.close().await;
926        }
927
928        // Close listen key
929        let listen_key = self.listen_key.read().expect(MUTEX_POISONED).clone();
930        if let Some(ref key) = listen_key
931            && let Err(e) = self.http_client.close_listen_key(key).await
932        {
933            log::warn!("Failed to close listen key: {e}");
934        }
935        *self.listen_key.write().expect(MUTEX_POISONED) = None;
936
937        self.abort_pending_tasks();
938
939        self.core.set_disconnected();
940        log::info!("Disconnected: client_id={}", self.core.client_id);
941        Ok(())
942    }
943
944    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
945        self.update_account_state()
946    }
947
948    fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
949        log::debug!("query_order: client_order_id={}", cmd.client_order_id);
950
951        let http_client = self.http_client.clone();
952        let command = cmd.clone();
953        let emitter = self.emitter.clone();
954        let account_id = self.core.account_id;
955
956        let symbol = command.instrument_id.symbol.to_string();
957        let order_id = command.venue_order_id.map(|id| {
958            id.inner()
959                .parse::<i64>()
960                .expect("venue_order_id should be numeric")
961        });
962        let orig_client_order_id = Some(command.client_order_id.to_string());
963        let (_, size_precision) = self.get_instrument_precision(command.instrument_id);
964
965        self.spawn_task("query_order", async move {
966            let mut builder = BinanceOrderQueryParamsBuilder::default();
967            builder.symbol(symbol.clone());
968            if let Some(oid) = order_id {
969                builder.order_id(oid);
970            }
971            if let Some(coid) = orig_client_order_id {
972                builder.orig_client_order_id(coid);
973            }
974            let params = builder.build().expect("order query params");
975
976            let result = http_client.query_order(&params).await;
977
978            match result {
979                Ok(order) => {
980                    let report = order.to_order_status_report(
981                        account_id,
982                        command.instrument_id,
983                        size_precision,
984                    )?;
985
986                    emitter.send_order_status_report(report);
987                }
988                Err(e) => log::warn!("Failed to query order status: {e}"),
989            }
990
991            Ok(())
992        });
993
994        Ok(())
995    }
996
997    fn generate_account_state(
998        &self,
999        balances: Vec<AccountBalance>,
1000        margins: Vec<MarginBalance>,
1001        reported: bool,
1002        ts_event: UnixNanos,
1003    ) -> anyhow::Result<()> {
1004        self.emitter
1005            .emit_account_state(balances, margins, reported, ts_event);
1006        Ok(())
1007    }
1008
1009    fn start(&mut self) -> anyhow::Result<()> {
1010        if self.core.is_started() {
1011            return Ok(());
1012        }
1013
1014        self.emitter.set_sender(get_exec_event_sender());
1015        self.core.set_started();
1016
1017        let http_client = self.http_client.clone();
1018
1019        get_runtime().spawn(async move {
1020            match http_client.request_instruments().await {
1021                Ok(instruments) => {
1022                    if instruments.is_empty() {
1023                        log::warn!("No instruments returned for Binance Futures");
1024                    } else {
1025                        log::info!("Loaded {} Futures instruments", instruments.len());
1026                    }
1027                }
1028                Err(e) => {
1029                    log::error!("Failed to request Binance Futures instruments: {e}");
1030                }
1031            }
1032        });
1033
1034        log::info!(
1035            "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}",
1036            self.core.client_id,
1037            self.core.account_id,
1038            self.core.account_type,
1039            self.config.environment,
1040        );
1041        Ok(())
1042    }
1043
1044    fn stop(&mut self) -> anyhow::Result<()> {
1045        if self.core.is_stopped() {
1046            return Ok(());
1047        }
1048
1049        self.core.set_stopped();
1050        self.core.set_disconnected();
1051        self.abort_pending_tasks();
1052        log::info!("Stopped: client_id={}", self.core.client_id);
1053        Ok(())
1054    }
1055
1056    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
1057        let order = self
1058            .core
1059            .cache()
1060            .order(&cmd.client_order_id)
1061            .cloned()
1062            .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
1063
1064        if order.is_closed() {
1065            let client_order_id = order.client_order_id();
1066            log::warn!("Cannot submit closed order {client_order_id}");
1067            return Ok(());
1068        }
1069
1070        log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
1071        self.emitter.emit_order_submitted(&order);
1072
1073        self.submit_order_internal(cmd)
1074    }
1075
1076    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
1077        log::warn!(
1078            "submit_order_list not yet implemented for Binance Futures (got {} orders)",
1079            cmd.order_list.client_order_ids.len()
1080        );
1081        Ok(())
1082    }
1083
1084    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
1085        let order = {
1086            let cache = self.core.cache();
1087            cache.order(&cmd.client_order_id).cloned()
1088        };
1089
1090        let Some(order) = order else {
1091            log::warn!(
1092                "Cannot modify order {}: not found in cache",
1093                cmd.client_order_id
1094            );
1095            let ts_init = self.clock.get_time_ns();
1096            let rejected_event = OrderModifyRejected::new(
1097                self.core.trader_id,
1098                cmd.strategy_id,
1099                cmd.instrument_id,
1100                cmd.client_order_id,
1101                "Order not found in cache for modify".into(),
1102                UUID4::new(),
1103                ts_init, // TODO: Use proper event timestamp
1104                ts_init,
1105                false,
1106                cmd.venue_order_id,
1107                Some(self.core.account_id),
1108            );
1109
1110            self.emitter
1111                .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1112            return Ok(());
1113        };
1114
1115        let http_client = self.http_client.clone();
1116        let command = cmd.clone();
1117        let emitter = self.emitter.clone();
1118        let trader_id = self.core.trader_id;
1119        let account_id = self.core.account_id;
1120        let instrument_id = command.instrument_id;
1121        let venue_order_id = command.venue_order_id;
1122        let client_order_id = Some(command.client_order_id);
1123        let order_side = order.order_side();
1124        let quantity = command.quantity.unwrap_or_else(|| order.quantity());
1125        let price = command.price.or_else(|| order.price());
1126
1127        let Some(price) = price else {
1128            log::warn!(
1129                "Cannot modify order {}: price required",
1130                cmd.client_order_id
1131            );
1132            let ts_init = self.clock.get_time_ns();
1133            let rejected_event = OrderModifyRejected::new(
1134                self.core.trader_id,
1135                cmd.strategy_id,
1136                cmd.instrument_id,
1137                cmd.client_order_id,
1138                "Price required for order modification".into(),
1139                UUID4::new(),
1140                ts_init, // TODO: Use proper event timestamp
1141                ts_init,
1142                false,
1143                cmd.venue_order_id,
1144                Some(self.core.account_id),
1145            );
1146
1147            self.emitter
1148                .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1149            return Ok(());
1150        };
1151        let clock = self.clock;
1152
1153        self.spawn_task("modify_order", async move {
1154            let result = http_client
1155                .modify_order(
1156                    account_id,
1157                    instrument_id,
1158                    venue_order_id,
1159                    client_order_id,
1160                    order_side,
1161                    quantity,
1162                    price,
1163                )
1164                .await;
1165
1166            match result {
1167                Ok(report) => {
1168                    let ts_now = clock.get_time_ns();
1169                    let updated_event = OrderUpdated::new(
1170                        trader_id,
1171                        command.strategy_id,
1172                        command.instrument_id,
1173                        command.client_order_id,
1174                        quantity,
1175                        UUID4::new(),
1176                        ts_now,
1177                        ts_now,
1178                        false,
1179                        Some(report.venue_order_id),
1180                        Some(account_id),
1181                        Some(price),
1182                        None,
1183                        None,
1184                    );
1185
1186                    emitter.send_order_event(OrderEventAny::Updated(updated_event));
1187                }
1188                Err(e) => {
1189                    let ts_now = clock.get_time_ns();
1190                    let rejected_event = OrderModifyRejected::new(
1191                        trader_id,
1192                        command.strategy_id,
1193                        command.instrument_id,
1194                        command.client_order_id,
1195                        format!("modify-order-failed: {e}").into(),
1196                        UUID4::new(),
1197                        ts_now,
1198                        ts_now,
1199                        false,
1200                        command.venue_order_id,
1201                        Some(account_id),
1202                    );
1203
1204                    emitter.send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1205
1206                    anyhow::bail!("Modify order failed: {e}");
1207                }
1208            }
1209
1210            Ok(())
1211        });
1212
1213        Ok(())
1214    }
1215
1216    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
1217        self.cancel_order_internal(cmd)
1218    }
1219
1220    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
1221        let http_client = self.http_client.clone();
1222        let instrument_id = cmd.instrument_id;
1223
1224        // HTTP only confirms request accepted; OrderCanceled comes from WebSocket
1225        self.spawn_task("cancel_all_orders", async move {
1226            match http_client.cancel_all_orders(instrument_id).await {
1227                Ok(_) => {
1228                    log::info!("Cancel all regular orders request accepted for {instrument_id}");
1229                }
1230                Err(e) => {
1231                    log::error!("Failed to cancel all regular orders for {instrument_id}: {e}");
1232                }
1233            }
1234
1235            match http_client.cancel_all_algo_orders(instrument_id).await {
1236                Ok(()) => {
1237                    log::info!("Cancel all algo orders request accepted for {instrument_id}");
1238                }
1239                Err(e) => {
1240                    log::error!("Failed to cancel all algo orders for {instrument_id}: {e}");
1241                }
1242            }
1243
1244            Ok(())
1245        });
1246
1247        Ok(())
1248    }
1249
1250    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
1251        const BATCH_SIZE: usize = 5;
1252
1253        if cmd.cancels.is_empty() {
1254            return Ok(());
1255        }
1256
1257        let http_client = self.http_client.clone();
1258        let command = cmd.clone();
1259
1260        let emitter = self.emitter.clone();
1261        let trader_id = self.core.trader_id;
1262        let account_id = self.core.account_id;
1263        let clock = self.clock;
1264
1265        self.spawn_task("batch_cancel_orders", async move {
1266            for chunk in command.cancels.chunks(BATCH_SIZE) {
1267                let batch_items: Vec<BatchCancelItem> = chunk
1268                    .iter()
1269                    .map(|cancel| {
1270                        if let Some(venue_order_id) = cancel.venue_order_id {
1271                            let order_id = venue_order_id.inner().parse::<i64>().unwrap_or(0);
1272                            if order_id != 0 {
1273                                BatchCancelItem::by_order_id(
1274                                    command.instrument_id.symbol.to_string(),
1275                                    order_id,
1276                                )
1277                            } else {
1278                                BatchCancelItem::by_client_order_id(
1279                                    command.instrument_id.symbol.to_string(),
1280                                    cancel.client_order_id.to_string(),
1281                                )
1282                            }
1283                        } else {
1284                            BatchCancelItem::by_client_order_id(
1285                                command.instrument_id.symbol.to_string(),
1286                                cancel.client_order_id.to_string(),
1287                            )
1288                        }
1289                    })
1290                    .collect();
1291
1292                match http_client.batch_cancel_orders(&batch_items).await {
1293                    Ok(results) => {
1294                        for (i, result) in results.iter().enumerate() {
1295                            let cancel = &chunk[i];
1296                            match result {
1297                                BatchOrderResult::Success(response) => {
1298                                    let venue_order_id =
1299                                        VenueOrderId::new(response.order_id.to_string());
1300                                    let canceled_event = OrderCanceled::new(
1301                                        trader_id,
1302                                        cancel.strategy_id,
1303                                        cancel.instrument_id,
1304                                        cancel.client_order_id,
1305                                        UUID4::new(),
1306                                        cancel.ts_init,
1307                                        clock.get_time_ns(),
1308                                        false,
1309                                        Some(venue_order_id),
1310                                        Some(account_id),
1311                                    );
1312
1313                                    emitter
1314                                        .send_order_event(OrderEventAny::Canceled(canceled_event));
1315                                }
1316                                BatchOrderResult::Error(error) => {
1317                                    let rejected_event = OrderCancelRejected::new(
1318                                        trader_id,
1319                                        cancel.strategy_id,
1320                                        cancel.instrument_id,
1321                                        cancel.client_order_id,
1322                                        format!(
1323                                            "batch-cancel-error: code={}, msg={}",
1324                                            error.code, error.msg
1325                                        )
1326                                        .into(),
1327                                        UUID4::new(),
1328                                        clock.get_time_ns(),
1329                                        cancel.ts_init,
1330                                        false,
1331                                        cancel.venue_order_id,
1332                                        Some(account_id),
1333                                    );
1334
1335                                    emitter.send_order_event(OrderEventAny::CancelRejected(
1336                                        rejected_event,
1337                                    ));
1338                                }
1339                            }
1340                        }
1341                    }
1342                    Err(e) => {
1343                        for cancel in chunk {
1344                            let rejected_event = OrderCancelRejected::new(
1345                                trader_id,
1346                                cancel.strategy_id,
1347                                cancel.instrument_id,
1348                                cancel.client_order_id,
1349                                format!("batch-cancel-request-failed: {e}").into(),
1350                                UUID4::new(),
1351                                clock.get_time_ns(),
1352                                cancel.ts_init,
1353                                false,
1354                                cancel.venue_order_id,
1355                                Some(account_id),
1356                            );
1357
1358                            emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
1359                        }
1360                    }
1361                }
1362            }
1363
1364            Ok(())
1365        });
1366
1367        Ok(())
1368    }
1369
1370    async fn generate_order_status_report(
1371        &self,
1372        cmd: &GenerateOrderStatusReport,
1373    ) -> anyhow::Result<Option<OrderStatusReport>> {
1374        let Some(instrument_id) = cmd.instrument_id else {
1375            log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
1376            return Ok(None);
1377        };
1378
1379        let symbol = instrument_id.symbol.to_string();
1380        let order_id = cmd.venue_order_id.as_ref().map(|id| {
1381            id.inner()
1382                .parse::<i64>()
1383                .expect("venue_order_id should be numeric")
1384        });
1385        let orig_client_order_id = cmd.client_order_id.map(|id| id.to_string());
1386
1387        let mut builder = BinanceOrderQueryParamsBuilder::default();
1388        builder.symbol(symbol);
1389        if let Some(oid) = order_id {
1390            builder.order_id(oid);
1391        }
1392        if let Some(ref coid) = orig_client_order_id {
1393            builder.orig_client_order_id(coid.clone());
1394        }
1395        let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1396
1397        let (_, size_precision) = self.get_instrument_precision(instrument_id);
1398
1399        match self.http_client.query_order(&params).await {
1400            Ok(order) => {
1401                let report = order.to_order_status_report(
1402                    self.core.account_id,
1403                    instrument_id,
1404                    size_precision,
1405                )?;
1406                Ok(Some(report))
1407            }
1408            Err(BinanceFuturesHttpError::BinanceError { code: -2013, .. }) => {
1409                // Order not found in regular API, try algo order API
1410                let Some(client_order_id) = cmd.client_order_id else {
1411                    return Ok(None);
1412                };
1413
1414                match self.http_client.query_algo_order(client_order_id).await {
1415                    Ok(algo_order) => {
1416                        let report = algo_order.to_order_status_report(
1417                            self.core.account_id,
1418                            instrument_id,
1419                            size_precision,
1420                        )?;
1421                        Ok(Some(report))
1422                    }
1423                    Err(e) => {
1424                        log::debug!("Algo order query also failed: {e}");
1425                        Ok(None)
1426                    }
1427                }
1428            }
1429            Err(e) => Err(e.into()),
1430        }
1431    }
1432
1433    async fn generate_order_status_reports(
1434        &self,
1435        cmd: &GenerateOrderStatusReports,
1436    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1437        let mut reports = Vec::new();
1438
1439        if cmd.open_only {
1440            let symbol = cmd.instrument_id.map(|id| id.symbol.to_string());
1441            let mut builder = BinanceOpenOrdersParamsBuilder::default();
1442            if let Some(s) = symbol {
1443                builder.symbol(s);
1444            }
1445            let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1446
1447            let (orders, algo_orders) = tokio::try_join!(
1448                self.http_client.query_open_orders(&params),
1449                self.http_client.query_open_algo_orders(cmd.instrument_id),
1450            )?;
1451
1452            for order in orders {
1453                if let Some(instrument_id) = cmd.instrument_id {
1454                    let (_, size_precision) = self.get_instrument_precision(instrument_id);
1455                    if let Ok(report) = order.to_order_status_report(
1456                        self.core.account_id,
1457                        instrument_id,
1458                        size_precision,
1459                    ) {
1460                        reports.push(report);
1461                    }
1462                } else {
1463                    let cache = self.core.cache();
1464                    if let Some(instrument) = cache
1465                        .instruments(&BINANCE_VENUE, None)
1466                        .into_iter()
1467                        .find(|i| i.symbol().as_str() == order.symbol.as_str())
1468                        && let Ok(report) = order.to_order_status_report(
1469                            self.core.account_id,
1470                            instrument.id(),
1471                            instrument.size_precision(),
1472                        )
1473                    {
1474                        reports.push(report);
1475                    }
1476                }
1477            }
1478
1479            for algo_order in algo_orders {
1480                if let Some(instrument_id) = cmd.instrument_id {
1481                    let (_, size_precision) = self.get_instrument_precision(instrument_id);
1482                    if let Ok(report) = algo_order.to_order_status_report(
1483                        self.core.account_id,
1484                        instrument_id,
1485                        size_precision,
1486                    ) {
1487                        reports.push(report);
1488                    }
1489                } else {
1490                    let cache = self.core.cache();
1491                    if let Some(instrument) = cache
1492                        .instruments(&BINANCE_VENUE, None)
1493                        .into_iter()
1494                        .find(|i| i.symbol().as_str() == algo_order.symbol.as_str())
1495                        && let Ok(report) = algo_order.to_order_status_report(
1496                            self.core.account_id,
1497                            instrument.id(),
1498                            instrument.size_precision(),
1499                        )
1500                    {
1501                        reports.push(report);
1502                    }
1503                }
1504            }
1505        } else if let Some(instrument_id) = cmd.instrument_id {
1506            let symbol = instrument_id.symbol.to_string();
1507            let start_time = cmd.start.map(|t| t.as_i64() / 1_000_000); // ns to ms
1508            let end_time = cmd.end.map(|t| t.as_i64() / 1_000_000);
1509
1510            let mut builder = BinanceAllOrdersParamsBuilder::default();
1511            builder.symbol(symbol);
1512            if let Some(st) = start_time {
1513                builder.start_time(st);
1514            }
1515            if let Some(et) = end_time {
1516                builder.end_time(et);
1517            }
1518            let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1519
1520            let orders = self.http_client.query_all_orders(&params).await?;
1521            let (_, size_precision) = self.get_instrument_precision(instrument_id);
1522
1523            for order in orders {
1524                if let Ok(report) = order.to_order_status_report(
1525                    self.core.account_id,
1526                    instrument_id,
1527                    size_precision,
1528                ) {
1529                    reports.push(report);
1530                }
1531            }
1532        }
1533
1534        Ok(reports)
1535    }
1536
1537    async fn generate_fill_reports(
1538        &self,
1539        cmd: GenerateFillReports,
1540    ) -> anyhow::Result<Vec<FillReport>> {
1541        let Some(instrument_id) = cmd.instrument_id else {
1542            log::warn!("generate_fill_reports requires instrument_id for Binance Futures");
1543            return Ok(Vec::new());
1544        };
1545
1546        let symbol = instrument_id.symbol.to_string();
1547        let start_time = cmd.start.map(|t| t.as_i64() / 1_000_000);
1548        let end_time = cmd.end.map(|t| t.as_i64() / 1_000_000);
1549
1550        let mut builder = BinanceUserTradesParamsBuilder::default();
1551        builder.symbol(symbol);
1552        if let Some(st) = start_time {
1553            builder.start_time(st);
1554        }
1555        if let Some(et) = end_time {
1556            builder.end_time(et);
1557        }
1558        let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1559
1560        let trades = self.http_client.query_user_trades(&params).await?;
1561        let (price_precision, size_precision) = self.get_instrument_precision(instrument_id);
1562
1563        let mut reports = Vec::new();
1564        for trade in trades {
1565            if let Ok(report) = trade.to_fill_report(
1566                self.core.account_id,
1567                instrument_id,
1568                price_precision,
1569                size_precision,
1570            ) {
1571                reports.push(report);
1572            }
1573        }
1574
1575        Ok(reports)
1576    }
1577
1578    async fn generate_position_status_reports(
1579        &self,
1580        cmd: &GeneratePositionStatusReports,
1581    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1582        let symbol = cmd.instrument_id.map(|id| id.symbol.to_string());
1583
1584        let mut builder = BinancePositionRiskParamsBuilder::default();
1585        if let Some(s) = symbol {
1586            builder.symbol(s);
1587        }
1588        let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1589
1590        let positions = self.http_client.query_positions(&params).await?;
1591
1592        let mut reports = Vec::new();
1593        for position in positions {
1594            let position_amt: f64 = position.position_amt.parse().unwrap_or(0.0);
1595            if position_amt == 0.0 {
1596                continue;
1597            }
1598
1599            let cache = self.core.cache();
1600            if let Some(instrument) = cache
1601                .instruments(&BINANCE_VENUE, None)
1602                .into_iter()
1603                .find(|i| i.symbol().as_str() == position.symbol.as_str())
1604                && let Ok(report) = self.create_position_report(
1605                    &position,
1606                    instrument.id(),
1607                    instrument.size_precision(),
1608                )
1609            {
1610                reports.push(report);
1611            }
1612        }
1613
1614        Ok(reports)
1615    }
1616
1617    async fn generate_mass_status(
1618        &self,
1619        lookback_mins: Option<u64>,
1620    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1621        log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1622
1623        let ts_now = self.clock.get_time_ns();
1624
1625        let start = lookback_mins.map(|mins| {
1626            let lookback_ns = mins * 60 * 1_000_000_000;
1627            UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1628        });
1629
1630        let order_cmd = GenerateOrderStatusReportsBuilder::default()
1631            .ts_init(ts_now)
1632            .open_only(true)
1633            .start(start)
1634            .build()
1635            .map_err(|e| anyhow::anyhow!("{e}"))?;
1636
1637        let position_cmd = GeneratePositionStatusReportsBuilder::default()
1638            .ts_init(ts_now)
1639            .start(start)
1640            .build()
1641            .map_err(|e| anyhow::anyhow!("{e}"))?;
1642
1643        let (order_reports, position_reports) = tokio::try_join!(
1644            self.generate_order_status_reports(&order_cmd),
1645            self.generate_position_status_reports(&position_cmd),
1646        )?;
1647
1648        log::info!("Received {} OrderStatusReports", order_reports.len());
1649        log::info!("Received {} PositionReports", position_reports.len());
1650
1651        let mut mass_status = ExecutionMassStatus::new(
1652            self.core.client_id,
1653            self.core.account_id,
1654            *BINANCE_VENUE,
1655            ts_now,
1656            None,
1657        );
1658
1659        mass_status.add_order_reports(order_reports);
1660        mass_status.add_position_reports(position_reports);
1661
1662        Ok(Some(mass_status))
1663    }
1664}