1use std::{
19 future::Future,
20 sync::{
21 Mutex,
22 atomic::{AtomicBool, Ordering},
23 },
24 time::{Duration, Instant},
25};
26
27use anyhow::Context;
28use async_trait::async_trait;
29use nautilus_common::{
30 clients::ExecutionClient,
31 live::{runner::get_exec_event_sender, runtime::get_runtime},
32 messages::{
33 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
34 execution::{
35 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
36 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
37 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
38 },
39 },
40};
41use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos, time::get_atomic_clock_realtime};
42use nautilus_live::ExecutionClientCore;
43use nautilus_model::{
44 accounts::AccountAny,
45 enums::OmsType,
46 events::{
47 AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny,
48 OrderModifyRejected, OrderRejected, OrderSubmitted, OrderUpdated,
49 },
50 identifiers::{AccountId, ClientId, Venue, VenueOrderId},
51 orders::Order,
52 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
53 types::{AccountBalance, Currency, MarginBalance, Money},
54};
55use tokio::task::JoinHandle;
56
57use crate::{
58 common::consts::BINANCE_VENUE,
59 config::BinanceExecClientConfig,
60 spot::http::{client::BinanceSpotHttpClient, query::AccountInfoParams},
61};
62
63#[derive(Debug)]
68pub struct BinanceSpotExecutionClient {
69 core: ExecutionClientCore,
70 config: BinanceExecClientConfig,
71 http_client: BinanceSpotHttpClient,
72 exec_event_sender: Option<tokio::sync::mpsc::UnboundedSender<ExecutionEvent>>,
73 started: bool,
74 connected: AtomicBool,
75 instruments_initialized: AtomicBool,
76 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
77}
78
79impl BinanceSpotExecutionClient {
80 pub fn new(core: ExecutionClientCore, config: BinanceExecClientConfig) -> anyhow::Result<Self> {
86 let api_key = config
88 .api_key
89 .clone()
90 .or_else(|| std::env::var("BINANCE_API_KEY").ok())
91 .ok_or_else(|| anyhow::anyhow!("BINANCE_API_KEY not found in config or environment"))?;
92
93 let api_secret = config
94 .api_secret
95 .clone()
96 .or_else(|| std::env::var("BINANCE_API_SECRET").ok())
97 .ok_or_else(|| {
98 anyhow::anyhow!("BINANCE_API_SECRET not found in config or environment")
99 })?;
100
101 let http_client = BinanceSpotHttpClient::new(
102 config.environment,
103 Some(api_key),
104 Some(api_secret),
105 config.base_url_http.clone(),
106 None, None, None, )
110 .context("failed to construct Binance Spot HTTP client")?;
111
112 Ok(Self {
113 core,
114 config,
115 http_client,
116 exec_event_sender: None,
117 started: false,
118 connected: AtomicBool::new(false),
119 instruments_initialized: AtomicBool::new(false),
120 pending_tasks: Mutex::new(Vec::new()),
121 })
122 }
123
124 fn mantissa_to_f64(mantissa: i64, exponent: i8) -> f64 {
126 mantissa as f64 * 10f64.powi(exponent as i32)
127 }
128
129 fn create_account_state(
131 &self,
132 account_info: &crate::spot::http::models::BinanceAccountInfo,
133 ) -> AccountState {
134 let ts_now = get_atomic_clock_realtime().get_time_ns();
135
136 let balances: Vec<AccountBalance> = account_info
138 .balances
139 .iter()
140 .filter_map(|b| {
141 let free = Self::mantissa_to_f64(b.free_mantissa, b.exponent);
142 let locked = Self::mantissa_to_f64(b.locked_mantissa, b.exponent);
143 let total = free + locked;
144
145 if total == 0.0 {
147 return None;
148 }
149
150 let currency = Currency::from(&b.asset);
151 Some(AccountBalance::new(
152 Money::new(total, currency),
153 Money::new(locked, currency),
154 Money::new(free, currency),
155 ))
156 })
157 .collect();
158
159 AccountState::new(
160 self.core.account_id,
161 self.core.account_type,
162 balances,
163 Vec::new(), true, UUID4::new(),
166 ts_now,
167 ts_now,
168 None, )
170 }
171
172 async fn refresh_account_state(&self) -> anyhow::Result<AccountState> {
173 let params = AccountInfoParams::default();
174 let account_info = match self.http_client.request_account_state(¶ms).await {
175 Ok(info) => info,
176 Err(e) => {
177 log::error!("Binance account state request failed: {e}");
178 anyhow::bail!("Binance account state request failed: {e}");
179 }
180 };
181
182 Ok(self.create_account_state(&account_info))
183 }
184
185 fn update_account_state(&self) -> anyhow::Result<()> {
186 let runtime = get_runtime();
187 let account_state = runtime.block_on(self.refresh_account_state())?;
188
189 self.core.generate_account_state(
190 account_state.balances.clone(),
191 account_state.margins.clone(),
192 account_state.is_reported,
193 account_state.ts_event,
194 )
195 }
196
197 fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
198 let order = cmd.order.clone();
199 let http_client = self.http_client.clone();
200
201 let exec_event_sender = self.exec_event_sender.clone();
202 let trader_id = self.core.trader_id;
203 let account_id = self.core.account_id;
204 let ts_init = cmd.ts_init;
205 let client_order_id = order.client_order_id();
206 let strategy_id = order.strategy_id();
207 let instrument_id = order.instrument_id();
208 let order_side = order.order_side();
209 let order_type = order.order_type();
210 let quantity = order.quantity();
211 let time_in_force = order.time_in_force();
212 let price = order.price();
213 let trigger_price = order.trigger_price();
214 let is_post_only = order.is_post_only();
215
216 self.spawn_task("submit_order", async move {
217 let result = http_client
218 .submit_order(
219 account_id,
220 instrument_id,
221 client_order_id,
222 order_side,
223 order_type,
224 quantity,
225 time_in_force,
226 price,
227 trigger_price,
228 is_post_only,
229 )
230 .await
231 .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"));
232
233 match result {
234 Ok(report) => {
235 let accepted_event = OrderAccepted::new(
237 trader_id,
238 strategy_id,
239 instrument_id,
240 client_order_id,
241 report.venue_order_id,
242 account_id,
243 UUID4::new(),
244 ts_init,
245 get_atomic_clock_realtime().get_time_ns(),
246 false,
247 );
248
249 if let Some(sender) = &exec_event_sender
250 && let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Accepted(
251 accepted_event,
252 )))
253 {
254 log::warn!("Failed to send OrderAccepted event: {e}");
255 }
256 }
257 Err(e) => {
258 let rejected_event = OrderRejected::new(
259 trader_id,
260 strategy_id,
261 instrument_id,
262 client_order_id,
263 account_id,
264 format!("submit-order-error: {e}").into(),
265 UUID4::new(),
266 ts_init,
267 get_atomic_clock_realtime().get_time_ns(),
268 false,
269 false,
270 );
271
272 if let Some(sender) = &exec_event_sender
273 && let Err(send_err) = sender.send(ExecutionEvent::Order(
274 OrderEventAny::Rejected(rejected_event),
275 ))
276 {
277 log::warn!("Failed to send OrderRejected event: {send_err}");
278 }
279
280 return Err(e);
281 }
282 }
283
284 Ok(())
285 });
286
287 Ok(())
288 }
289
290 fn cancel_order_internal(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
291 let http_client = self.http_client.clone();
292 let command = cmd.clone();
293
294 let exec_event_sender = self.exec_event_sender.clone();
295 let trader_id = self.core.trader_id;
296 let account_id = self.core.account_id;
297 let ts_init = cmd.ts_init;
298
299 self.spawn_task("cancel_order", async move {
300 let result = http_client
301 .cancel_order(
302 command.instrument_id,
303 command.venue_order_id,
304 Some(command.client_order_id),
305 )
306 .await
307 .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
308
309 match result {
310 Ok(venue_order_id) => {
311 let canceled_event = OrderCanceled::new(
313 trader_id,
314 command.strategy_id,
315 command.instrument_id,
316 command.client_order_id,
317 UUID4::new(),
318 ts_init,
319 get_atomic_clock_realtime().get_time_ns(),
320 false,
321 Some(venue_order_id),
322 Some(account_id),
323 );
324
325 if let Some(sender) = &exec_event_sender
326 && let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Canceled(
327 canceled_event,
328 )))
329 {
330 log::warn!("Failed to send OrderCanceled event: {e}");
331 }
332 }
333 Err(e) => {
334 let rejected_event = OrderCancelRejected::new(
335 trader_id,
336 command.strategy_id,
337 command.instrument_id,
338 command.client_order_id,
339 format!("cancel-order-error: {e}").into(),
340 UUID4::new(),
341 get_atomic_clock_realtime().get_time_ns(),
342 ts_init,
343 false,
344 command.venue_order_id,
345 Some(account_id),
346 );
347
348 if let Some(sender) = &exec_event_sender
349 && let Err(send_err) = sender.send(ExecutionEvent::Order(
350 OrderEventAny::CancelRejected(rejected_event),
351 ))
352 {
353 log::warn!("Failed to send OrderCancelRejected event: {send_err}");
354 }
355
356 return Err(e);
357 }
358 }
359
360 Ok(())
361 });
362
363 Ok(())
364 }
365
366 fn spawn_task<F>(&self, description: &'static str, fut: F)
367 where
368 F: Future<Output = anyhow::Result<()>> + Send + 'static,
369 {
370 let runtime = get_runtime();
371 let handle = runtime.spawn(async move {
372 if let Err(e) = fut.await {
373 log::warn!("{description} failed: {e}");
374 }
375 });
376
377 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
378 tasks.retain(|handle| !handle.is_finished());
379 tasks.push(handle);
380 }
381
382 fn abort_pending_tasks(&self) {
383 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
384 for handle in tasks.drain(..) {
385 handle.abort();
386 }
387 }
388
389 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
391 let account_id = self.core.account_id;
392
393 if self.core.cache().borrow().account(&account_id).is_some() {
394 log::info!("Account {account_id} registered");
395 return Ok(());
396 }
397
398 let start = Instant::now();
399 let timeout = Duration::from_secs_f64(timeout_secs);
400 let interval = Duration::from_millis(10);
401
402 loop {
403 tokio::time::sleep(interval).await;
404
405 if self.core.cache().borrow().account(&account_id).is_some() {
406 log::info!("Account {account_id} registered");
407 return Ok(());
408 }
409
410 if start.elapsed() >= timeout {
411 anyhow::bail!(
412 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
413 );
414 }
415 }
416 }
417}
418
419#[async_trait(?Send)]
420impl ExecutionClient for BinanceSpotExecutionClient {
421 fn is_connected(&self) -> bool {
422 self.connected.load(Ordering::Acquire)
423 }
424
425 fn client_id(&self) -> ClientId {
426 self.core.client_id
427 }
428
429 fn account_id(&self) -> AccountId {
430 self.core.account_id
431 }
432
433 fn venue(&self) -> Venue {
434 *BINANCE_VENUE
435 }
436
437 fn oms_type(&self) -> OmsType {
438 self.core.oms_type
439 }
440
441 fn get_account(&self) -> Option<AccountAny> {
442 self.core.get_account()
443 }
444
445 async fn connect(&mut self) -> anyhow::Result<()> {
446 if self.connected.load(Ordering::Acquire) {
447 return Ok(());
448 }
449
450 if self.exec_event_sender.is_none() {
452 self.exec_event_sender = Some(get_exec_event_sender());
453 }
454
455 if !self.instruments_initialized.load(Ordering::Acquire) {
457 let instruments = self
458 .http_client
459 .request_instruments()
460 .await
461 .context("failed to request Binance Spot instruments")?;
462
463 if instruments.is_empty() {
464 log::warn!("No instruments returned for Binance Spot");
465 } else {
466 log::info!("Loaded {} Spot instruments", instruments.len());
467 self.http_client.cache_instruments(instruments.clone());
468
469 {
471 let mut cache = self.core.cache().borrow_mut();
472 for instrument in &instruments {
473 if let Err(e) = cache.add_instrument(instrument.clone()) {
474 log::debug!("Instrument already in cache: {e}");
475 }
476 }
477 }
478 }
479
480 self.instruments_initialized.store(true, Ordering::Release);
481 }
482
483 let Some(sender) = self.exec_event_sender.as_ref() else {
484 log::error!("Execution event sender not initialized");
485 anyhow::bail!("Execution event sender not initialized");
486 };
487
488 let account_state = self
490 .refresh_account_state()
491 .await
492 .context("failed to request Binance account state")?;
493
494 if !account_state.balances.is_empty() {
495 log::info!(
496 "Received account state with {} balance(s)",
497 account_state.balances.len()
498 );
499 }
500
501 if let Err(e) = sender.send(ExecutionEvent::Account(account_state)) {
502 log::warn!("Failed to send account state: {e}");
503 }
504
505 self.await_account_registered(30.0).await?;
507
508 self.connected.store(true, Ordering::Release);
509 log::info!("Connected: client_id={}", self.core.client_id);
510 Ok(())
511 }
512
513 async fn disconnect(&mut self) -> anyhow::Result<()> {
514 if !self.connected.load(Ordering::Acquire) {
515 return Ok(());
516 }
517
518 self.abort_pending_tasks();
519
520 self.connected.store(false, Ordering::Release);
521 log::info!("Disconnected: client_id={}", self.core.client_id);
522 Ok(())
523 }
524
525 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
526 self.update_account_state()
527 }
528
529 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
530 log::debug!("query_order: client_order_id={}", cmd.client_order_id);
531
532 let http_client = self.http_client.clone();
533 let command = cmd.clone();
534 let exec_event_sender = self.exec_event_sender.clone();
535 let account_id = self.core.account_id;
536
537 self.spawn_task("query_order", async move {
538 let result = http_client
539 .request_order_status(
540 account_id,
541 command.instrument_id,
542 command.venue_order_id,
543 Some(command.client_order_id),
544 )
545 .await;
546
547 match result {
548 Ok(report) => {
549 if let Some(sender) = &exec_event_sender {
550 let exec_report = NautilusExecutionReport::Order(Box::new(report));
551 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
552 log::warn!("Failed to send order status report: {e}");
553 }
554 }
555 }
556 Err(e) => log::warn!("Failed to query order status: {e}"),
557 }
558
559 Ok(())
560 });
561
562 Ok(())
563 }
564
565 fn generate_account_state(
566 &self,
567 balances: Vec<AccountBalance>,
568 margins: Vec<MarginBalance>,
569 reported: bool,
570 ts_event: UnixNanos,
571 ) -> anyhow::Result<()> {
572 self.core
573 .generate_account_state(balances, margins, reported, ts_event)
574 }
575
576 fn start(&mut self) -> anyhow::Result<()> {
577 if self.started {
578 return Ok(());
579 }
580
581 self.started = true;
582
583 let http_client = self.http_client.clone();
585
586 get_runtime().spawn(async move {
587 match http_client.request_instruments().await {
588 Ok(instruments) => {
589 if instruments.is_empty() {
590 log::warn!("No instruments returned for Binance Spot");
591 } else {
592 http_client.cache_instruments(instruments);
593 log::info!("Instruments initialized");
594 }
595 }
596 Err(e) => {
597 log::error!("Failed to request Binance Spot instruments: {e}");
598 }
599 }
600 });
601
602 log::info!(
603 "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}, product_types={:?}",
604 self.core.client_id,
605 self.core.account_id,
606 self.core.account_type,
607 self.config.environment,
608 self.config.product_types,
609 );
610 Ok(())
611 }
612
613 fn stop(&mut self) -> anyhow::Result<()> {
614 if !self.started {
615 return Ok(());
616 }
617
618 self.started = false;
619 self.connected.store(false, Ordering::Release);
620 self.abort_pending_tasks();
621 log::info!("Stopped: client_id={}", self.core.client_id);
622 Ok(())
623 }
624
625 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
626 let order = &cmd.order;
627
628 if order.is_closed() {
629 let client_order_id = order.client_order_id();
630 log::warn!("Cannot submit closed order {client_order_id}");
631 return Ok(());
632 }
633
634 let event = OrderSubmitted::new(
635 self.core.trader_id,
636 order.strategy_id(),
637 order.instrument_id(),
638 order.client_order_id(),
639 self.core.account_id,
640 UUID4::new(),
641 cmd.ts_init,
642 get_atomic_clock_realtime().get_time_ns(),
643 );
644 if let Some(sender) = &self.exec_event_sender {
645 log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
646 if let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Submitted(event))) {
647 log::warn!("Failed to send OrderSubmitted event: {e}");
648 }
649 } else {
650 log::warn!("Cannot send OrderSubmitted: exec_event_sender not initialized");
651 }
652
653 self.submit_order_internal(cmd)
654 }
655
656 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
657 log::warn!(
658 "submit_order_list not yet implemented for Binance Spot execution client (got {} orders)",
659 cmd.order_list.orders.len()
660 );
661 Ok(())
662 }
663
664 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
665 let order = {
669 let cache = self.core.cache().borrow();
670 cache.order(&cmd.client_order_id).cloned()
671 };
672
673 let Some(order) = order else {
674 log::warn!(
675 "Cannot modify order {}: not found in cache",
676 cmd.client_order_id
677 );
678 let rejected_event = OrderModifyRejected::new(
679 self.core.trader_id,
680 cmd.strategy_id,
681 cmd.instrument_id,
682 cmd.client_order_id,
683 "Order not found in cache for modify".into(),
684 UUID4::new(),
685 get_atomic_clock_realtime().get_time_ns(),
686 cmd.ts_init,
687 false,
688 cmd.venue_order_id,
689 Some(self.core.account_id),
690 );
691
692 if let Some(sender) = &self.exec_event_sender
693 && let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::ModifyRejected(
694 rejected_event,
695 )))
696 {
697 log::warn!("Failed to send OrderModifyRejected event: {e}");
698 }
699 return Ok(());
700 };
701
702 let http_client = self.http_client.clone();
703 let command = cmd.clone();
704
705 let exec_event_sender = self.exec_event_sender.clone();
706 let trader_id = self.core.trader_id;
707 let account_id = self.core.account_id;
708 let ts_init = cmd.ts_init;
709
710 let order_side = order.order_side();
712 let order_type = order.order_type();
713 let time_in_force = order.time_in_force();
714 let quantity = cmd.quantity.unwrap_or_else(|| order.quantity());
715
716 self.spawn_task("modify_order", async move {
717 let result = http_client
719 .modify_order(
720 account_id,
721 command.instrument_id,
722 command
723 .venue_order_id
724 .ok_or_else(|| anyhow::anyhow!("venue_order_id required for modify"))?,
725 command.client_order_id,
726 order_side,
727 order_type,
728 quantity,
729 time_in_force,
730 command.price,
731 )
732 .await
733 .map_err(|e| anyhow::anyhow!("Modify order failed: {e}"));
734
735 match result {
736 Ok(report) => {
737 let updated_event = OrderUpdated::new(
739 trader_id,
740 command.strategy_id,
741 command.instrument_id,
742 command.client_order_id,
743 report.quantity,
744 UUID4::new(),
745 ts_init,
746 get_atomic_clock_realtime().get_time_ns(),
747 false,
748 Some(report.venue_order_id),
749 Some(account_id),
750 report.price,
751 None, None, );
754
755 if let Some(sender) = &exec_event_sender
756 && let Err(e) = sender
757 .send(ExecutionEvent::Order(OrderEventAny::Updated(updated_event)))
758 {
759 log::warn!("Failed to send OrderUpdated event: {e}");
760 }
761 }
762 Err(e) => {
763 let rejected_event = OrderModifyRejected::new(
764 trader_id,
765 command.strategy_id,
766 command.instrument_id,
767 command.client_order_id,
768 format!("modify-order-error: {e}").into(),
769 UUID4::new(),
770 get_atomic_clock_realtime().get_time_ns(),
771 ts_init,
772 false,
773 command.venue_order_id,
774 Some(account_id),
775 );
776
777 if let Some(sender) = &exec_event_sender
778 && let Err(send_err) = sender.send(ExecutionEvent::Order(
779 OrderEventAny::ModifyRejected(rejected_event),
780 ))
781 {
782 log::warn!("Failed to send OrderModifyRejected event: {send_err}");
783 }
784
785 return Err(e);
786 }
787 }
788
789 Ok(())
790 });
791
792 Ok(())
793 }
794
795 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
796 self.cancel_order_internal(cmd)
797 }
798
799 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
800 let http_client = self.http_client.clone();
801 let command = cmd.clone();
802
803 let exec_event_sender = self.exec_event_sender.clone();
804 let trader_id = self.core.trader_id;
805 let account_id = self.core.account_id;
806
807 self.spawn_task("cancel_all_orders", async move {
808 let canceled_orders = http_client.cancel_all_orders(command.instrument_id).await?;
809
810 for (venue_order_id, client_order_id) in canceled_orders {
812 let canceled_event = OrderCanceled::new(
813 trader_id,
814 command.strategy_id,
815 command.instrument_id,
816 client_order_id,
817 UUID4::new(),
818 command.ts_init,
819 get_atomic_clock_realtime().get_time_ns(),
820 false,
821 Some(venue_order_id),
822 Some(account_id),
823 );
824
825 if let Some(sender) = &exec_event_sender
826 && let Err(e) = sender.send(ExecutionEvent::Order(OrderEventAny::Canceled(
827 canceled_event,
828 )))
829 {
830 log::warn!("Failed to send OrderCanceled event: {e}");
831 }
832 }
833
834 Ok(())
835 });
836
837 Ok(())
838 }
839
840 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
841 for cancel in &cmd.cancels {
844 self.cancel_order_internal(cancel)?;
845 }
846
847 Ok(())
848 }
849
850 async fn generate_order_status_report(
851 &self,
852 cmd: &GenerateOrderStatusReport,
853 ) -> anyhow::Result<Option<OrderStatusReport>> {
854 let Some(instrument_id) = cmd.instrument_id else {
855 log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
856 return Ok(None);
857 };
858
859 let venue_order_id = cmd
861 .venue_order_id
862 .as_ref()
863 .map(|id| VenueOrderId::new(id.inner()));
864
865 let report = self
866 .http_client
867 .request_order_status(
868 self.core.account_id,
869 instrument_id,
870 venue_order_id,
871 cmd.client_order_id,
872 )
873 .await?;
874
875 Ok(Some(report))
876 }
877
878 async fn generate_order_status_reports(
879 &self,
880 cmd: &GenerateOrderStatusReports,
881 ) -> anyhow::Result<Vec<OrderStatusReport>> {
882 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
883 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
884
885 let reports = self
886 .http_client
887 .request_order_status_reports(
888 self.core.account_id,
889 cmd.instrument_id,
890 start_dt,
891 end_dt,
892 cmd.open_only,
893 None, )
895 .await?;
896
897 Ok(reports)
898 }
899
900 async fn generate_fill_reports(
901 &self,
902 cmd: GenerateFillReports,
903 ) -> anyhow::Result<Vec<FillReport>> {
904 let Some(instrument_id) = cmd.instrument_id else {
905 log::warn!("generate_fill_reports requires instrument_id for Binance Spot");
906 return Ok(Vec::new());
907 };
908
909 let venue_order_id = cmd
911 .venue_order_id
912 .as_ref()
913 .map(|id| VenueOrderId::new(id.inner()));
914
915 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
916 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
917
918 let reports = self
919 .http_client
920 .request_fill_reports(
921 self.core.account_id,
922 instrument_id,
923 venue_order_id,
924 start_dt,
925 end_dt,
926 None, )
928 .await?;
929
930 Ok(reports)
931 }
932
933 async fn generate_position_status_reports(
934 &self,
935 _cmd: &GeneratePositionStatusReports,
936 ) -> anyhow::Result<Vec<PositionStatusReport>> {
937 Ok(Vec::new())
940 }
941
942 async fn generate_mass_status(
943 &self,
944 lookback_mins: Option<u64>,
945 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
946 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
947
948 let ts_now = get_atomic_clock_realtime().get_time_ns();
949
950 let start = lookback_mins.map(|mins| {
951 let lookback_ns = mins * 60 * 1_000_000_000;
952 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
953 });
954
955 let order_cmd = GenerateOrderStatusReports::new(
958 UUID4::new(),
959 ts_now,
960 true, None, start,
963 None, None, None, );
967
968 let position_cmd = GeneratePositionStatusReports::new(
969 UUID4::new(),
970 ts_now,
971 None, start,
973 None, None, None, );
977
978 let (order_reports, position_reports) = tokio::try_join!(
979 self.generate_order_status_reports(&order_cmd),
980 self.generate_position_status_reports(&position_cmd),
981 )?;
982
983 log::info!("Received {} OrderStatusReports", order_reports.len());
987 log::info!("Received {} PositionReports", position_reports.len());
988
989 let mut mass_status = ExecutionMassStatus::new(
990 self.core.client_id,
991 self.core.account_id,
992 *BINANCE_VENUE,
993 ts_now,
994 None,
995 );
996
997 mass_status.add_order_reports(order_reports);
998 mass_status.add_position_reports(position_reports);
999
1000 Ok(Some(mass_status))
1001 }
1002}