1use std::{
19 future::Future,
20 sync::{
21 Arc, Mutex, RwLock,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::{Duration, Instant},
25};
26
27use ahash::AHashSet;
28use anyhow::Context;
29use async_trait::async_trait;
30use futures_util::{StreamExt, pin_mut};
31use nautilus_common::{
32 clients::ExecutionClient,
33 live::{get_runtime, runner::get_exec_event_sender},
34 messages::execution::{
35 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
36 GenerateOrderStatusReport, GenerateOrderStatusReports, GenerateOrderStatusReportsBuilder,
37 GeneratePositionStatusReports, GeneratePositionStatusReportsBuilder, ModifyOrder,
38 QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
39 },
40};
41use nautilus_core::{
42 MUTEX_POISONED, UUID4, UnixNanos,
43 time::{AtomicTime, get_atomic_clock_realtime},
44};
45use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
46use nautilus_model::{
47 accounts::AccountAny,
48 enums::{OmsType, OrderSide, PositionSideSpecified},
49 events::{
50 AccountState, OrderCancelRejected, OrderCanceled, OrderEventAny, OrderModifyRejected,
51 OrderRejected, OrderUpdated,
52 },
53 identifiers::{
54 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, TraderId, Venue, VenueOrderId,
55 },
56 instruments::Instrument,
57 orders::{Order, OrderAny},
58 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
59 types::{AccountBalance, Currency, MarginBalance, Money, Quantity},
60};
61use rust_decimal::Decimal;
62use tokio::task::JoinHandle;
63use tokio_util::sync::CancellationToken;
64
65use super::{
66 http::{
67 BinanceFuturesHttpError,
68 client::{BinanceFuturesHttpClient, BinanceFuturesInstrument, is_algo_order_type},
69 models::{BatchOrderResult, BinancePositionRisk},
70 query::{
71 BatchCancelItem, BinanceAllOrdersParamsBuilder, BinanceOpenOrdersParamsBuilder,
72 BinanceOrderQueryParamsBuilder, BinancePositionRiskParamsBuilder,
73 BinanceUserTradesParamsBuilder,
74 },
75 },
76 websocket::{
77 client::BinanceFuturesWebSocketClient,
78 handler_exec::BinanceFuturesExecWsFeedHandler,
79 messages::{ExecHandlerCommand, NautilusExecWsMessage},
80 },
81};
82use crate::{
83 common::{
84 consts::BINANCE_VENUE,
85 credential::resolve_credentials,
86 enums::{BinancePositionSide, BinanceProductType},
87 },
88 config::BinanceExecClientConfig,
89 futures::http::models::BinanceFuturesAccountInfo,
90};
91
/// Period, in seconds, of the listen key keepalive timer (30 minutes).
const LISTEN_KEY_KEEPALIVE_SECS: u64 = 60 * 30;
94
95#[derive(Debug)]
104pub struct BinanceFuturesExecutionClient {
105 core: ExecutionClientCore,
106 clock: &'static AtomicTime,
107 config: BinanceExecClientConfig,
108 emitter: ExecutionEventEmitter,
109 product_type: BinanceProductType,
110 http_client: BinanceFuturesHttpClient,
111 ws_client: Option<BinanceFuturesWebSocketClient>,
112 exec_cmd_tx: Option<tokio::sync::mpsc::UnboundedSender<ExecHandlerCommand>>,
113 listen_key: Arc<RwLock<Option<String>>>,
114 cancellation_token: CancellationToken,
115 handler_signal: Arc<AtomicBool>,
116 triggered_algo_order_ids: Arc<RwLock<AHashSet<ClientOrderId>>>,
117 ws_task: Mutex<Option<JoinHandle<()>>>,
118 keepalive_task: Mutex<Option<JoinHandle<()>>>,
119 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
120 is_hedge_mode: AtomicBool,
121}
122
123impl BinanceFuturesExecutionClient {
124 pub fn new(core: ExecutionClientCore, config: BinanceExecClientConfig) -> anyhow::Result<Self> {
130 let product_type = config
131 .product_types
132 .iter()
133 .find(|pt| matches!(pt, BinanceProductType::UsdM | BinanceProductType::CoinM))
134 .copied()
135 .unwrap_or(BinanceProductType::UsdM);
136
137 let (api_key, api_secret) = resolve_credentials(
138 config.api_key.clone(),
139 config.api_secret.clone(),
140 config.environment,
141 product_type,
142 )?;
143
144 let http_client = BinanceFuturesHttpClient::new(
145 product_type,
146 config.environment,
147 Some(api_key.clone()),
148 Some(api_secret.clone()),
149 config.base_url_http.clone(),
150 None, None, None, )
154 .context("failed to construct Binance Futures HTTP client")?;
155
156 let ws_client = BinanceFuturesWebSocketClient::new(
157 product_type,
158 config.environment,
159 Some(api_key),
160 Some(api_secret),
161 config.base_url_ws.clone(),
162 Some(20), )
164 .context("failed to construct Binance Futures WebSocket client")?;
165
166 let clock = get_atomic_clock_realtime();
167 let emitter = ExecutionEventEmitter::new(
168 clock,
169 core.trader_id,
170 core.account_id,
171 core.account_type,
172 core.base_currency,
173 );
174
175 Ok(Self {
176 core,
177 clock,
178 config,
179 emitter,
180 product_type,
181 http_client,
182 ws_client: Some(ws_client),
183 exec_cmd_tx: None,
184 listen_key: Arc::new(RwLock::new(None)),
185 cancellation_token: CancellationToken::new(),
186 handler_signal: Arc::new(AtomicBool::new(false)),
187 triggered_algo_order_ids: Arc::new(RwLock::new(AHashSet::new())),
188 ws_task: Mutex::new(None),
189 keepalive_task: Mutex::new(None),
190 pending_tasks: Mutex::new(Vec::new()),
191 is_hedge_mode: AtomicBool::new(false),
192 })
193 }
194
195 #[must_use]
197 pub fn is_hedge_mode(&self) -> bool {
198 self.is_hedge_mode.load(Ordering::Acquire)
199 }
200
201 fn determine_position_side(
203 &self,
204 order_side: OrderSide,
205 reduce_only: bool,
206 ) -> Option<BinancePositionSide> {
207 if !self.is_hedge_mode() {
208 return None;
209 }
210
211 Some(if reduce_only {
213 match order_side {
215 OrderSide::Buy => BinancePositionSide::Short,
216 OrderSide::Sell => BinancePositionSide::Long,
217 _ => BinancePositionSide::Both,
218 }
219 } else {
220 match order_side {
222 OrderSide::Buy => BinancePositionSide::Long,
223 OrderSide::Sell => BinancePositionSide::Short,
224 _ => BinancePositionSide::Both,
225 }
226 })
227 }
228
229 fn create_account_state(&self, account_info: &BinanceFuturesAccountInfo) -> AccountState {
231 let ts_now = self.clock.get_time_ns();
232
233 let balances: Vec<AccountBalance> = account_info
234 .assets
235 .iter()
236 .filter_map(|b| {
237 let wallet_balance: f64 = b.wallet_balance.parse().unwrap_or(0.0);
238 let available_balance: f64 = b.available_balance.parse().unwrap_or(0.0);
239 let locked = wallet_balance - available_balance;
240
241 if wallet_balance == 0.0 {
242 return None;
243 }
244
245 let currency = Currency::from(&b.asset);
246 Some(AccountBalance::new(
247 Money::new(wallet_balance, currency),
248 Money::new(locked.max(0.0), currency),
249 Money::new(available_balance, currency),
250 ))
251 })
252 .collect();
253
254 let margins: Vec<MarginBalance> = Vec::new();
257
258 AccountState::new(
259 self.core.account_id,
260 self.core.account_type,
261 balances,
262 margins,
263 true, UUID4::new(),
265 ts_now,
266 ts_now,
267 None, )
269 }
270
271 async fn refresh_account_state(&self) -> anyhow::Result<AccountState> {
272 let account_info = match self.http_client.query_account().await {
273 Ok(info) => info,
274 Err(e) => {
275 log::error!("Binance Futures account state request failed: {e}");
276 anyhow::bail!("Binance Futures account state request failed: {e}");
277 }
278 };
279
280 Ok(self.create_account_state(&account_info))
281 }
282
283 fn update_account_state(&self) -> anyhow::Result<()> {
284 let runtime = get_runtime();
285 let account_state = runtime.block_on(self.refresh_account_state())?;
286
287 let ts_now = self.clock.get_time_ns();
288 self.emitter.emit_account_state(
289 account_state.balances.clone(),
290 account_state.margins.clone(),
291 account_state.is_reported,
292 ts_now,
293 );
294 Ok(())
295 }
296
297 async fn init_hedge_mode(&self) -> anyhow::Result<bool> {
298 let response = self.http_client.query_hedge_mode().await?;
299 Ok(response.dual_side_position)
300 }
301
302 fn handle_exec_event(message: NautilusExecWsMessage, emitter: &ExecutionEventEmitter) {
307 match message {
308 NautilusExecWsMessage::OrderAccepted(event) => {
309 emitter.send_order_event(OrderEventAny::Accepted(event));
310 }
311 NautilusExecWsMessage::OrderCanceled(event) => {
312 emitter.send_order_event(OrderEventAny::Canceled(event));
313 }
314 NautilusExecWsMessage::OrderRejected(event) => {
315 emitter.send_order_event(OrderEventAny::Rejected(event));
316 }
317 NautilusExecWsMessage::OrderFilled(event) => {
318 emitter.send_order_event(OrderEventAny::Filled(event));
319 }
320 NautilusExecWsMessage::OrderUpdated(event) => {
321 emitter.send_order_event(OrderEventAny::Updated(event));
322 }
323 NautilusExecWsMessage::AccountUpdate(event) => {
324 emitter.send_account_state(event);
325 }
326 NautilusExecWsMessage::ListenKeyExpired => {
327 log::warn!("Listen key expired - reconnection required");
328 }
329 NautilusExecWsMessage::Reconnected => {
330 log::info!("User data stream WebSocket reconnected");
331 }
332 }
333 }
334
335 fn register_order(&self, order: &OrderAny) {
337 if let Some(ref cmd_tx) = self.exec_cmd_tx {
338 let cmd = ExecHandlerCommand::RegisterOrder {
339 client_order_id: order.client_order_id(),
340 trader_id: order.trader_id(),
341 strategy_id: order.strategy_id(),
342 instrument_id: order.instrument_id(),
343 };
344 if let Err(e) = cmd_tx.send(cmd) {
345 log::error!("Failed to register order with handler: {e}");
346 }
347 }
348 }
349
350 fn register_cancel(
352 &self,
353 client_order_id: ClientOrderId,
354 trader_id: TraderId,
355 strategy_id: StrategyId,
356 instrument_id: InstrumentId,
357 venue_order_id: Option<VenueOrderId>,
358 ) {
359 if let Some(ref cmd_tx) = self.exec_cmd_tx {
360 let cmd = ExecHandlerCommand::RegisterCancel {
361 client_order_id,
362 trader_id,
363 strategy_id,
364 instrument_id,
365 venue_order_id,
366 };
367 if let Err(e) = cmd_tx.send(cmd) {
368 log::error!("Failed to register cancel with handler: {e}");
369 }
370 }
371 }
372
373 fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
374 let http_client = self.http_client.clone();
375
376 let order = self
377 .core
378 .cache()
379 .order(&cmd.client_order_id)
380 .cloned()
381 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
382
383 self.register_order(&order);
385
386 let emitter = self.emitter.clone();
387 let trader_id = self.core.trader_id;
388 let account_id = self.core.account_id;
389 let clock = self.clock;
390 let client_order_id = order.client_order_id();
391 let strategy_id = order.strategy_id();
392 let instrument_id = order.instrument_id();
393 let order_side = order.order_side();
394 let order_type = order.order_type();
395 let quantity = order.quantity();
396 let time_in_force = order.time_in_force();
397 let price = order.price();
398 let trigger_price = order.trigger_price();
399 let reduce_only = order.is_reduce_only();
400 let position_side = self.determine_position_side(order_side, reduce_only);
401
402 let use_algo_api = is_algo_order_type(order_type);
405
406 self.spawn_task("submit_order", async move {
407 let result = if use_algo_api {
408 http_client
409 .submit_algo_order(
410 account_id,
411 instrument_id,
412 client_order_id,
413 order_side,
414 order_type,
415 quantity,
416 time_in_force,
417 price,
418 trigger_price,
419 reduce_only,
420 position_side,
421 )
422 .await
423 } else {
424 http_client
425 .submit_order(
426 account_id,
427 instrument_id,
428 client_order_id,
429 order_side,
430 order_type,
431 quantity,
432 time_in_force,
433 price,
434 trigger_price,
435 reduce_only,
436 position_side,
437 )
438 .await
439 };
440
441 match result {
442 Ok(report) => {
443 log::debug!(
444 "Order submit accepted: client_order_id={}, venue_order_id={}",
445 client_order_id,
446 report.venue_order_id
447 );
448 }
449 Err(e) => {
450 let ts_now = clock.get_time_ns();
454 let rejected_event = OrderRejected::new(
455 trader_id,
456 strategy_id,
457 instrument_id,
458 client_order_id,
459 account_id,
460 format!("submit-order-error: {e}").into(),
461 UUID4::new(),
462 ts_now,
463 ts_now,
464 false,
465 false,
466 );
467
468 emitter.send_order_event(OrderEventAny::Rejected(rejected_event));
469
470 return Err(e);
471 }
472 }
473
474 Ok(())
475 });
476
477 Ok(())
478 }
479
480 fn cancel_order_internal(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
481 let http_client = self.http_client.clone();
482 let command = cmd.clone();
483
484 self.register_cancel(
486 command.client_order_id,
487 self.core.trader_id,
488 command.strategy_id,
489 command.instrument_id,
490 command.venue_order_id,
491 );
492
493 let is_algo = self
495 .core
496 .cache()
497 .order(&command.client_order_id)
498 .is_some_and(|order| is_algo_order_type(order.order_type()));
499 let is_triggered = self
500 .triggered_algo_order_ids
501 .read()
502 .expect("triggered_algo_order_ids lock poisoned")
503 .contains(&command.client_order_id);
504 let use_algo_cancel = is_algo && !is_triggered;
505
506 let emitter = self.emitter.clone();
507 let trader_id = self.core.trader_id;
508 let account_id = self.core.account_id;
509 let clock = self.clock;
510 let instrument_id = command.instrument_id;
511 let venue_order_id = command.venue_order_id;
512 let client_order_id = command.client_order_id;
513
514 self.spawn_task("cancel_order", async move {
517 let result = if use_algo_cancel {
518 match http_client.cancel_algo_order(client_order_id).await {
521 Ok(()) => Ok(()),
522 Err(algo_err) => {
523 log::debug!("Algo cancel failed, trying regular cancel: {algo_err}");
524 http_client
525 .cancel_order(instrument_id, venue_order_id, Some(client_order_id))
526 .await
527 .map(|_| ())
528 }
529 }
530 } else {
531 http_client
532 .cancel_order(instrument_id, venue_order_id, Some(client_order_id))
533 .await
534 .map(|_| ())
535 };
536
537 match result {
538 Ok(()) => {
539 log::debug!("Cancel request accepted: client_order_id={client_order_id}");
540 }
541 Err(e) => {
542 let ts_now = clock.get_time_ns();
543 let rejected_event = OrderCancelRejected::new(
544 trader_id,
545 command.strategy_id,
546 command.instrument_id,
547 client_order_id,
548 format!("cancel-order-error: {e}").into(),
549 UUID4::new(),
550 ts_now,
551 ts_now,
552 false,
553 command.venue_order_id,
554 Some(account_id),
555 );
556
557 emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
558
559 return Err(e);
560 }
561 }
562
563 Ok(())
564 });
565
566 Ok(())
567 }
568
569 fn spawn_task<F>(&self, description: &'static str, fut: F)
570 where
571 F: Future<Output = anyhow::Result<()>> + Send + 'static,
572 {
573 let runtime = get_runtime();
574 let handle = runtime.spawn(async move {
575 if let Err(e) = fut.await {
576 log::warn!("{description} failed: {e}");
577 }
578 });
579
580 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
581 tasks.retain(|handle| !handle.is_finished());
582 tasks.push(handle);
583 }
584
585 fn abort_pending_tasks(&self) {
586 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
587 for handle in tasks.drain(..) {
588 handle.abort();
589 }
590 }
591
592 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
593 let account_id = self.core.account_id;
594
595 if self.core.cache().account(&account_id).is_some() {
596 log::info!("Account {account_id} registered");
597 return Ok(());
598 }
599
600 let start = Instant::now();
601 let timeout = Duration::from_secs_f64(timeout_secs);
602 let interval = Duration::from_millis(10);
603
604 loop {
605 tokio::time::sleep(interval).await;
606
607 if self.core.cache().account(&account_id).is_some() {
608 log::info!("Account {account_id} registered");
609 return Ok(());
610 }
611
612 if start.elapsed() >= timeout {
613 anyhow::bail!(
614 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
615 );
616 }
617 }
618 }
619
620 fn get_instrument_precision(&self, instrument_id: InstrumentId) -> (u8, u8) {
622 let cache = self.core.cache();
623 cache
624 .instrument(&instrument_id)
625 .map_or((8, 8), |i| (i.price_precision(), i.size_precision()))
626 }
627
628 fn create_position_report(
630 &self,
631 position: &BinancePositionRisk,
632 instrument_id: InstrumentId,
633 size_precision: u8,
634 ) -> anyhow::Result<PositionStatusReport> {
635 let position_amount: Decimal = position
636 .position_amt
637 .parse()
638 .context("invalid position_amt")?;
639
640 if position_amount.is_zero() {
641 anyhow::bail!("Position is flat");
642 }
643
644 let entry_price: Decimal = position
645 .entry_price
646 .parse()
647 .context("invalid entry_price")?;
648
649 let position_side = if position_amount > Decimal::ZERO {
650 PositionSideSpecified::Long
651 } else {
652 PositionSideSpecified::Short
653 };
654
655 let ts_now = self.clock.get_time_ns();
656
657 Ok(PositionStatusReport::new(
658 self.core.account_id,
659 instrument_id,
660 position_side,
661 Quantity::new(position_amount.abs().to_string().parse()?, size_precision),
662 ts_now,
663 ts_now,
664 Some(UUID4::new()),
665 None, Some(entry_price),
667 ))
668 }
669}
670
671#[async_trait(?Send)]
672impl ExecutionClient for BinanceFuturesExecutionClient {
673 fn is_connected(&self) -> bool {
674 self.core.is_connected()
675 }
676
677 fn client_id(&self) -> ClientId {
678 self.core.client_id
679 }
680
681 fn account_id(&self) -> AccountId {
682 self.core.account_id
683 }
684
685 fn venue(&self) -> Venue {
686 *BINANCE_VENUE
687 }
688
689 fn oms_type(&self) -> OmsType {
690 self.core.oms_type
691 }
692
693 fn get_account(&self) -> Option<AccountAny> {
694 self.core.cache().account(&self.core.account_id).cloned()
695 }
696
697 async fn connect(&mut self) -> anyhow::Result<()> {
698 if self.core.is_connected() {
699 return Ok(());
700 }
701
702 self.cancellation_token = CancellationToken::new();
704
705 let is_hedge_mode = self
707 .init_hedge_mode()
708 .await
709 .context("failed to query hedge mode")?;
710 self.is_hedge_mode.store(is_hedge_mode, Ordering::Release);
711 log::info!("Hedge mode (dual side position): {is_hedge_mode}");
712
713 let _instruments = if self.core.instruments_initialized() {
715 Vec::new()
716 } else {
717 let instruments = self
718 .http_client
719 .request_instruments()
720 .await
721 .context("failed to request Binance Futures instruments")?;
722
723 if instruments.is_empty() {
724 log::warn!("No instruments returned for Binance Futures");
725 } else {
726 log::info!("Loaded {} Futures instruments", instruments.len());
727 }
728
729 self.core.set_instruments_initialized();
730 instruments
731 };
732
733 log::info!("Creating listen key for user data stream...");
735 let listen_key_response = self
736 .http_client
737 .create_listen_key()
738 .await
739 .context("failed to create listen key")?;
740 let listen_key = listen_key_response.listen_key;
741 log::info!("Listen key created successfully");
742
743 {
744 let mut key_guard = self.listen_key.write().expect(MUTEX_POISONED);
745 *key_guard = Some(listen_key.clone());
746 }
747
748 if let Some(ref mut ws_client) = self.ws_client {
750 log::info!("Connecting to Binance Futures user data stream WebSocket...");
751 ws_client.connect().await.map_err(|e| {
752 log::error!("Binance Futures WebSocket connection failed: {e:?}");
753 anyhow::anyhow!("failed to connect Binance Futures WebSocket: {e}")
754 })?;
755 log::info!("Binance Futures WebSocket connected");
756
757 log::info!("Subscribing to user data stream...");
759 ws_client
760 .subscribe(vec![listen_key.clone()])
761 .await
762 .map_err(|e| anyhow::anyhow!("failed to subscribe to user data stream: {e}"))?;
763 log::info!("Subscribed to user data stream");
764
765 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
767 let (raw_tx, raw_rx) = tokio::sync::mpsc::unbounded_channel();
768
769 self.exec_cmd_tx = Some(cmd_tx.clone());
771
772 let mut handler = BinanceFuturesExecWsFeedHandler::new(
774 self.clock,
775 self.core.trader_id,
776 self.core.account_id,
777 self.core.account_type,
778 self.product_type,
779 self.handler_signal.clone(),
780 self.triggered_algo_order_ids.clone(),
781 cmd_rx,
782 raw_rx,
783 );
784
785 let instruments_for_handler: Vec<BinanceFuturesInstrument> = self
787 .http_client
788 .instruments_cache()
789 .iter()
790 .map(|r| r.value().clone())
791 .collect();
792 if let Err(e) = cmd_tx.send(ExecHandlerCommand::InitializeInstruments(
793 instruments_for_handler,
794 )) {
795 log::error!("Failed to send instruments to handler: {e}");
796 }
797
798 let stream = ws_client.stream();
800 let cancel = self.cancellation_token.clone();
801 let raw_forward_task = get_runtime().spawn(async move {
802 pin_mut!(stream);
803 loop {
804 tokio::select! {
805 Some(message) = stream.next() => {
806 if let Err(e) = raw_tx.send(message) {
807 log::error!("Failed to forward raw message to handler: {e}");
808 break;
809 }
810 }
811 () = cancel.cancelled() => {
812 log::debug!("Raw message forwarding task cancelled");
813 break;
814 }
815 }
816 }
817 });
818
819 let emitter = self.emitter.clone();
820 let handler_cancel = self.cancellation_token.clone();
821
822 let ws_task = get_runtime().spawn(async move {
823 loop {
824 tokio::select! {
825 msg = handler.next() => {
826 match msg {
827 Some(event) => {
828 Self::handle_exec_event(event, &emitter);
829 }
830 None => break,
831 }
832 }
833 () = handler_cancel.cancelled() => {
834 log::debug!("Handler task cancelled");
835 break;
836 }
837 }
838 }
839
840 raw_forward_task.abort();
841 });
842 *self.ws_task.lock().expect(MUTEX_POISONED) = Some(ws_task);
843
844 let http_client = self.http_client.clone();
846 let listen_key_ref = self.listen_key.clone();
847 let cancel = self.cancellation_token.clone();
848
849 let keepalive_task = get_runtime().spawn(async move {
850 let mut interval =
851 tokio::time::interval(Duration::from_secs(LISTEN_KEY_KEEPALIVE_SECS));
852 loop {
853 tokio::select! {
854 _ = interval.tick() => {
855 let key = {
856 let guard = listen_key_ref.read().expect(MUTEX_POISONED);
857 guard.clone()
858 };
859 if let Some(ref key) = key {
860 match http_client.keepalive_listen_key(key).await {
861 Ok(()) => {
862 log::debug!("Listen key keepalive sent successfully");
863 }
864 Err(e) => {
865 log::warn!("Listen key keepalive failed: {e}");
866 }
867 }
868 }
869 }
870 () = cancel.cancelled() => {
871 log::debug!("Listen key keepalive task cancelled");
872 break;
873 }
874 }
875 }
876 });
877 *self.keepalive_task.lock().expect(MUTEX_POISONED) = Some(keepalive_task);
878 }
879
880 let account_state = self
882 .refresh_account_state()
883 .await
884 .context("failed to request Binance Futures account state")?;
885
886 if !account_state.balances.is_empty() {
887 log::info!(
888 "Received account state with {} balance(s) and {} margin(s)",
889 account_state.balances.len(),
890 account_state.margins.len()
891 );
892 }
893
894 self.emitter.send_account_state(account_state);
895
896 self.await_account_registered(30.0).await?;
897
898 self.core.set_connected();
899 log::info!("Connected: client_id={}", self.core.client_id);
900 Ok(())
901 }
902
903 async fn disconnect(&mut self) -> anyhow::Result<()> {
904 if self.core.is_disconnected() {
905 return Ok(());
906 }
907
908 self.cancellation_token.cancel();
910
911 let ws_task = self.ws_task.lock().expect(MUTEX_POISONED).take();
913 if let Some(task) = ws_task {
914 let _ = task.await;
915 }
916
917 let keepalive_task = self.keepalive_task.lock().expect(MUTEX_POISONED).take();
919 if let Some(task) = keepalive_task {
920 let _ = task.await;
921 }
922
923 if let Some(ref mut ws_client) = self.ws_client {
925 let _ = ws_client.close().await;
926 }
927
928 let listen_key = self.listen_key.read().expect(MUTEX_POISONED).clone();
930 if let Some(ref key) = listen_key
931 && let Err(e) = self.http_client.close_listen_key(key).await
932 {
933 log::warn!("Failed to close listen key: {e}");
934 }
935 *self.listen_key.write().expect(MUTEX_POISONED) = None;
936
937 self.abort_pending_tasks();
938
939 self.core.set_disconnected();
940 log::info!("Disconnected: client_id={}", self.core.client_id);
941 Ok(())
942 }
943
944 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
945 self.update_account_state()
946 }
947
948 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
949 log::debug!("query_order: client_order_id={}", cmd.client_order_id);
950
951 let http_client = self.http_client.clone();
952 let command = cmd.clone();
953 let emitter = self.emitter.clone();
954 let account_id = self.core.account_id;
955
956 let symbol = command.instrument_id.symbol.to_string();
957 let order_id = command.venue_order_id.map(|id| {
958 id.inner()
959 .parse::<i64>()
960 .expect("venue_order_id should be numeric")
961 });
962 let orig_client_order_id = Some(command.client_order_id.to_string());
963 let (_, size_precision) = self.get_instrument_precision(command.instrument_id);
964
965 self.spawn_task("query_order", async move {
966 let mut builder = BinanceOrderQueryParamsBuilder::default();
967 builder.symbol(symbol.clone());
968 if let Some(oid) = order_id {
969 builder.order_id(oid);
970 }
971 if let Some(coid) = orig_client_order_id {
972 builder.orig_client_order_id(coid);
973 }
974 let params = builder.build().expect("order query params");
975
976 let result = http_client.query_order(¶ms).await;
977
978 match result {
979 Ok(order) => {
980 let report = order.to_order_status_report(
981 account_id,
982 command.instrument_id,
983 size_precision,
984 )?;
985
986 emitter.send_order_status_report(report);
987 }
988 Err(e) => log::warn!("Failed to query order status: {e}"),
989 }
990
991 Ok(())
992 });
993
994 Ok(())
995 }
996
997 fn generate_account_state(
998 &self,
999 balances: Vec<AccountBalance>,
1000 margins: Vec<MarginBalance>,
1001 reported: bool,
1002 ts_event: UnixNanos,
1003 ) -> anyhow::Result<()> {
1004 self.emitter
1005 .emit_account_state(balances, margins, reported, ts_event);
1006 Ok(())
1007 }
1008
1009 fn start(&mut self) -> anyhow::Result<()> {
1010 if self.core.is_started() {
1011 return Ok(());
1012 }
1013
1014 self.emitter.set_sender(get_exec_event_sender());
1015 self.core.set_started();
1016
1017 let http_client = self.http_client.clone();
1018
1019 get_runtime().spawn(async move {
1020 match http_client.request_instruments().await {
1021 Ok(instruments) => {
1022 if instruments.is_empty() {
1023 log::warn!("No instruments returned for Binance Futures");
1024 } else {
1025 log::info!("Loaded {} Futures instruments", instruments.len());
1026 }
1027 }
1028 Err(e) => {
1029 log::error!("Failed to request Binance Futures instruments: {e}");
1030 }
1031 }
1032 });
1033
1034 log::info!(
1035 "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}",
1036 self.core.client_id,
1037 self.core.account_id,
1038 self.core.account_type,
1039 self.config.environment,
1040 );
1041 Ok(())
1042 }
1043
1044 fn stop(&mut self) -> anyhow::Result<()> {
1045 if self.core.is_stopped() {
1046 return Ok(());
1047 }
1048
1049 self.core.set_stopped();
1050 self.core.set_disconnected();
1051 self.abort_pending_tasks();
1052 log::info!("Stopped: client_id={}", self.core.client_id);
1053 Ok(())
1054 }
1055
1056 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
1057 let order = self
1058 .core
1059 .cache()
1060 .order(&cmd.client_order_id)
1061 .cloned()
1062 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
1063
1064 if order.is_closed() {
1065 let client_order_id = order.client_order_id();
1066 log::warn!("Cannot submit closed order {client_order_id}");
1067 return Ok(());
1068 }
1069
1070 log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
1071 self.emitter.emit_order_submitted(&order);
1072
1073 self.submit_order_internal(cmd)
1074 }
1075
1076 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
1077 log::warn!(
1078 "submit_order_list not yet implemented for Binance Futures (got {} orders)",
1079 cmd.order_list.client_order_ids.len()
1080 );
1081 Ok(())
1082 }
1083
1084 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
1085 let order = {
1086 let cache = self.core.cache();
1087 cache.order(&cmd.client_order_id).cloned()
1088 };
1089
1090 let Some(order) = order else {
1091 log::warn!(
1092 "Cannot modify order {}: not found in cache",
1093 cmd.client_order_id
1094 );
1095 let ts_init = self.clock.get_time_ns();
1096 let rejected_event = OrderModifyRejected::new(
1097 self.core.trader_id,
1098 cmd.strategy_id,
1099 cmd.instrument_id,
1100 cmd.client_order_id,
1101 "Order not found in cache for modify".into(),
1102 UUID4::new(),
1103 ts_init, ts_init,
1105 false,
1106 cmd.venue_order_id,
1107 Some(self.core.account_id),
1108 );
1109
1110 self.emitter
1111 .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1112 return Ok(());
1113 };
1114
1115 let http_client = self.http_client.clone();
1116 let command = cmd.clone();
1117 let emitter = self.emitter.clone();
1118 let trader_id = self.core.trader_id;
1119 let account_id = self.core.account_id;
1120 let instrument_id = command.instrument_id;
1121 let venue_order_id = command.venue_order_id;
1122 let client_order_id = Some(command.client_order_id);
1123 let order_side = order.order_side();
1124 let quantity = command.quantity.unwrap_or_else(|| order.quantity());
1125 let price = command.price.or_else(|| order.price());
1126
1127 let Some(price) = price else {
1128 log::warn!(
1129 "Cannot modify order {}: price required",
1130 cmd.client_order_id
1131 );
1132 let ts_init = self.clock.get_time_ns();
1133 let rejected_event = OrderModifyRejected::new(
1134 self.core.trader_id,
1135 cmd.strategy_id,
1136 cmd.instrument_id,
1137 cmd.client_order_id,
1138 "Price required for order modification".into(),
1139 UUID4::new(),
1140 ts_init, ts_init,
1142 false,
1143 cmd.venue_order_id,
1144 Some(self.core.account_id),
1145 );
1146
1147 self.emitter
1148 .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1149 return Ok(());
1150 };
1151 let clock = self.clock;
1152
1153 self.spawn_task("modify_order", async move {
1154 let result = http_client
1155 .modify_order(
1156 account_id,
1157 instrument_id,
1158 venue_order_id,
1159 client_order_id,
1160 order_side,
1161 quantity,
1162 price,
1163 )
1164 .await;
1165
1166 match result {
1167 Ok(report) => {
1168 let ts_now = clock.get_time_ns();
1169 let updated_event = OrderUpdated::new(
1170 trader_id,
1171 command.strategy_id,
1172 command.instrument_id,
1173 command.client_order_id,
1174 quantity,
1175 UUID4::new(),
1176 ts_now,
1177 ts_now,
1178 false,
1179 Some(report.venue_order_id),
1180 Some(account_id),
1181 Some(price),
1182 None,
1183 None,
1184 );
1185
1186 emitter.send_order_event(OrderEventAny::Updated(updated_event));
1187 }
1188 Err(e) => {
1189 let ts_now = clock.get_time_ns();
1190 let rejected_event = OrderModifyRejected::new(
1191 trader_id,
1192 command.strategy_id,
1193 command.instrument_id,
1194 command.client_order_id,
1195 format!("modify-order-failed: {e}").into(),
1196 UUID4::new(),
1197 ts_now,
1198 ts_now,
1199 false,
1200 command.venue_order_id,
1201 Some(account_id),
1202 );
1203
1204 emitter.send_order_event(OrderEventAny::ModifyRejected(rejected_event));
1205
1206 anyhow::bail!("Modify order failed: {e}");
1207 }
1208 }
1209
1210 Ok(())
1211 });
1212
1213 Ok(())
1214 }
1215
1216 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
1217 self.cancel_order_internal(cmd)
1218 }
1219
1220 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
1221 let http_client = self.http_client.clone();
1222 let instrument_id = cmd.instrument_id;
1223
1224 self.spawn_task("cancel_all_orders", async move {
1226 match http_client.cancel_all_orders(instrument_id).await {
1227 Ok(_) => {
1228 log::info!("Cancel all regular orders request accepted for {instrument_id}");
1229 }
1230 Err(e) => {
1231 log::error!("Failed to cancel all regular orders for {instrument_id}: {e}");
1232 }
1233 }
1234
1235 match http_client.cancel_all_algo_orders(instrument_id).await {
1236 Ok(()) => {
1237 log::info!("Cancel all algo orders request accepted for {instrument_id}");
1238 }
1239 Err(e) => {
1240 log::error!("Failed to cancel all algo orders for {instrument_id}: {e}");
1241 }
1242 }
1243
1244 Ok(())
1245 });
1246
1247 Ok(())
1248 }
1249
1250 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
1251 const BATCH_SIZE: usize = 5;
1252
1253 if cmd.cancels.is_empty() {
1254 return Ok(());
1255 }
1256
1257 let http_client = self.http_client.clone();
1258 let command = cmd.clone();
1259
1260 let emitter = self.emitter.clone();
1261 let trader_id = self.core.trader_id;
1262 let account_id = self.core.account_id;
1263 let clock = self.clock;
1264
1265 self.spawn_task("batch_cancel_orders", async move {
1266 for chunk in command.cancels.chunks(BATCH_SIZE) {
1267 let batch_items: Vec<BatchCancelItem> = chunk
1268 .iter()
1269 .map(|cancel| {
1270 if let Some(venue_order_id) = cancel.venue_order_id {
1271 let order_id = venue_order_id.inner().parse::<i64>().unwrap_or(0);
1272 if order_id != 0 {
1273 BatchCancelItem::by_order_id(
1274 command.instrument_id.symbol.to_string(),
1275 order_id,
1276 )
1277 } else {
1278 BatchCancelItem::by_client_order_id(
1279 command.instrument_id.symbol.to_string(),
1280 cancel.client_order_id.to_string(),
1281 )
1282 }
1283 } else {
1284 BatchCancelItem::by_client_order_id(
1285 command.instrument_id.symbol.to_string(),
1286 cancel.client_order_id.to_string(),
1287 )
1288 }
1289 })
1290 .collect();
1291
1292 match http_client.batch_cancel_orders(&batch_items).await {
1293 Ok(results) => {
1294 for (i, result) in results.iter().enumerate() {
1295 let cancel = &chunk[i];
1296 match result {
1297 BatchOrderResult::Success(response) => {
1298 let venue_order_id =
1299 VenueOrderId::new(response.order_id.to_string());
1300 let canceled_event = OrderCanceled::new(
1301 trader_id,
1302 cancel.strategy_id,
1303 cancel.instrument_id,
1304 cancel.client_order_id,
1305 UUID4::new(),
1306 cancel.ts_init,
1307 clock.get_time_ns(),
1308 false,
1309 Some(venue_order_id),
1310 Some(account_id),
1311 );
1312
1313 emitter
1314 .send_order_event(OrderEventAny::Canceled(canceled_event));
1315 }
1316 BatchOrderResult::Error(error) => {
1317 let rejected_event = OrderCancelRejected::new(
1318 trader_id,
1319 cancel.strategy_id,
1320 cancel.instrument_id,
1321 cancel.client_order_id,
1322 format!(
1323 "batch-cancel-error: code={}, msg={}",
1324 error.code, error.msg
1325 )
1326 .into(),
1327 UUID4::new(),
1328 clock.get_time_ns(),
1329 cancel.ts_init,
1330 false,
1331 cancel.venue_order_id,
1332 Some(account_id),
1333 );
1334
1335 emitter.send_order_event(OrderEventAny::CancelRejected(
1336 rejected_event,
1337 ));
1338 }
1339 }
1340 }
1341 }
1342 Err(e) => {
1343 for cancel in chunk {
1344 let rejected_event = OrderCancelRejected::new(
1345 trader_id,
1346 cancel.strategy_id,
1347 cancel.instrument_id,
1348 cancel.client_order_id,
1349 format!("batch-cancel-request-failed: {e}").into(),
1350 UUID4::new(),
1351 clock.get_time_ns(),
1352 cancel.ts_init,
1353 false,
1354 cancel.venue_order_id,
1355 Some(account_id),
1356 );
1357
1358 emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
1359 }
1360 }
1361 }
1362 }
1363
1364 Ok(())
1365 });
1366
1367 Ok(())
1368 }
1369
1370 async fn generate_order_status_report(
1371 &self,
1372 cmd: &GenerateOrderStatusReport,
1373 ) -> anyhow::Result<Option<OrderStatusReport>> {
1374 let Some(instrument_id) = cmd.instrument_id else {
1375 log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
1376 return Ok(None);
1377 };
1378
1379 let symbol = instrument_id.symbol.to_string();
1380 let order_id = cmd.venue_order_id.as_ref().map(|id| {
1381 id.inner()
1382 .parse::<i64>()
1383 .expect("venue_order_id should be numeric")
1384 });
1385 let orig_client_order_id = cmd.client_order_id.map(|id| id.to_string());
1386
1387 let mut builder = BinanceOrderQueryParamsBuilder::default();
1388 builder.symbol(symbol);
1389 if let Some(oid) = order_id {
1390 builder.order_id(oid);
1391 }
1392 if let Some(ref coid) = orig_client_order_id {
1393 builder.orig_client_order_id(coid.clone());
1394 }
1395 let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1396
1397 let (_, size_precision) = self.get_instrument_precision(instrument_id);
1398
1399 match self.http_client.query_order(¶ms).await {
1400 Ok(order) => {
1401 let report = order.to_order_status_report(
1402 self.core.account_id,
1403 instrument_id,
1404 size_precision,
1405 )?;
1406 Ok(Some(report))
1407 }
1408 Err(BinanceFuturesHttpError::BinanceError { code: -2013, .. }) => {
1409 let Some(client_order_id) = cmd.client_order_id else {
1411 return Ok(None);
1412 };
1413
1414 match self.http_client.query_algo_order(client_order_id).await {
1415 Ok(algo_order) => {
1416 let report = algo_order.to_order_status_report(
1417 self.core.account_id,
1418 instrument_id,
1419 size_precision,
1420 )?;
1421 Ok(Some(report))
1422 }
1423 Err(e) => {
1424 log::debug!("Algo order query also failed: {e}");
1425 Ok(None)
1426 }
1427 }
1428 }
1429 Err(e) => Err(e.into()),
1430 }
1431 }
1432
1433 async fn generate_order_status_reports(
1434 &self,
1435 cmd: &GenerateOrderStatusReports,
1436 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1437 let mut reports = Vec::new();
1438
1439 if cmd.open_only {
1440 let symbol = cmd.instrument_id.map(|id| id.symbol.to_string());
1441 let mut builder = BinanceOpenOrdersParamsBuilder::default();
1442 if let Some(s) = symbol {
1443 builder.symbol(s);
1444 }
1445 let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1446
1447 let (orders, algo_orders) = tokio::try_join!(
1448 self.http_client.query_open_orders(¶ms),
1449 self.http_client.query_open_algo_orders(cmd.instrument_id),
1450 )?;
1451
1452 for order in orders {
1453 if let Some(instrument_id) = cmd.instrument_id {
1454 let (_, size_precision) = self.get_instrument_precision(instrument_id);
1455 if let Ok(report) = order.to_order_status_report(
1456 self.core.account_id,
1457 instrument_id,
1458 size_precision,
1459 ) {
1460 reports.push(report);
1461 }
1462 } else {
1463 let cache = self.core.cache();
1464 if let Some(instrument) = cache
1465 .instruments(&BINANCE_VENUE, None)
1466 .into_iter()
1467 .find(|i| i.symbol().as_str() == order.symbol.as_str())
1468 && let Ok(report) = order.to_order_status_report(
1469 self.core.account_id,
1470 instrument.id(),
1471 instrument.size_precision(),
1472 )
1473 {
1474 reports.push(report);
1475 }
1476 }
1477 }
1478
1479 for algo_order in algo_orders {
1480 if let Some(instrument_id) = cmd.instrument_id {
1481 let (_, size_precision) = self.get_instrument_precision(instrument_id);
1482 if let Ok(report) = algo_order.to_order_status_report(
1483 self.core.account_id,
1484 instrument_id,
1485 size_precision,
1486 ) {
1487 reports.push(report);
1488 }
1489 } else {
1490 let cache = self.core.cache();
1491 if let Some(instrument) = cache
1492 .instruments(&BINANCE_VENUE, None)
1493 .into_iter()
1494 .find(|i| i.symbol().as_str() == algo_order.symbol.as_str())
1495 && let Ok(report) = algo_order.to_order_status_report(
1496 self.core.account_id,
1497 instrument.id(),
1498 instrument.size_precision(),
1499 )
1500 {
1501 reports.push(report);
1502 }
1503 }
1504 }
1505 } else if let Some(instrument_id) = cmd.instrument_id {
1506 let symbol = instrument_id.symbol.to_string();
1507 let start_time = cmd.start.map(|t| t.as_i64() / 1_000_000); let end_time = cmd.end.map(|t| t.as_i64() / 1_000_000);
1509
1510 let mut builder = BinanceAllOrdersParamsBuilder::default();
1511 builder.symbol(symbol);
1512 if let Some(st) = start_time {
1513 builder.start_time(st);
1514 }
1515 if let Some(et) = end_time {
1516 builder.end_time(et);
1517 }
1518 let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1519
1520 let orders = self.http_client.query_all_orders(¶ms).await?;
1521 let (_, size_precision) = self.get_instrument_precision(instrument_id);
1522
1523 for order in orders {
1524 if let Ok(report) = order.to_order_status_report(
1525 self.core.account_id,
1526 instrument_id,
1527 size_precision,
1528 ) {
1529 reports.push(report);
1530 }
1531 }
1532 }
1533
1534 Ok(reports)
1535 }
1536
1537 async fn generate_fill_reports(
1538 &self,
1539 cmd: GenerateFillReports,
1540 ) -> anyhow::Result<Vec<FillReport>> {
1541 let Some(instrument_id) = cmd.instrument_id else {
1542 log::warn!("generate_fill_reports requires instrument_id for Binance Futures");
1543 return Ok(Vec::new());
1544 };
1545
1546 let symbol = instrument_id.symbol.to_string();
1547 let start_time = cmd.start.map(|t| t.as_i64() / 1_000_000);
1548 let end_time = cmd.end.map(|t| t.as_i64() / 1_000_000);
1549
1550 let mut builder = BinanceUserTradesParamsBuilder::default();
1551 builder.symbol(symbol);
1552 if let Some(st) = start_time {
1553 builder.start_time(st);
1554 }
1555 if let Some(et) = end_time {
1556 builder.end_time(et);
1557 }
1558 let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1559
1560 let trades = self.http_client.query_user_trades(¶ms).await?;
1561 let (price_precision, size_precision) = self.get_instrument_precision(instrument_id);
1562
1563 let mut reports = Vec::new();
1564 for trade in trades {
1565 if let Ok(report) = trade.to_fill_report(
1566 self.core.account_id,
1567 instrument_id,
1568 price_precision,
1569 size_precision,
1570 ) {
1571 reports.push(report);
1572 }
1573 }
1574
1575 Ok(reports)
1576 }
1577
1578 async fn generate_position_status_reports(
1579 &self,
1580 cmd: &GeneratePositionStatusReports,
1581 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1582 let symbol = cmd.instrument_id.map(|id| id.symbol.to_string());
1583
1584 let mut builder = BinancePositionRiskParamsBuilder::default();
1585 if let Some(s) = symbol {
1586 builder.symbol(s);
1587 }
1588 let params = builder.build().map_err(|e| anyhow::anyhow!("{e}"))?;
1589
1590 let positions = self.http_client.query_positions(¶ms).await?;
1591
1592 let mut reports = Vec::new();
1593 for position in positions {
1594 let position_amt: f64 = position.position_amt.parse().unwrap_or(0.0);
1595 if position_amt == 0.0 {
1596 continue;
1597 }
1598
1599 let cache = self.core.cache();
1600 if let Some(instrument) = cache
1601 .instruments(&BINANCE_VENUE, None)
1602 .into_iter()
1603 .find(|i| i.symbol().as_str() == position.symbol.as_str())
1604 && let Ok(report) = self.create_position_report(
1605 &position,
1606 instrument.id(),
1607 instrument.size_precision(),
1608 )
1609 {
1610 reports.push(report);
1611 }
1612 }
1613
1614 Ok(reports)
1615 }
1616
1617 async fn generate_mass_status(
1618 &self,
1619 lookback_mins: Option<u64>,
1620 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1621 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
1622
1623 let ts_now = self.clock.get_time_ns();
1624
1625 let start = lookback_mins.map(|mins| {
1626 let lookback_ns = mins * 60 * 1_000_000_000;
1627 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
1628 });
1629
1630 let order_cmd = GenerateOrderStatusReportsBuilder::default()
1631 .ts_init(ts_now)
1632 .open_only(true)
1633 .start(start)
1634 .build()
1635 .map_err(|e| anyhow::anyhow!("{e}"))?;
1636
1637 let position_cmd = GeneratePositionStatusReportsBuilder::default()
1638 .ts_init(ts_now)
1639 .start(start)
1640 .build()
1641 .map_err(|e| anyhow::anyhow!("{e}"))?;
1642
1643 let (order_reports, position_reports) = tokio::try_join!(
1644 self.generate_order_status_reports(&order_cmd),
1645 self.generate_position_status_reports(&position_cmd),
1646 )?;
1647
1648 log::info!("Received {} OrderStatusReports", order_reports.len());
1649 log::info!("Received {} PositionReports", position_reports.len());
1650
1651 let mut mass_status = ExecutionMassStatus::new(
1652 self.core.client_id,
1653 self.core.account_id,
1654 *BINANCE_VENUE,
1655 ts_now,
1656 None,
1657 );
1658
1659 mass_status.add_order_reports(order_reports);
1660 mass_status.add_position_reports(position_reports);
1661
1662 Ok(Some(mass_status))
1663 }
1664}