1use std::sync::{
42 Arc, Mutex,
43 atomic::{AtomicU32, AtomicU64},
44};
45
46use anyhow::Context;
47use async_trait::async_trait;
48use chrono::{Duration, Utc};
49use dashmap::DashMap;
50use nautilus_common::{
51 live::{runner::get_exec_event_sender, runtime::get_runtime},
52 messages::{
53 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
54 execution::{
55 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
56 GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
57 QueryOrder, SubmitOrder, SubmitOrderList,
58 },
59 },
60 msgbus,
61};
62use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
63use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
64use nautilus_live::execution::client::LiveExecutionClient;
65use nautilus_model::{
66 accounts::AccountAny,
67 enums::{OmsType, OrderSide, OrderType, TimeInForce},
68 events::{AccountState, OrderCancelRejected, OrderEventAny, OrderRejected},
69 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue},
70 instruments::{Instrument, InstrumentAny},
71 orders::Order,
72 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
73 types::{AccountBalance, MarginBalance},
74};
75use rust_decimal::Decimal;
76use tokio::task::JoinHandle;
77
78use crate::{
79 common::{consts::DYDX_VENUE, credential::DydxCredential, parse::nanos_to_secs_i64},
80 config::DydxAdapterConfig,
81 execution::submitter::OrderSubmitter,
82 grpc::{DydxGrpcClient, Wallet, types::ChainId},
83 http::client::DydxHttpClient,
84 websocket::{client::DydxWebSocketClient, enums::NautilusWsMessage},
85};
86
87pub mod submitter;
88
/// Largest value a `u32` can hold.
///
/// NOTE(review): presumably the upper bound for the integer client order IDs
/// produced by this module; it is not referenced in the visible code — confirm.
pub const MAX_CLIENT_ID: u32 = u32::MAX;
95
96enum ExecutionReport {
101 Order(Box<OrderStatusReport>),
102 Fill(Box<FillReport>),
103}
104
105#[derive(Debug)]
120pub struct DydxExecutionClient {
121 core: ExecutionClientCore,
122 config: DydxAdapterConfig,
123 http_client: DydxHttpClient,
124 ws_client: DydxWebSocketClient,
125 grpc_client: Arc<tokio::sync::RwLock<DydxGrpcClient>>,
126 wallet: Arc<tokio::sync::RwLock<Option<Wallet>>>,
127 instruments: DashMap<InstrumentId, InstrumentAny>,
128 market_to_instrument: DashMap<String, InstrumentId>,
129 clob_pair_id_to_instrument: DashMap<u32, InstrumentId>,
130 block_height: AtomicU64,
131 oracle_prices: DashMap<InstrumentId, Decimal>,
132 client_id_to_int: DashMap<String, u32>,
133 int_to_client_id: DashMap<u32, String>,
134 next_client_id: AtomicU32,
135 wallet_address: String,
136 subaccount_number: u32,
137 started: bool,
138 connected: bool,
139 instruments_initialized: bool,
140 ws_stream_handle: Option<JoinHandle<()>>,
141 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
142}
143
144impl DydxExecutionClient {
145 pub fn new(
151 core: ExecutionClientCore,
152 config: DydxAdapterConfig,
153 wallet_address: String,
154 subaccount_number: u32,
155 ) -> anyhow::Result<Self> {
156 let http_client = DydxHttpClient::default();
157
158 let ws_client = if let Some(ref mnemonic) = config.mnemonic {
160 let credential = DydxCredential::from_mnemonic(
161 mnemonic,
162 subaccount_number,
163 config.authenticator_ids.clone(),
164 )?;
165 DydxWebSocketClient::new_private(
166 config.ws_url.clone(),
167 credential,
168 core.account_id,
169 Some(20),
170 )
171 } else {
172 DydxWebSocketClient::new_public(config.ws_url.clone(), Some(20))
173 };
174
175 let grpc_urls = config.get_grpc_urls();
176 let grpc_client = Arc::new(tokio::sync::RwLock::new(
177 get_runtime()
178 .block_on(async { DydxGrpcClient::new_with_fallback(&grpc_urls).await })
179 .context("failed to construct dYdX gRPC client")?,
180 ));
181
182 Ok(Self {
183 core,
184 config,
185 http_client,
186 ws_client,
187 grpc_client,
188 wallet: Arc::new(tokio::sync::RwLock::new(None)),
189 instruments: DashMap::new(),
190 market_to_instrument: DashMap::new(),
191 clob_pair_id_to_instrument: DashMap::new(),
192 block_height: AtomicU64::new(0),
193 oracle_prices: DashMap::new(),
194 client_id_to_int: DashMap::new(),
195 int_to_client_id: DashMap::new(),
196 next_client_id: AtomicU32::new(1),
197 wallet_address,
198 subaccount_number,
199 started: false,
200 connected: false,
201 instruments_initialized: false,
202 ws_stream_handle: None,
203 pending_tasks: Mutex::new(Vec::new()),
204 })
205 }
206
207 fn generate_client_order_id_int(&self, client_order_id: &str) -> u32 {
227 if let Some(existing) = self.client_id_to_int.get(client_order_id) {
229 return *existing.value();
230 }
231
232 if let Ok(id) = client_order_id.parse::<u32>() {
234 self.client_id_to_int
235 .insert(client_order_id.to_string(), id);
236 self.int_to_client_id
237 .insert(id, client_order_id.to_string());
238 return id;
239 }
240
241 use dashmap::mapref::entry::Entry;
243
244 match self.client_id_to_int.entry(client_order_id.to_string()) {
245 Entry::Occupied(entry) => *entry.get(),
246 Entry::Vacant(vacant) => {
247 let id = self
248 .next_client_id
249 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
250 vacant.insert(id);
251 self.int_to_client_id
252 .insert(id, client_order_id.to_string());
253 id
254 }
255 }
256 }
257
258 fn get_client_order_id_int(&self, client_order_id: &str) -> Option<u32> {
262 if let Ok(id) = client_order_id.parse::<u32>() {
264 return Some(id);
265 }
266
267 self.client_id_to_int
269 .get(client_order_id)
270 .map(|entry| *entry.value())
271 }
272
273 fn get_chain_id(&self) -> ChainId {
277 self.config.get_chain_id()
278 }
279
280 fn cache_instruments_from_http(&mut self) {
287 use nautilus_model::instruments::InstrumentAny;
288
289 let instruments: Vec<InstrumentAny> = self
291 .http_client
292 .instruments_cache
293 .iter()
294 .map(|entry| entry.value().clone())
295 .collect();
296
297 tracing::debug!(
298 "Caching {} instruments in execution client",
299 instruments.len()
300 );
301
302 for instrument in instruments {
303 let instrument_id = instrument.id();
304 let symbol = instrument_id.symbol.as_str();
305
306 self.instruments.insert(instrument_id, instrument.clone());
308
309 self.market_to_instrument
311 .insert(symbol.to_string(), instrument_id);
312 }
313
314 let http_mapping = self.http_client.clob_pair_id_mapping();
317 for entry in http_mapping.iter() {
318 self.clob_pair_id_to_instrument
319 .insert(*entry.key(), *entry.value());
320 }
321
322 self.instruments_initialized = true;
323 tracing::info!(
324 "Cached {} instruments ({} CLOB pair IDs) with market mappings",
325 self.instruments.len(),
326 self.clob_pair_id_to_instrument.len()
327 );
328 }
329
330 fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
332 self.market_to_instrument
333 .get(market)
334 .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()))
335 }
336
337 fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
339 let instrument = self
340 .clob_pair_id_to_instrument
341 .get(&clob_pair_id)
342 .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()));
343
344 if instrument.is_none() {
345 self.log_missing_instrument_for_clob_pair_id(clob_pair_id);
346 }
347
348 instrument
349 }
350
351 fn log_missing_instrument_for_clob_pair_id(&self, clob_pair_id: u32) {
352 let known: Vec<(u32, String)> = self
353 .clob_pair_id_to_instrument
354 .iter()
355 .filter_map(|entry| {
356 let instrument_id = entry.value();
357 self.instruments.get(instrument_id).map(|inst_entry| {
358 (
359 *entry.key(),
360 inst_entry.value().id().symbol.as_str().to_string(),
361 )
362 })
363 })
364 .collect();
365
366 tracing::warn!(
367 "Instrument for clob_pair_id {} not found in cache. Known CLOB pair IDs and symbols: {:?}",
368 clob_pair_id,
369 known
370 );
371 }
372
373 fn spawn_task<F>(&self, label: &'static str, fut: F)
374 where
375 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
376 {
377 let handle = tokio::spawn(async move {
378 if let Err(e) = fut.await {
379 tracing::error!("{label}: {e:?}");
380 }
381 });
382
383 self.pending_tasks
384 .lock()
385 .expect(MUTEX_POISONED)
386 .push(handle);
387 }
388
389 fn spawn_order_task<F>(
393 &self,
394 label: &'static str,
395 strategy_id: StrategyId,
396 instrument_id: InstrumentId,
397 client_order_id: ClientOrderId,
398 fut: F,
399 ) where
400 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
401 {
402 let trader_id = self.core.trader_id;
404 let account_id = self.core.account_id;
405 let sender = get_exec_event_sender();
406
407 let handle = tokio::spawn(async move {
408 if let Err(e) = fut.await {
409 let error_msg = format!("{label} failed: {e:?}");
410 tracing::error!("{}", error_msg);
411
412 let ts_now = UnixNanos::default(); let event = OrderRejected::new(
415 trader_id,
416 strategy_id,
417 instrument_id,
418 client_order_id,
419 account_id,
420 error_msg.into(),
421 UUID4::new(),
422 ts_now,
423 ts_now,
424 false,
425 false,
426 );
427
428 if let Err(send_err) =
429 sender.send(ExecutionEvent::Order(OrderEventAny::Rejected(event)))
430 {
431 tracing::error!("Failed to send OrderRejected event: {send_err}");
432 }
433 }
434 });
435
436 self.pending_tasks
437 .lock()
438 .expect(MUTEX_POISONED)
439 .push(handle);
440 }
441
442 fn abort_pending_tasks(&self) {
443 let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
444 for handle in guard.drain(..) {
445 handle.abort();
446 }
447 }
448}
449
450#[async_trait(?Send)]
451impl ExecutionClient for DydxExecutionClient {
452 fn is_connected(&self) -> bool {
453 self.connected
454 }
455
456 fn client_id(&self) -> ClientId {
457 self.core.client_id
458 }
459
460 fn account_id(&self) -> AccountId {
461 self.core.account_id
462 }
463
464 fn venue(&self) -> Venue {
465 *DYDX_VENUE
466 }
467
468 fn oms_type(&self) -> OmsType {
469 self.core.oms_type
470 }
471
472 fn get_account(&self) -> Option<AccountAny> {
473 self.core.get_account()
474 }
475
476 fn generate_account_state(
477 &self,
478 balances: Vec<AccountBalance>,
479 margins: Vec<MarginBalance>,
480 reported: bool,
481 ts_event: UnixNanos,
482 ) -> anyhow::Result<()> {
483 self.core
484 .generate_account_state(balances, margins, reported, ts_event)
485 }
486
487 fn start(&mut self) -> anyhow::Result<()> {
488 if self.started {
489 tracing::warn!("dYdX execution client already started");
490 return Ok(());
491 }
492
493 tracing::info!("Starting dYdX execution client");
494 self.started = true;
495 Ok(())
496 }
497
498 fn stop(&mut self) -> anyhow::Result<()> {
499 if !self.started {
500 tracing::warn!("dYdX execution client not started");
501 return Ok(());
502 }
503
504 tracing::info!("Stopping dYdX execution client");
505 self.abort_pending_tasks();
506 self.started = false;
507 self.connected = false;
508 Ok(())
509 }
510
511 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
528 let order = cmd.order.clone();
529
530 if !self.is_connected() {
532 let reason = "Cannot submit order: execution client not connected";
533 tracing::error!("{}", reason);
534 anyhow::bail!(reason);
535 }
536
537 if order.is_closed() {
539 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
540 return Ok(());
541 }
542
543 match order.order_type() {
545 OrderType::Market | OrderType::Limit => {
546 tracing::debug!(
547 "Submitting {} order: {}",
548 if matches!(order.order_type(), OrderType::Market) {
549 "MARKET"
550 } else {
551 "LIMIT"
552 },
553 order.client_order_id()
554 );
555 }
556 OrderType::StopMarket | OrderType::StopLimit => {
558 tracing::debug!(
559 "Submitting {} order: {}",
560 if matches!(order.order_type(), OrderType::StopMarket) {
561 "STOP_MARKET"
562 } else {
563 "STOP_LIMIT"
564 },
565 order.client_order_id()
566 );
567 }
568 OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
570 tracing::debug!(
571 "Submitting {} order: {}",
572 if matches!(order.order_type(), OrderType::MarketIfTouched) {
573 "TAKE_PROFIT_MARKET"
574 } else {
575 "TAKE_PROFIT_LIMIT"
576 },
577 order.client_order_id()
578 );
579 }
580 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit => {
582 let reason = "Trailing stop orders not supported by dYdX v4 protocol";
583 tracing::error!("{}", reason);
584 self.core.generate_order_rejected(
585 order.strategy_id(),
586 order.instrument_id(),
587 order.client_order_id(),
588 reason,
589 cmd.ts_init,
590 false,
591 );
592 return Ok(());
593 }
594 order_type => {
595 let reason = format!("Order type {order_type:?} not supported by dYdX");
596 tracing::error!("{}", reason);
597 self.core.generate_order_rejected(
598 order.strategy_id(),
599 order.instrument_id(),
600 order.client_order_id(),
601 &reason,
602 cmd.ts_init,
603 false,
604 );
605 return Ok(());
606 }
607 }
608
609 self.core.generate_order_submitted(
611 order.strategy_id(),
612 order.instrument_id(),
613 order.client_order_id(),
614 cmd.ts_init,
615 );
616
617 let grpc_client = self.grpc_client.clone();
618 let wallet = self.wallet.clone();
619 let http_client = self.http_client.clone();
620 let wallet_address = self.wallet_address.clone();
621 let subaccount_number = self.subaccount_number;
622 let client_order_id = order.client_order_id();
623 let instrument_id = order.instrument_id();
624 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
625 let chain_id = self.get_chain_id();
626 let authenticator_ids = self.config.authenticator_ids.clone();
627 #[allow(clippy::redundant_clone)]
628 let order_clone = order.clone();
629
630 let client_id_u32 = self.generate_client_order_id_int(client_order_id.as_str());
632
633 self.spawn_order_task(
634 "submit_order",
635 order.strategy_id(),
636 order.instrument_id(),
637 order.client_order_id(),
638 async move {
639 let wallet_guard = wallet.read().await;
640 let wallet_ref = wallet_guard
641 .as_ref()
642 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
643
644 let grpc_guard = grpc_client.read().await;
645 let submitter = OrderSubmitter::new(
646 (*grpc_guard).clone(),
647 http_client.clone(),
648 wallet_address,
649 subaccount_number,
650 chain_id,
651 authenticator_ids,
652 );
653
654 match order_clone.order_type() {
656 OrderType::Market => {
657 submitter
658 .submit_market_order(
659 wallet_ref,
660 instrument_id,
661 client_id_u32,
662 order_clone.order_side(),
663 order_clone.quantity(),
664 block_height,
665 )
666 .await?;
667 tracing::info!("Successfully submitted market order: {}", client_order_id);
668 }
669 OrderType::Limit => {
670 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
671 submitter
672 .submit_limit_order(
673 wallet_ref,
674 instrument_id,
675 client_id_u32,
676 order_clone.order_side(),
677 order_clone
678 .price()
679 .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
680 order_clone.quantity(),
681 order_clone.time_in_force(),
682 order_clone.is_post_only(),
683 order_clone.is_reduce_only(),
684 block_height,
685 expire_time,
686 )
687 .await?;
688 tracing::info!("Successfully submitted limit order: {}", client_order_id);
689 }
690 OrderType::StopMarket => {
691 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
692 anyhow::anyhow!("Stop market order missing trigger_price")
693 })?;
694 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
695 submitter
696 .submit_stop_market_order(
697 wallet_ref,
698 instrument_id,
699 client_id_u32,
700 order_clone.order_side(),
701 trigger_price,
702 order_clone.quantity(),
703 order_clone.is_reduce_only(),
704 expire_time,
705 )
706 .await?;
707 tracing::info!(
708 "Successfully submitted stop market order: {}",
709 client_order_id
710 );
711 }
712 OrderType::StopLimit => {
713 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
714 anyhow::anyhow!("Stop limit order missing trigger_price")
715 })?;
716 let limit_price = order_clone.price().ok_or_else(|| {
717 anyhow::anyhow!("Stop limit order missing limit price")
718 })?;
719 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
720 submitter
721 .submit_stop_limit_order(
722 wallet_ref,
723 instrument_id,
724 client_id_u32,
725 order_clone.order_side(),
726 trigger_price,
727 limit_price,
728 order_clone.quantity(),
729 order_clone.time_in_force(),
730 order_clone.is_post_only(),
731 order_clone.is_reduce_only(),
732 expire_time,
733 )
734 .await?;
735 tracing::info!(
736 "Successfully submitted stop limit order: {}",
737 client_order_id
738 );
739 }
740 OrderType::MarketIfTouched => {
742 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
743 anyhow::anyhow!("Take profit market order missing trigger_price")
744 })?;
745 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
746 submitter
747 .submit_take_profit_market_order(
748 wallet_ref,
749 instrument_id,
750 client_id_u32,
751 order_clone.order_side(),
752 trigger_price,
753 order_clone.quantity(),
754 order_clone.is_reduce_only(),
755 expire_time,
756 )
757 .await?;
758 tracing::info!(
759 "Successfully submitted take profit market order: {}",
760 client_order_id
761 );
762 }
763 OrderType::LimitIfTouched => {
765 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
766 anyhow::anyhow!("Take profit limit order missing trigger_price")
767 })?;
768 let limit_price = order_clone.price().ok_or_else(|| {
769 anyhow::anyhow!("Take profit limit order missing limit price")
770 })?;
771 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
772 submitter
773 .submit_take_profit_limit_order(
774 wallet_ref,
775 instrument_id,
776 client_id_u32,
777 order_clone.order_side(),
778 trigger_price,
779 limit_price,
780 order_clone.quantity(),
781 order_clone.time_in_force(),
782 order_clone.is_post_only(),
783 order_clone.is_reduce_only(),
784 expire_time,
785 )
786 .await?;
787 tracing::info!(
788 "Successfully submitted take profit limit order: {}",
789 client_order_id
790 );
791 }
792 _ => unreachable!("Order type already validated"),
793 }
794
795 Ok(())
796 },
797 );
798
799 Ok(())
800 }
801
802 fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
803 anyhow::bail!("Order lists not supported by dYdX")
804 }
805
806 fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
807 anyhow::bail!("Order modification not supported by dYdX")
808 }
809
810 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
827 if !self.is_connected() {
828 anyhow::bail!("Cannot cancel order: not connected");
829 }
830
831 let client_order_id = cmd.client_order_id;
832
833 let cache = self.core.cache();
835 let cache_borrow = cache.borrow();
836
837 let order = match cache_borrow.order(&client_order_id) {
838 Some(order) => order,
839 None => {
840 tracing::error!(
841 "Cannot cancel order {}: not found in cache",
842 client_order_id
843 );
844 return Ok(()); }
846 };
847
848 if order.is_closed() {
850 tracing::warn!(
851 "CancelOrder command for {} when order already {} (will not send to exchange)",
852 client_order_id,
853 order.status()
854 );
855 return Ok(());
856 }
857
858 let instrument_id = cmd.instrument_id;
860 let instrument = match cache_borrow.instrument(&instrument_id) {
861 Some(instrument) => instrument,
862 None => {
863 tracing::error!(
864 "Cannot cancel order {}: instrument {} not found in cache",
865 client_order_id,
866 instrument_id
867 );
868 return Ok(()); }
870 };
871
872 tracing::debug!(
873 "Cancelling order {} for instrument {}",
874 client_order_id,
875 instrument.id()
876 );
877
878 let grpc_client = self.grpc_client.clone();
879 let wallet = self.wallet.clone();
880 let http_client = self.http_client.clone();
881 let wallet_address = self.wallet_address.clone();
882 let subaccount_number = self.subaccount_number;
883 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
884 let chain_id = self.get_chain_id();
885 let authenticator_ids = self.config.authenticator_ids.clone();
886 let trader_id = cmd.trader_id;
887 let strategy_id = cmd.strategy_id;
888 let venue_order_id = cmd.venue_order_id;
889
890 let client_id_u32 = match self.get_client_order_id_int(client_order_id.as_str()) {
892 Some(id) => id,
893 None => {
894 tracing::error!("Client order ID {} not found in cache", client_order_id);
895 anyhow::bail!("Client order ID not found in cache")
896 }
897 };
898
899 self.spawn_task("cancel_order", async move {
900 let wallet_guard = wallet.read().await;
901 let wallet_ref = wallet_guard
902 .as_ref()
903 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
904
905 let grpc_guard = grpc_client.read().await;
906 let submitter = OrderSubmitter::new(
907 (*grpc_guard).clone(),
908 http_client.clone(),
909 wallet_address,
910 subaccount_number,
911 chain_id,
912 authenticator_ids,
913 );
914
915 match submitter
917 .cancel_order(wallet_ref, instrument_id, client_id_u32, block_height)
918 .await
919 {
920 Ok(_) => {
921 tracing::info!("Successfully cancelled order: {}", client_order_id);
922 }
923 Err(e) => {
924 tracing::error!("Failed to cancel order {}: {:?}", client_order_id, e);
925
926 let sender = get_exec_event_sender();
928 let ts_now = UnixNanos::default();
929 let event = OrderCancelRejected::new(
930 trader_id,
931 strategy_id,
932 instrument_id,
933 client_order_id,
934 format!("Cancel order failed: {e:?}").into(),
935 UUID4::new(),
936 ts_now,
937 ts_now,
938 false,
939 Some(venue_order_id),
940 None, );
942 sender
943 .send(ExecutionEvent::Order(OrderEventAny::CancelRejected(event)))
944 .unwrap();
945 }
946 }
947
948 Ok(())
949 });
950
951 Ok(())
952 }
953
954 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
955 if !self.is_connected() {
956 anyhow::bail!("Cannot cancel orders: not connected");
957 }
958
959 let cache = self.core.cache().borrow();
961 let mut open_orders: Vec<_> = cache
962 .orders_open(None, None, None, None)
963 .into_iter()
964 .collect();
965
966 let instrument_id = cmd.instrument_id;
968 open_orders.retain(|order| order.instrument_id() == instrument_id);
969
970 if cmd.order_side != OrderSide::NoOrderSide {
972 let order_side = cmd.order_side;
973 open_orders.retain(|order| order.order_side() == order_side);
974 }
975
976 let mut short_term_orders = Vec::new();
980 let mut long_term_orders = Vec::new();
981
982 for order in &open_orders {
983 match order.time_in_force() {
984 TimeInForce::Ioc | TimeInForce::Fok => short_term_orders.push(order),
985 TimeInForce::Gtc
986 | TimeInForce::Gtd
987 | TimeInForce::Day
988 | TimeInForce::AtTheOpen
989 | TimeInForce::AtTheClose => long_term_orders.push(order),
990 }
991 }
992
993 tracing::info!(
994 "Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={}, order_side={:?}",
995 open_orders.len(),
996 short_term_orders.len(),
997 long_term_orders.len(),
998 instrument_id,
999 cmd.order_side
1000 );
1001
1002 let grpc_client = self.grpc_client.clone();
1004 let wallet = self.wallet.clone();
1005 let http_client = self.http_client.clone();
1006 let wallet_address = self.wallet_address.clone();
1007 let subaccount_number = self.subaccount_number;
1008 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1009 let chain_id = self.get_chain_id();
1010 let authenticator_ids = self.config.authenticator_ids.clone();
1011
1012 let mut orders_to_cancel = Vec::new();
1014 for order in &open_orders {
1015 let client_order_id = order.client_order_id();
1016 if let Some(client_id_u32) = self.get_client_order_id_int(client_order_id.as_str()) {
1017 orders_to_cancel.push((instrument_id, client_id_u32));
1018 } else {
1019 tracing::warn!(
1020 "Cannot cancel order {}: client_order_id not found in cache",
1021 client_order_id
1022 );
1023 }
1024 }
1025
1026 self.spawn_task("cancel_all_orders", async move {
1027 let wallet_guard = wallet.read().await;
1028 let wallet_ref = wallet_guard
1029 .as_ref()
1030 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1031
1032 let grpc_guard = grpc_client.read().await;
1033 let submitter = OrderSubmitter::new(
1034 (*grpc_guard).clone(),
1035 http_client.clone(),
1036 wallet_address,
1037 subaccount_number,
1038 chain_id,
1039 authenticator_ids,
1040 );
1041
1042 match submitter
1044 .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1045 .await
1046 {
1047 Ok(_) => {
1048 tracing::info!("Successfully cancelled {} orders", orders_to_cancel.len());
1049 }
1050 Err(e) => {
1051 tracing::error!("Batch cancel failed: {:?}", e);
1052 }
1053 }
1054
1055 Ok(())
1056 });
1057
1058 Ok(())
1059 }
1060
1061 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
1062 if cmd.cancels.is_empty() {
1063 return Ok(());
1064 }
1065
1066 if !self.is_connected() {
1067 anyhow::bail!("Cannot cancel orders: not connected");
1068 }
1069
1070 let mut orders_to_cancel = Vec::with_capacity(cmd.cancels.len());
1072 for cancel in &cmd.cancels {
1073 let client_id_str = cancel.client_order_id.as_str();
1074 let client_id_u32 = match self.get_client_order_id_int(client_id_str) {
1075 Some(id) => id,
1076 None => {
1077 tracing::warn!(
1078 "No u32 mapping found for client_order_id={}, skipping cancel",
1079 client_id_str
1080 );
1081 continue;
1082 }
1083 };
1084 orders_to_cancel.push((cancel.instrument_id, client_id_u32));
1085 }
1086
1087 if orders_to_cancel.is_empty() {
1088 tracing::warn!("No valid orders to cancel in batch");
1089 return Ok(());
1090 }
1091
1092 let grpc_client = self.grpc_client.clone();
1093 let wallet = self.wallet.clone();
1094 let http_client = self.http_client.clone();
1095 let wallet_address = self.wallet_address.clone();
1096 let subaccount_number = self.subaccount_number;
1097 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1098 let chain_id = self.get_chain_id();
1099 let authenticator_ids = self.config.authenticator_ids.clone();
1100
1101 tracing::info!(
1102 "Batch cancelling {} orders: {:?}",
1103 orders_to_cancel.len(),
1104 orders_to_cancel
1105 );
1106
1107 self.spawn_task("batch_cancel_orders", async move {
1108 let wallet_guard = wallet.read().await;
1109 let wallet_ref = wallet_guard
1110 .as_ref()
1111 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1112
1113 let grpc_guard = grpc_client.read().await;
1114 let submitter = OrderSubmitter::new(
1115 (*grpc_guard).clone(),
1116 http_client.clone(),
1117 wallet_address,
1118 subaccount_number,
1119 chain_id,
1120 authenticator_ids,
1121 );
1122
1123 match submitter
1124 .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1125 .await
1126 {
1127 Ok(()) => {
1128 tracing::info!(
1129 "Successfully batch cancelled {} orders",
1130 orders_to_cancel.len()
1131 );
1132 }
1133 Err(e) => {
1134 tracing::error!("Batch cancel failed: {:?}", e);
1135 }
1136 }
1137
1138 Ok(())
1139 });
1140
1141 Ok(())
1142 }
1143
1144 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
1145 Ok(())
1146 }
1147
1148 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
1149 Ok(())
1150 }
1151
1152 async fn connect(&mut self) -> anyhow::Result<()> {
1153 if self.connected {
1154 tracing::warn!("dYdX execution client already connected");
1155 return Ok(());
1156 }
1157
1158 tracing::info!("Connecting to dYdX");
1159
1160 tracing::debug!("Loading instruments from HTTP API");
1163 self.http_client.fetch_and_cache_instruments().await?;
1164 tracing::info!(
1165 "Loaded {} instruments from HTTP",
1166 self.http_client.instruments_cache.len()
1167 );
1168
1169 self.cache_instruments_from_http();
1171
1172 if let Some(mnemonic) = &self.config.mnemonic {
1174 let wallet = Wallet::from_mnemonic(mnemonic)?;
1175 *self.wallet.write().await = Some(wallet);
1176 tracing::debug!("Wallet initialized");
1177 }
1178
1179 self.ws_client.connect().await?;
1181 tracing::debug!("WebSocket connected");
1182
1183 self.ws_client.subscribe_block_height().await?;
1185 tracing::debug!("Subscribed to block height updates");
1186
1187 self.ws_client.subscribe_markets().await?;
1189 tracing::debug!("Subscribed to markets");
1190
1191 if self.config.mnemonic.is_some() {
1193 self.ws_client
1194 .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
1195 .await?;
1196 tracing::debug!(
1197 "Subscribed to subaccount updates: {}/{}",
1198 self.wallet_address,
1199 self.subaccount_number
1200 );
1201
1202 if let Some(mut rx) = self.ws_client.take_receiver() {
1205 let account_id = self.core.account_id;
1207 let instruments = self.instruments.clone();
1208 let oracle_prices = self.oracle_prices.clone();
1209 let clob_pair_id_to_instrument = self.clob_pair_id_to_instrument.clone();
1210
1211 let handle = tokio::spawn(async move {
1212 while let Some(msg) = rx.recv().await {
1213 match msg {
1214 NautilusWsMessage::Order(report) => {
1215 tracing::debug!("Received order update: {:?}", report.order_status);
1216 dispatch_execution_report(ExecutionReport::Order(report));
1217 }
1218 NautilusWsMessage::Fill(report) => {
1219 tracing::debug!("Received fill update");
1220 dispatch_execution_report(ExecutionReport::Fill(report));
1221 }
1222 NautilusWsMessage::Position(report) => {
1223 tracing::debug!("Received position update");
1224 let sender = get_exec_event_sender();
1226 let exec_report =
1227 NautilusExecutionReport::Position(Box::new(*report));
1228 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1229 tracing::warn!("Failed to send position status report: {e}");
1230 }
1231 }
1232 NautilusWsMessage::AccountState(state) => {
1233 tracing::debug!("Received account state update");
1234 dispatch_account_state(*state);
1235 }
1236 NautilusWsMessage::SubaccountSubscribed(msg) => {
1237 tracing::debug!(
1238 "Parsing subaccount subscription with full context"
1239 );
1240
1241 let inst_map: std::collections::HashMap<_, _> = instruments
1243 .iter()
1244 .map(|entry| (*entry.key(), entry.value().clone()))
1245 .collect();
1246
1247 let oracle_map: std::collections::HashMap<_, _> = oracle_prices
1249 .iter()
1250 .map(|entry| (*entry.key(), *entry.value()))
1251 .collect();
1252
1253 let ts_init =
1254 nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1255 let ts_event = ts_init;
1256
1257 match crate::http::parse::parse_account_state(
1258 &msg.contents.subaccount,
1259 account_id,
1260 &inst_map,
1261 &oracle_map,
1262 ts_event,
1263 ts_init,
1264 ) {
1265 Ok(account_state) => {
1266 tracing::info!(
1267 "Parsed account state: {} balance(s), {} margin(s)",
1268 account_state.balances.len(),
1269 account_state.margins.len()
1270 );
1271 dispatch_account_state(account_state);
1272 }
1273 Err(e) => {
1274 tracing::error!("Failed to parse account state: {e}");
1275 }
1276 }
1277
1278 if let Some(ref positions) =
1280 msg.contents.subaccount.open_perpetual_positions
1281 {
1282 tracing::debug!(
1283 "Parsing {} position(s) from subscription",
1284 positions.len()
1285 );
1286
1287 for (market, ws_position) in positions {
1288 match crate::websocket::parse::parse_ws_position_report(
1289 ws_position,
1290 &instruments,
1291 account_id,
1292 ts_init,
1293 ) {
1294 Ok(report) => {
1295 tracing::debug!(
1296 "Parsed position report: {} {} {} {}",
1297 report.instrument_id,
1298 report.position_side,
1299 report.quantity,
1300 market
1301 );
1302 let sender = get_exec_event_sender();
1303 let exec_report = NautilusExecutionReport::Position(
1304 Box::new(report),
1305 );
1306 if let Err(e) =
1307 sender.send(ExecutionEvent::Report(exec_report))
1308 {
1309 tracing::warn!(
1310 "Failed to send position status report: {e}"
1311 );
1312 }
1313 }
1314 Err(e) => {
1315 tracing::error!(
1316 "Failed to parse WebSocket position for {}: {e}",
1317 market
1318 );
1319 }
1320 }
1321 }
1322 }
1323 }
1324 NautilusWsMessage::SubaccountsChannelData(data) => {
1325 tracing::debug!("Processing subaccounts channel data");
1326 let ts_init =
1327 nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1328
1329 if let Some(ref orders) = data.contents.orders {
1331 for ws_order in orders {
1332 match crate::websocket::parse::parse_ws_order_report(
1333 ws_order,
1334 &clob_pair_id_to_instrument,
1335 &instruments,
1336 account_id,
1337 ts_init,
1338 ) {
1339 Ok(report) => {
1340 tracing::debug!(
1341 "Parsed order report: {} {} {} @ {}",
1342 report.instrument_id,
1343 report.order_side,
1344 report.order_status,
1345 report.quantity
1346 );
1347 let sender = get_exec_event_sender();
1348 let exec_report =
1349 NautilusExecutionReport::OrderStatus(Box::new(
1350 report,
1351 ));
1352 if let Err(e) =
1353 sender.send(ExecutionEvent::Report(exec_report))
1354 {
1355 tracing::warn!(
1356 "Failed to send order status report: {e}"
1357 );
1358 }
1359 }
1360 Err(e) => {
1361 tracing::error!(
1362 "Failed to parse WebSocket order: {e}"
1363 );
1364 }
1365 }
1366 }
1367 }
1368
1369 if let Some(ref fills) = data.contents.fills {
1371 for ws_fill in fills {
1372 match crate::websocket::parse::parse_ws_fill_report(
1373 ws_fill,
1374 &instruments,
1375 account_id,
1376 ts_init,
1377 ) {
1378 Ok(report) => {
1379 tracing::debug!(
1380 "Parsed fill report: {} {} {} @ {}",
1381 report.instrument_id,
1382 report.venue_order_id,
1383 report.last_qty,
1384 report.last_px
1385 );
1386 let sender = get_exec_event_sender();
1387 let exec_report =
1388 NautilusExecutionReport::Fill(Box::new(report));
1389 if let Err(e) =
1390 sender.send(ExecutionEvent::Report(exec_report))
1391 {
1392 tracing::warn!(
1393 "Failed to send fill report: {e}"
1394 );
1395 }
1396 }
1397 Err(e) => {
1398 tracing::error!(
1399 "Failed to parse WebSocket fill: {e}"
1400 );
1401 }
1402 }
1403 }
1404 }
1405 }
1406 NautilusWsMessage::OraclePrices(oracle_prices_map) => {
1407 tracing::debug!(
1408 "Processing oracle price updates for {} markets",
1409 oracle_prices_map.len()
1410 );
1411
1412 for (market_symbol, oracle_data) in &oracle_prices_map {
1414 match oracle_data.oracle_price.parse::<rust_decimal::Decimal>()
1416 {
1417 Ok(price) => {
1418 let symbol_with_perp = format!("{market_symbol}-PERP");
1421
1422 if let Some(entry) = instruments.iter().find(|entry| {
1424 entry.value().id().symbol.as_str()
1425 == symbol_with_perp
1426 }) {
1427 let instrument_id = *entry.key();
1428 oracle_prices.insert(instrument_id, price);
1429 tracing::trace!(
1430 "Updated oracle price for {}: {}",
1431 instrument_id,
1432 price
1433 );
1434 } else {
1435 tracing::debug!(
1436 "No instrument found for market symbol '{}' (tried '{}')",
1437 market_symbol,
1438 symbol_with_perp
1439 );
1440 }
1441 }
1442 Err(e) => {
1443 tracing::warn!(
1444 "Failed to parse oracle price for {}: {}",
1445 market_symbol,
1446 e
1447 );
1448 }
1449 }
1450 }
1451 }
1452 NautilusWsMessage::Error(err) => {
1453 tracing::error!("WebSocket error: {:?}", err);
1454 }
1455 NautilusWsMessage::Reconnected => {
1456 tracing::info!("WebSocket reconnected");
1457 }
1458 _ => {
1459 }
1461 }
1462 }
1463 tracing::info!("WebSocket message processing task ended");
1464 });
1465
1466 self.ws_stream_handle = Some(handle);
1467 tracing::debug!("Spawned WebSocket message processing task");
1468 }
1469 }
1470 self.connected = true;
1471 tracing::info!(client_id = %self.core.client_id, "Connected");
1472 Ok(())
1473 }
1474
1475 async fn disconnect(&mut self) -> anyhow::Result<()> {
1476 if !self.connected {
1477 tracing::warn!("dYdX execution client not connected");
1478 return Ok(());
1479 }
1480
1481 tracing::info!("Disconnecting from dYdX");
1482
1483 if self.config.mnemonic.is_some() {
1485 let _ = self
1486 .ws_client
1487 .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
1488 .await
1489 .map_err(|e| tracing::warn!("Failed to unsubscribe from subaccount: {e}"));
1490 }
1491
1492 let _ = self
1494 .ws_client
1495 .unsubscribe_markets()
1496 .await
1497 .map_err(|e| tracing::warn!("Failed to unsubscribe from markets: {e}"));
1498
1499 let _ = self
1501 .ws_client
1502 .unsubscribe_block_height()
1503 .await
1504 .map_err(|e| tracing::warn!("Failed to unsubscribe from block height: {e}"));
1505
1506 self.ws_client.disconnect().await?;
1508
1509 if let Some(handle) = self.ws_stream_handle.take() {
1511 handle.abort();
1512 tracing::debug!("Aborted WebSocket message processing task");
1513 }
1514
1515 self.abort_pending_tasks();
1517
1518 self.connected = false;
1519 tracing::info!(client_id = %self.core.client_id, "Disconnected");
1520 Ok(())
1521 }
1522}
1523
1524fn dispatch_account_state(state: AccountState) {
1529 use std::any::Any;
1530 msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
1531}
1532
1533fn dispatch_execution_report(report: ExecutionReport) {
1548 let sender = get_exec_event_sender();
1549 match report {
1550 ExecutionReport::Order(order_report) => {
1551 tracing::debug!(
1552 "Dispatching order report: status={:?}, venue_order_id={:?}, client_order_id={:?}",
1553 order_report.order_status,
1554 order_report.venue_order_id,
1555 order_report.client_order_id
1556 );
1557 let exec_report = NautilusExecutionReport::OrderStatus(order_report);
1558 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1559 tracing::warn!("Failed to send order status report: {e}");
1560 }
1561 }
1562 ExecutionReport::Fill(fill_report) => {
1563 tracing::debug!(
1564 "Dispatching fill report: venue_order_id={}, trade_id={}",
1565 fill_report.venue_order_id,
1566 fill_report.trade_id
1567 );
1568 let exec_report = NautilusExecutionReport::Fill(fill_report);
1569 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1570 tracing::warn!("Failed to send fill report: {e}");
1571 }
1572 }
1573 }
1574}
1575
1576#[async_trait(?Send)]
1577impl LiveExecutionClient for DydxExecutionClient {
1578 async fn generate_order_status_report(
1579 &self,
1580 cmd: &GenerateOrderStatusReport,
1581 ) -> anyhow::Result<Option<OrderStatusReport>> {
1582 use anyhow::Context;
1583
1584 let response = self
1586 .http_client
1587 .inner
1588 .get_orders(
1589 &self.wallet_address,
1590 self.subaccount_number,
1591 None, Some(1), )
1594 .await
1595 .context("failed to fetch order from dYdX API")?;
1596
1597 if response.is_empty() {
1598 return Ok(None);
1599 }
1600
1601 let order = &response[0];
1602 let ts_init = UnixNanos::default();
1603
1604 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1606 Some(inst) => inst,
1607 None => return Ok(None),
1608 };
1609
1610 let report = crate::http::parse::parse_order_status_report(
1612 order,
1613 &instrument,
1614 self.core.account_id,
1615 ts_init,
1616 )
1617 .context("failed to parse order status report")?;
1618
1619 if let Some(client_order_id) = cmd.client_order_id
1621 && report.client_order_id != Some(client_order_id)
1622 {
1623 return Ok(None);
1624 }
1625
1626 if let Some(venue_order_id) = cmd.venue_order_id
1628 && report.venue_order_id.as_str() != venue_order_id.as_str()
1629 {
1630 return Ok(None);
1631 }
1632
1633 if let Some(instrument_id) = cmd.instrument_id
1635 && report.instrument_id != instrument_id
1636 {
1637 return Ok(None);
1638 }
1639
1640 Ok(Some(report))
1641 }
1642
1643 async fn generate_order_status_reports(
1644 &self,
1645 cmd: &GenerateOrderStatusReport,
1646 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1647 use anyhow::Context;
1648
1649 let response = self
1651 .http_client
1652 .inner
1653 .get_orders(
1654 &self.wallet_address,
1655 self.subaccount_number,
1656 None, None, )
1659 .await
1660 .context("failed to fetch orders from dYdX API")?;
1661
1662 let mut reports = Vec::new();
1663 let ts_init = UnixNanos::default();
1664
1665 for order in response {
1666 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1668 Some(inst) => inst,
1669 None => continue,
1670 };
1671
1672 if let Some(filter_id) = cmd.instrument_id
1674 && instrument.id() != filter_id
1675 {
1676 continue;
1677 }
1678
1679 match crate::http::parse::parse_order_status_report(
1681 &order,
1682 &instrument,
1683 self.core.account_id,
1684 ts_init,
1685 ) {
1686 Ok(report) => {
1687 if let Some(client_order_id) = cmd.client_order_id
1689 && report.client_order_id != Some(client_order_id)
1690 {
1691 continue;
1692 }
1693
1694 if let Some(venue_order_id) = cmd.venue_order_id
1696 && report.venue_order_id.as_str() != venue_order_id.as_str()
1697 {
1698 continue;
1699 }
1700
1701 reports.push(report);
1702 }
1703 Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1704 }
1705 }
1706
1707 tracing::info!("Generated {} order status reports", reports.len());
1708 Ok(reports)
1709 }
1710
1711 async fn generate_fill_reports(
1712 &self,
1713 cmd: GenerateFillReports,
1714 ) -> anyhow::Result<Vec<FillReport>> {
1715 use anyhow::Context;
1716
1717 let response = self
1719 .http_client
1720 .inner
1721 .get_fills(
1722 &self.wallet_address,
1723 self.subaccount_number,
1724 None, None, )
1727 .await
1728 .context("failed to fetch fills from dYdX API")?;
1729
1730 let mut reports = Vec::new();
1731 let ts_init = UnixNanos::default();
1732
1733 for fill in response.fills {
1734 let instrument = match self.get_instrument_by_market(&fill.market) {
1736 Some(inst) => inst,
1737 None => {
1738 tracing::warn!(
1739 "Instrument for market {} not found in cache, skipping fill {}",
1740 fill.market,
1741 fill.id
1742 );
1743 continue;
1744 }
1745 };
1746
1747 if let Some(filter_id) = cmd.instrument_id
1749 && instrument.id() != filter_id
1750 {
1751 continue;
1752 }
1753
1754 match crate::http::parse::parse_fill_report(
1756 &fill,
1757 &instrument,
1758 self.core.account_id,
1759 ts_init,
1760 ) {
1761 Ok(report) => {
1762 if let Some(venue_order_id) = cmd.venue_order_id
1764 && report.venue_order_id.as_str() != venue_order_id.as_str()
1765 {
1766 continue;
1767 }
1768
1769 if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1771 if report.ts_event >= start && report.ts_event <= end {
1772 reports.push(report);
1773 }
1774 } else if let Some(start) = cmd.start {
1775 if report.ts_event >= start {
1776 reports.push(report);
1777 }
1778 } else if let Some(end) = cmd.end {
1779 if report.ts_event <= end {
1780 reports.push(report);
1781 }
1782 } else {
1783 reports.push(report);
1784 }
1785 }
1786 Err(e) => tracing::error!("Failed to parse fill report: {e}"),
1787 }
1788 }
1789
1790 tracing::info!("Generated {} fill reports", reports.len());
1791 Ok(reports)
1792 }
1793
1794 async fn generate_position_status_reports(
1795 &self,
1796 cmd: &GeneratePositionReports,
1797 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1798 use anyhow::Context;
1799
1800 let response = self
1802 .http_client
1803 .inner
1804 .get_subaccount(&self.wallet_address, self.subaccount_number)
1805 .await
1806 .context("failed to fetch subaccount from dYdX API")?;
1807
1808 let mut reports = Vec::new();
1809 let ts_init = UnixNanos::default();
1810
1811 for (market_ticker, position) in &response.subaccount.open_perpetual_positions {
1813 let instrument = match self.get_instrument_by_market(market_ticker) {
1815 Some(inst) => inst,
1816 None => {
1817 tracing::warn!(
1818 "Instrument for market {} not found in cache, skipping position",
1819 market_ticker
1820 );
1821 continue;
1822 }
1823 };
1824
1825 if let Some(filter_id) = cmd.instrument_id
1827 && instrument.id() != filter_id
1828 {
1829 continue;
1830 }
1831
1832 match crate::http::parse::parse_position_status_report(
1834 position,
1835 &instrument,
1836 self.core.account_id,
1837 ts_init,
1838 ) {
1839 Ok(report) => reports.push(report),
1840 Err(e) => {
1841 tracing::error!("Failed to parse position status report: {e}");
1842 }
1843 }
1844 }
1845
1846 tracing::info!("Generated {} position status reports", reports.len());
1847 Ok(reports)
1848 }
1849
1850 async fn generate_mass_status(
1851 &self,
1852 lookback_mins: Option<u64>,
1853 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1854 use anyhow::Context;
1855
1856 tracing::info!(
1857 "Generating mass execution status{}",
1858 lookback_mins.map_or_else(
1859 || " (unbounded)".to_string(),
1860 |mins| format!(" (lookback: {mins} minutes)")
1861 )
1862 );
1863
1864 let cutoff_time = lookback_mins.map(|mins| Utc::now() - Duration::minutes(mins as i64));
1866
1867 let orders_response = self
1869 .http_client
1870 .inner
1871 .get_orders(&self.wallet_address, self.subaccount_number, None, None)
1872 .await
1873 .context("failed to fetch orders for mass status")?;
1874
1875 let subaccount_response = self
1877 .http_client
1878 .inner
1879 .get_subaccount(&self.wallet_address, self.subaccount_number)
1880 .await
1881 .context("failed to fetch subaccount for mass status")?;
1882
1883 let fills_response = self
1885 .http_client
1886 .inner
1887 .get_fills(&self.wallet_address, self.subaccount_number, None, None)
1888 .await
1889 .context("failed to fetch fills for mass status")?;
1890
1891 let ts_init = UnixNanos::default();
1892 let mut order_reports = Vec::new();
1893 let mut position_reports = Vec::new();
1894 let mut fill_reports = Vec::new();
1895
1896 let mut orders_filtered = 0;
1898 let mut fills_filtered = 0;
1899
1900 for order in orders_response {
1902 if let Some(cutoff) = cutoff_time
1904 && order.updated_at.is_some_and(|dt| dt < cutoff)
1905 {
1906 orders_filtered += 1;
1907 continue;
1908 }
1909
1910 if let Some(instrument) = self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1911 match crate::http::parse::parse_order_status_report(
1912 &order,
1913 &instrument,
1914 self.core.account_id,
1915 ts_init,
1916 ) {
1917 Ok(report) => order_reports.push(report),
1918 Err(e) => tracing::error!("Failed to parse order in mass status: {e}"),
1919 }
1920 }
1921 }
1922
1923 for (market_ticker, position) in &subaccount_response.subaccount.open_perpetual_positions {
1925 if let Some(instrument) = self.get_instrument_by_market(market_ticker) {
1926 match crate::http::parse::parse_position_status_report(
1927 position,
1928 &instrument,
1929 self.core.account_id,
1930 ts_init,
1931 ) {
1932 Ok(report) => position_reports.push(report),
1933 Err(e) => tracing::error!("Failed to parse position in mass status: {e}"),
1934 }
1935 }
1936 }
1937
1938 for fill in fills_response.fills {
1940 if let Some(cutoff) = cutoff_time
1942 && fill.created_at < cutoff
1943 {
1944 fills_filtered += 1;
1945 continue;
1946 }
1947
1948 if let Some(instrument) = self.get_instrument_by_market(&fill.market) {
1949 match crate::http::parse::parse_fill_report(
1950 &fill,
1951 &instrument,
1952 self.core.account_id,
1953 ts_init,
1954 ) {
1955 Ok(report) => fill_reports.push(report),
1956 Err(e) => tracing::error!("Failed to parse fill in mass status: {e}"),
1957 }
1958 }
1959 }
1960
1961 if cutoff_time.is_some() {
1962 tracing::info!(
1963 "Generated mass status: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
1964 order_reports.len(),
1965 orders_filtered,
1966 position_reports.len(),
1967 fill_reports.len(),
1968 fills_filtered
1969 );
1970 } else {
1971 tracing::info!(
1972 "Generated mass status: {} orders, {} positions, {} fills",
1973 order_reports.len(),
1974 position_reports.len(),
1975 fill_reports.len()
1976 );
1977 }
1978
1979 let mut mass_status = ExecutionMassStatus::new(
1981 self.core.client_id,
1982 self.core.account_id,
1983 self.core.venue,
1984 ts_init,
1985 None, );
1987
1988 mass_status.add_order_reports(order_reports);
1989 mass_status.add_position_reports(position_reports);
1990 mass_status.add_fill_reports(fill_reports);
1991
1992 Ok(Some(mass_status))
1993 }
1994}