1use std::{
19 future::Future,
20 sync::Mutex,
21 time::{Duration, Instant},
22};
23
24use anyhow::Context;
25use async_trait::async_trait;
26use nautilus_common::{
27 clients::ExecutionClient,
28 live::{get_runtime, runner::get_exec_event_sender},
29 messages::execution::{
30 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
31 GenerateOrderStatusReport, GenerateOrderStatusReports, GenerateOrderStatusReportsBuilder,
32 GeneratePositionStatusReports, GeneratePositionStatusReportsBuilder, ModifyOrder,
33 QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
34 },
35};
36use nautilus_core::{
37 MUTEX_POISONED, UUID4, UnixNanos,
38 time::{AtomicTime, get_atomic_clock_realtime},
39};
40use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
41use nautilus_model::{
42 accounts::AccountAny,
43 enums::OmsType,
44 events::{
45 AccountState, OrderAccepted, OrderCancelRejected, OrderCanceled, OrderEventAny,
46 OrderModifyRejected, OrderRejected, OrderUpdated,
47 },
48 identifiers::{AccountId, ClientId, Venue, VenueOrderId},
49 orders::Order,
50 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
51 types::{AccountBalance, MarginBalance},
52};
53use tokio::task::JoinHandle;
54
55use crate::{
56 common::{consts::BINANCE_VENUE, credential::resolve_credentials, enums::BinanceProductType},
57 config::BinanceExecClientConfig,
58 spot::http::{
59 client::BinanceSpotHttpClient, models::BatchCancelResult, query::BatchCancelItem,
60 },
61};
62
63#[derive(Debug)]
68pub struct BinanceSpotExecutionClient {
69 core: ExecutionClientCore,
70 clock: &'static AtomicTime,
71 config: BinanceExecClientConfig,
72 emitter: ExecutionEventEmitter,
73 http_client: BinanceSpotHttpClient,
74 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
75}
76
77impl BinanceSpotExecutionClient {
78 pub fn new(core: ExecutionClientCore, config: BinanceExecClientConfig) -> anyhow::Result<Self> {
84 let product_type = config
85 .product_types
86 .first()
87 .copied()
88 .unwrap_or(BinanceProductType::Spot);
89
90 let (api_key, api_secret) = resolve_credentials(
91 config.api_key.clone(),
92 config.api_secret.clone(),
93 config.environment,
94 product_type,
95 )?;
96
97 let http_client = BinanceSpotHttpClient::new(
98 config.environment,
99 Some(api_key),
100 Some(api_secret),
101 config.base_url_http.clone(),
102 None, None, None, )
106 .context("failed to construct Binance Spot HTTP client")?;
107
108 let clock = get_atomic_clock_realtime();
109 let emitter = ExecutionEventEmitter::new(
110 clock,
111 core.trader_id,
112 core.account_id,
113 core.account_type,
114 core.base_currency,
115 );
116
117 Ok(Self {
118 core,
119 clock,
120 config,
121 emitter,
122 http_client,
123 pending_tasks: Mutex::new(Vec::new()),
124 })
125 }
126
127 async fn refresh_account_state(&self) -> anyhow::Result<AccountState> {
128 self.http_client
129 .request_account_state(self.core.account_id)
130 .await
131 }
132
133 fn update_account_state(&self) -> anyhow::Result<()> {
134 let runtime = get_runtime();
135 let account_state = runtime.block_on(self.refresh_account_state())?;
136
137 let ts_now = self.clock.get_time_ns();
138 self.emitter.emit_account_state(
139 account_state.balances.clone(),
140 account_state.margins.clone(),
141 account_state.is_reported,
142 ts_now,
143 );
144
145 Ok(())
146 }
147
148 fn submit_order_internal(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
149 let order = self
150 .core
151 .cache()
152 .order(&cmd.client_order_id)
153 .cloned()
154 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
155 let http_client = self.http_client.clone();
156
157 let event_emitter = self.emitter.clone();
158 let trader_id = self.core.trader_id;
159 let account_id = self.core.account_id;
160 let client_order_id = order.client_order_id();
161 let strategy_id = order.strategy_id();
162 let instrument_id = order.instrument_id();
163 let order_side = order.order_side();
164 let order_type = order.order_type();
165 let quantity = order.quantity();
166 let time_in_force = order.time_in_force();
167 let price = order.price();
168 let trigger_price = order.trigger_price();
169 let is_post_only = order.is_post_only();
170 let clock = self.clock;
171 let ts_init = self.clock.get_time_ns();
172
173 self.spawn_task("submit_order", async move {
174 let result = http_client
175 .submit_order(
176 account_id,
177 instrument_id,
178 client_order_id,
179 order_side,
180 order_type,
181 quantity,
182 time_in_force,
183 price,
184 trigger_price,
185 is_post_only,
186 )
187 .await
188 .map_err(|e| anyhow::anyhow!("Submit order failed: {e}"));
189
190 match result {
191 Ok(report) => {
192 let accepted = OrderAccepted::new(
193 trader_id,
194 strategy_id,
195 instrument_id,
196 client_order_id,
197 report.venue_order_id,
198 account_id,
199 UUID4::new(),
200 ts_init, ts_init,
202 false,
203 );
204
205 event_emitter.send_order_event(OrderEventAny::Accepted(accepted));
206 }
207 Err(e) => {
208 let rejected = OrderRejected::new(
209 trader_id,
210 strategy_id,
211 instrument_id,
212 client_order_id,
213 account_id,
214 format!("submit-order-error: {e}").into(),
215 UUID4::new(),
216 ts_init,
217 clock.get_time_ns(),
218 false,
219 false,
220 );
221
222 event_emitter.send_order_event(OrderEventAny::Rejected(rejected));
223
224 return Err(e);
225 }
226 }
227
228 Ok(())
229 });
230
231 Ok(())
232 }
233
234 fn cancel_order_internal(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
235 let http_client = self.http_client.clone();
236 let command = cmd.clone();
237
238 let event_emitter = self.emitter.clone();
239 let trader_id = self.core.trader_id;
240 let account_id = self.core.account_id;
241 let clock = self.clock;
242
243 self.spawn_task("cancel_order", async move {
244 let result = http_client
245 .cancel_order(
246 command.instrument_id,
247 command.venue_order_id,
248 Some(command.client_order_id),
249 )
250 .await
251 .map_err(|e| anyhow::anyhow!("Cancel order failed: {e}"));
252
253 match result {
254 Ok(venue_order_id) => {
255 let ts_now = clock.get_time_ns();
257 let canceled_event = OrderCanceled::new(
258 trader_id,
259 command.strategy_id,
260 command.instrument_id,
261 command.client_order_id,
262 UUID4::new(),
263 ts_now,
264 ts_now,
265 false,
266 Some(venue_order_id),
267 Some(account_id),
268 );
269
270 event_emitter.send_order_event(OrderEventAny::Canceled(canceled_event));
271 }
272 Err(e) => {
273 let ts_now = clock.get_time_ns();
274 let rejected_event = OrderCancelRejected::new(
275 trader_id,
276 command.strategy_id,
277 command.instrument_id,
278 command.client_order_id,
279 format!("cancel-order-error: {e}").into(),
280 UUID4::new(),
281 ts_now,
282 ts_now,
283 false,
284 command.venue_order_id,
285 Some(account_id),
286 );
287
288 event_emitter.send_order_event(OrderEventAny::CancelRejected(rejected_event));
289
290 return Err(e);
291 }
292 }
293
294 Ok(())
295 });
296
297 Ok(())
298 }
299
300 fn spawn_task<F>(&self, description: &'static str, fut: F)
301 where
302 F: Future<Output = anyhow::Result<()>> + Send + 'static,
303 {
304 let runtime = get_runtime();
305 let handle = runtime.spawn(async move {
306 if let Err(e) = fut.await {
307 log::warn!("{description} failed: {e}");
308 }
309 });
310
311 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
312 tasks.retain(|handle| !handle.is_finished());
313 tasks.push(handle);
314 }
315
316 fn abort_pending_tasks(&self) {
317 let mut tasks = self.pending_tasks.lock().expect(MUTEX_POISONED);
318 for handle in tasks.drain(..) {
319 handle.abort();
320 }
321 }
322
323 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
325 let account_id = self.core.account_id;
326
327 if self.core.cache().account(&account_id).is_some() {
328 log::info!("Account {account_id} registered");
329 return Ok(());
330 }
331
332 let start = Instant::now();
333 let timeout = Duration::from_secs_f64(timeout_secs);
334 let interval = Duration::from_millis(10);
335
336 loop {
337 tokio::time::sleep(interval).await;
338
339 if self.core.cache().account(&account_id).is_some() {
340 log::info!("Account {account_id} registered");
341 return Ok(());
342 }
343
344 if start.elapsed() >= timeout {
345 anyhow::bail!(
346 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
347 );
348 }
349 }
350 }
351}
352
353#[async_trait(?Send)]
354impl ExecutionClient for BinanceSpotExecutionClient {
355 fn is_connected(&self) -> bool {
356 self.core.is_connected()
357 }
358
359 fn client_id(&self) -> ClientId {
360 self.core.client_id
361 }
362
363 fn account_id(&self) -> AccountId {
364 self.core.account_id
365 }
366
367 fn venue(&self) -> Venue {
368 *BINANCE_VENUE
369 }
370
371 fn oms_type(&self) -> OmsType {
372 self.core.oms_type
373 }
374
375 fn get_account(&self) -> Option<AccountAny> {
376 self.core.cache().account(&self.core.account_id).cloned()
377 }
378
379 async fn connect(&mut self) -> anyhow::Result<()> {
380 if self.core.is_connected() {
381 return Ok(());
382 }
383
384 if !self.core.instruments_initialized() {
386 let instruments = self
387 .http_client
388 .request_instruments()
389 .await
390 .context("failed to request Binance Spot instruments")?;
391
392 if instruments.is_empty() {
393 log::warn!("No instruments returned for Binance Spot");
394 } else {
395 log::info!("Loaded {} Spot instruments", instruments.len());
396 self.http_client.cache_instruments(instruments);
397 }
398
399 self.core.set_instruments_initialized();
400 }
401
402 let account_state = self
404 .refresh_account_state()
405 .await
406 .context("failed to request Binance account state")?;
407
408 if !account_state.balances.is_empty() {
409 log::info!(
410 "Received account state with {} balance(s)",
411 account_state.balances.len()
412 );
413 }
414
415 self.emitter.send_account_state(account_state);
416
417 self.await_account_registered(30.0).await?;
419
420 self.core.set_connected();
421 log::info!("Connected: client_id={}", self.core.client_id);
422 Ok(())
423 }
424
425 async fn disconnect(&mut self) -> anyhow::Result<()> {
426 if self.core.is_disconnected() {
427 return Ok(());
428 }
429
430 self.abort_pending_tasks();
431
432 self.core.set_disconnected();
433 log::info!("Disconnected: client_id={}", self.core.client_id);
434 Ok(())
435 }
436
437 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
438 self.update_account_state()
439 }
440
441 fn query_order(&self, cmd: &QueryOrder) -> anyhow::Result<()> {
442 log::debug!("query_order: client_order_id={}", cmd.client_order_id);
443
444 let http_client = self.http_client.clone();
445 let command = cmd.clone();
446 let event_emitter = self.emitter.clone();
447 let account_id = self.core.account_id;
448
449 self.spawn_task("query_order", async move {
450 let result = http_client
451 .request_order_status_report(
452 account_id,
453 command.instrument_id,
454 command.venue_order_id,
455 Some(command.client_order_id),
456 )
457 .await;
458
459 match result {
460 Ok(report) => {
461 event_emitter.send_order_status_report(report);
462 }
463 Err(e) => log::warn!("Failed to query order status: {e}"),
464 }
465
466 Ok(())
467 });
468
469 Ok(())
470 }
471
472 fn generate_account_state(
473 &self,
474 balances: Vec<AccountBalance>,
475 margins: Vec<MarginBalance>,
476 reported: bool,
477 ts_event: UnixNanos,
478 ) -> anyhow::Result<()> {
479 self.emitter
480 .emit_account_state(balances, margins, reported, ts_event);
481 Ok(())
482 }
483
484 fn start(&mut self) -> anyhow::Result<()> {
485 if self.core.is_started() {
486 return Ok(());
487 }
488
489 self.emitter.set_sender(get_exec_event_sender());
490 self.core.set_started();
491
492 let http_client = self.http_client.clone();
494
495 get_runtime().spawn(async move {
496 match http_client.request_instruments().await {
497 Ok(instruments) => {
498 if instruments.is_empty() {
499 log::warn!("No instruments returned for Binance Spot");
500 } else {
501 http_client.cache_instruments(instruments);
502 log::info!("Instruments initialized");
503 }
504 }
505 Err(e) => {
506 log::error!("Failed to request Binance Spot instruments: {e}");
507 }
508 }
509 });
510
511 log::info!(
512 "Started: client_id={}, account_id={}, account_type={:?}, environment={:?}, product_types={:?}",
513 self.core.client_id,
514 self.core.account_id,
515 self.core.account_type,
516 self.config.environment,
517 self.config.product_types,
518 );
519 Ok(())
520 }
521
522 fn stop(&mut self) -> anyhow::Result<()> {
523 if self.core.is_stopped() {
524 return Ok(());
525 }
526
527 self.core.set_stopped();
528 self.core.set_disconnected();
529 self.abort_pending_tasks();
530 log::info!("Stopped: client_id={}", self.core.client_id);
531 Ok(())
532 }
533
534 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
535 let order = self
536 .core
537 .cache()
538 .order(&cmd.client_order_id)
539 .cloned()
540 .ok_or_else(|| anyhow::anyhow!("Order not found: {}", cmd.client_order_id))?;
541
542 if order.is_closed() {
543 let client_order_id = order.client_order_id();
544 log::warn!("Cannot submit closed order {client_order_id}");
545 return Ok(());
546 }
547
548 log::debug!("OrderSubmitted client_order_id={}", order.client_order_id());
549 self.emitter.emit_order_submitted(&order);
550
551 self.submit_order_internal(cmd)
552 }
553
554 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
555 log::warn!(
556 "submit_order_list not yet implemented for Binance Spot execution client (got {} orders)",
557 cmd.order_list.client_order_ids.len()
558 );
559 Ok(())
560 }
561
562 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
563 let order = self.core.cache().order(&cmd.client_order_id).cloned();
567
568 let Some(order) = order else {
569 log::warn!(
570 "Cannot modify order {}: not found in cache",
571 cmd.client_order_id
572 );
573 let ts_init = self.clock.get_time_ns();
574 let rejected_event = OrderModifyRejected::new(
575 self.core.trader_id,
576 cmd.strategy_id,
577 cmd.instrument_id,
578 cmd.client_order_id,
579 "Order not found in cache for modify".into(),
580 UUID4::new(),
581 ts_init, ts_init,
583 false,
584 cmd.venue_order_id,
585 Some(self.core.account_id),
586 );
587
588 self.emitter
589 .send_order_event(OrderEventAny::ModifyRejected(rejected_event));
590 return Ok(());
591 };
592
593 let http_client = self.http_client.clone();
594 let command = cmd.clone();
595
596 let event_emitter = self.emitter.clone();
597 let trader_id = self.core.trader_id;
598 let account_id = self.core.account_id;
599 let clock = self.clock;
600
601 let order_side = order.order_side();
603 let order_type = order.order_type();
604 let time_in_force = order.time_in_force();
605 let quantity = cmd.quantity.unwrap_or_else(|| order.quantity());
606
607 self.spawn_task("modify_order", async move {
608 let result = http_client
610 .modify_order(
611 account_id,
612 command.instrument_id,
613 command
614 .venue_order_id
615 .ok_or_else(|| anyhow::anyhow!("venue_order_id required for modify"))?,
616 command.client_order_id,
617 order_side,
618 order_type,
619 quantity,
620 time_in_force,
621 command.price,
622 )
623 .await
624 .map_err(|e| anyhow::anyhow!("Modify order failed: {e}"));
625
626 match result {
627 Ok(report) => {
628 let ts_now = clock.get_time_ns();
630 let updated_event = OrderUpdated::new(
631 trader_id,
632 command.strategy_id,
633 command.instrument_id,
634 command.client_order_id,
635 report.quantity,
636 UUID4::new(),
637 ts_now,
638 ts_now,
639 false,
640 Some(report.venue_order_id),
641 Some(account_id),
642 report.price,
643 None, None, );
646
647 event_emitter.send_order_event(OrderEventAny::Updated(updated_event));
648 }
649 Err(e) => {
650 let ts_now = clock.get_time_ns();
651 let rejected_event = OrderModifyRejected::new(
652 trader_id,
653 command.strategy_id,
654 command.instrument_id,
655 command.client_order_id,
656 format!("modify-order-error: {e}").into(),
657 UUID4::new(),
658 ts_now,
659 ts_now,
660 false,
661 command.venue_order_id,
662 Some(account_id),
663 );
664
665 event_emitter.send_order_event(OrderEventAny::ModifyRejected(rejected_event));
666
667 return Err(e);
668 }
669 }
670
671 Ok(())
672 });
673
674 Ok(())
675 }
676
677 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
678 self.cancel_order_internal(cmd)
679 }
680
681 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
682 let http_client = self.http_client.clone();
683 let command = cmd.clone();
684
685 let event_emitter = self.emitter.clone();
686 let trader_id = self.core.trader_id;
687 let account_id = self.core.account_id;
688 let clock = self.clock;
689
690 self.spawn_task("cancel_all_orders", async move {
691 let canceled_orders = http_client.cancel_all_orders(command.instrument_id).await?;
692
693 for (venue_order_id, client_order_id) in canceled_orders {
695 let canceled_event = OrderCanceled::new(
696 trader_id,
697 command.strategy_id,
698 command.instrument_id,
699 client_order_id,
700 UUID4::new(),
701 command.ts_init,
702 clock.get_time_ns(),
703 false,
704 Some(venue_order_id),
705 Some(account_id),
706 );
707
708 event_emitter.send_order_event(OrderEventAny::Canceled(canceled_event));
709 }
710
711 Ok(())
712 });
713
714 Ok(())
715 }
716
717 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
718 const BATCH_SIZE: usize = 5;
719
720 if cmd.cancels.is_empty() {
721 return Ok(());
722 }
723
724 let http_client = self.http_client.clone();
725 let command = cmd.clone();
726
727 let event_emitter = self.emitter.clone();
728 let trader_id = self.core.trader_id;
729 let account_id = self.core.account_id;
730 let clock = self.clock;
731
732 self.spawn_task("batch_cancel_orders", async move {
733 for chunk in command.cancels.chunks(BATCH_SIZE) {
734 let batch_items: Vec<BatchCancelItem> = chunk
735 .iter()
736 .map(|cancel| {
737 if let Some(venue_order_id) = cancel.venue_order_id {
738 let order_id = venue_order_id.inner().parse::<i64>().unwrap_or(0);
739 if order_id != 0 {
740 BatchCancelItem::by_order_id(
741 command.instrument_id.symbol.to_string(),
742 order_id,
743 )
744 } else {
745 BatchCancelItem::by_client_order_id(
746 command.instrument_id.symbol.to_string(),
747 cancel.client_order_id.to_string(),
748 )
749 }
750 } else {
751 BatchCancelItem::by_client_order_id(
752 command.instrument_id.symbol.to_string(),
753 cancel.client_order_id.to_string(),
754 )
755 }
756 })
757 .collect();
758
759 match http_client.batch_cancel_orders(&batch_items).await {
760 Ok(results) => {
761 for (i, result) in results.iter().enumerate() {
762 let cancel = &chunk[i];
763 match result {
764 BatchCancelResult::Success(success) => {
765 let venue_order_id =
766 VenueOrderId::new(success.order_id.to_string());
767 let canceled_event = OrderCanceled::new(
768 trader_id,
769 cancel.strategy_id,
770 cancel.instrument_id,
771 cancel.client_order_id,
772 UUID4::new(),
773 cancel.ts_init,
774 clock.get_time_ns(),
775 false,
776 Some(venue_order_id),
777 Some(account_id),
778 );
779
780 event_emitter
781 .send_order_event(OrderEventAny::Canceled(canceled_event));
782 }
783 BatchCancelResult::Error(error) => {
784 let rejected_event = OrderCancelRejected::new(
785 trader_id,
786 cancel.strategy_id,
787 cancel.instrument_id,
788 cancel.client_order_id,
789 format!(
790 "batch-cancel-error: code={}, msg={}",
791 error.code, error.msg
792 )
793 .into(),
794 UUID4::new(),
795 clock.get_time_ns(),
796 cancel.ts_init,
797 false,
798 cancel.venue_order_id,
799 Some(account_id),
800 );
801
802 event_emitter.send_order_event(OrderEventAny::CancelRejected(
803 rejected_event,
804 ));
805 }
806 }
807 }
808 }
809 Err(e) => {
810 for cancel in chunk {
811 let rejected_event = OrderCancelRejected::new(
812 trader_id,
813 cancel.strategy_id,
814 cancel.instrument_id,
815 cancel.client_order_id,
816 format!("batch-cancel-request-failed: {e}").into(),
817 UUID4::new(),
818 clock.get_time_ns(),
819 cancel.ts_init,
820 false,
821 cancel.venue_order_id,
822 Some(account_id),
823 );
824
825 event_emitter
826 .send_order_event(OrderEventAny::CancelRejected(rejected_event));
827 }
828 }
829 }
830 }
831
832 Ok(())
833 });
834
835 Ok(())
836 }
837
838 async fn generate_order_status_report(
839 &self,
840 cmd: &GenerateOrderStatusReport,
841 ) -> anyhow::Result<Option<OrderStatusReport>> {
842 let Some(instrument_id) = cmd.instrument_id else {
843 log::warn!("generate_order_status_report requires instrument_id: {cmd:?}");
844 return Ok(None);
845 };
846
847 let venue_order_id = cmd
849 .venue_order_id
850 .as_ref()
851 .map(|id| VenueOrderId::new(id.inner()));
852
853 let report = self
854 .http_client
855 .request_order_status_report(
856 self.core.account_id,
857 instrument_id,
858 venue_order_id,
859 cmd.client_order_id,
860 )
861 .await?;
862
863 Ok(Some(report))
864 }
865
866 async fn generate_order_status_reports(
867 &self,
868 cmd: &GenerateOrderStatusReports,
869 ) -> anyhow::Result<Vec<OrderStatusReport>> {
870 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
871 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
872
873 let reports = self
874 .http_client
875 .request_order_status_reports(
876 self.core.account_id,
877 cmd.instrument_id,
878 start_dt,
879 end_dt,
880 cmd.open_only,
881 None, )
883 .await?;
884
885 Ok(reports)
886 }
887
888 async fn generate_fill_reports(
889 &self,
890 cmd: GenerateFillReports,
891 ) -> anyhow::Result<Vec<FillReport>> {
892 let Some(instrument_id) = cmd.instrument_id else {
893 log::warn!("generate_fill_reports requires instrument_id for Binance Spot");
894 return Ok(Vec::new());
895 };
896
897 let venue_order_id = cmd
899 .venue_order_id
900 .as_ref()
901 .map(|id| VenueOrderId::new(id.inner()));
902
903 let start_dt = cmd.start.map(|nanos| nanos.to_datetime_utc());
904 let end_dt = cmd.end.map(|nanos| nanos.to_datetime_utc());
905
906 let reports = self
907 .http_client
908 .request_fill_reports(
909 self.core.account_id,
910 instrument_id,
911 venue_order_id,
912 start_dt,
913 end_dt,
914 None, )
916 .await?;
917
918 Ok(reports)
919 }
920
921 async fn generate_position_status_reports(
922 &self,
923 _cmd: &GeneratePositionStatusReports,
924 ) -> anyhow::Result<Vec<PositionStatusReport>> {
925 Ok(Vec::new())
928 }
929
930 async fn generate_mass_status(
931 &self,
932 lookback_mins: Option<u64>,
933 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
934 log::info!("Generating ExecutionMassStatus (lookback_mins={lookback_mins:?})");
935
936 let ts_now = self.clock.get_time_ns();
937
938 let start = lookback_mins.map(|mins| {
939 let lookback_ns = mins * 60 * 1_000_000_000;
940 UnixNanos::from(ts_now.as_u64().saturating_sub(lookback_ns))
941 });
942
943 let order_cmd = GenerateOrderStatusReportsBuilder::default()
946 .ts_init(ts_now)
947 .open_only(true)
948 .start(start)
949 .build()
950 .map_err(|e| anyhow::anyhow!("{e}"))?;
951
952 let position_cmd = GeneratePositionStatusReportsBuilder::default()
953 .ts_init(ts_now)
954 .start(start)
955 .build()
956 .map_err(|e| anyhow::anyhow!("{e}"))?;
957
958 let (order_reports, position_reports) = tokio::try_join!(
959 self.generate_order_status_reports(&order_cmd),
960 self.generate_position_status_reports(&position_cmd),
961 )?;
962
963 log::info!("Received {} OrderStatusReports", order_reports.len());
967 log::info!("Received {} PositionReports", position_reports.len());
968
969 let mut mass_status = ExecutionMassStatus::new(
970 self.core.client_id,
971 self.core.account_id,
972 *BINANCE_VENUE,
973 ts_now,
974 None,
975 );
976
977 mass_status.add_order_reports(order_reports);
978 mass_status.add_position_reports(position_reports);
979
980 Ok(Some(mass_status))
981 }
982}