1use std::sync::{
42 Arc, Mutex,
43 atomic::{AtomicU32, AtomicU64, Ordering},
44};
45
46use anyhow::Context;
47use async_trait::async_trait;
48use dashmap::DashMap;
49use nautilus_common::{
50 live::{runner::get_exec_event_sender, runtime::get_runtime},
51 messages::{
52 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
53 execution::{
54 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
55 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
56 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
57 },
58 },
59 msgbus,
60};
61use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
62use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
63use nautilus_model::{
64 accounts::AccountAny,
65 enums::{OmsType, OrderSide, OrderType, TimeInForce},
66 events::{AccountState, OrderCancelRejected, OrderEventAny, OrderRejected},
67 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue},
68 instruments::{Instrument, InstrumentAny},
69 orders::Order,
70 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
71 types::{AccountBalance, MarginBalance},
72};
73use nautilus_network::retry::RetryConfig;
74use rust_decimal::Decimal;
75use tokio::task::JoinHandle;
76
77use crate::{
78 common::{consts::DYDX_VENUE, credential::DydxCredential, parse::nanos_to_secs_i64},
79 config::DydxAdapterConfig,
80 execution::submitter::OrderSubmitter,
81 grpc::{DydxGrpcClient, Wallet, types::ChainId},
82 http::client::DydxHttpClient,
83 websocket::{client::DydxWebSocketClient, enums::NautilusWsMessage},
84};
85
86pub mod submitter;
87
/// Largest integer client ID value (`u32::MAX`).
// NOTE(review): not referenced in the visible part of this file - confirm where the bound is enforced.
pub const MAX_CLIENT_ID: u32 = u32::MAX;
94
/// Report payload handed to `dispatch_execution_report` by the WebSocket
/// message loop.
///
/// Both payloads are stored behind a `Box`.
enum ExecutionReport {
    /// An order status report.
    Order(Box<OrderStatusReport>),
    /// A fill report.
    Fill(Box<FillReport>),
}
103
/// Execution client for dYdX.
///
/// Combines an HTTP client (instrument loading), a WebSocket client (streaming
/// updates) and a gRPC client (order submission / cancellation via
/// `OrderSubmitter`), plus the local caches needed to translate between
/// Nautilus identifiers and the integer identifiers used when talking to dYdX.
#[derive(Debug)]
pub struct DydxExecutionClient {
    /// Shared execution-client core (trader/account/client IDs, cache access, event generation).
    core: ExecutionClientCore,
    /// Adapter configuration (URLs, retry settings, mnemonic, authenticator IDs, chain ID).
    config: DydxAdapterConfig,
    /// HTTP client; source of the instrument cache and CLOB pair ID mapping.
    http_client: DydxHttpClient,
    /// WebSocket client; private when a mnemonic is configured, public otherwise.
    ws_client: DydxWebSocketClient,
    /// gRPC client shared with spawned tasks; cloned into each `OrderSubmitter`.
    grpc_client: Arc<tokio::sync::RwLock<DydxGrpcClient>>,
    /// Signing wallet; `None` until `connect` builds it from the configured mnemonic.
    wallet: Arc<tokio::sync::RwLock<Option<Wallet>>>,
    /// Instruments keyed by instrument ID, filled by `cache_instruments_from_http`.
    instruments: DashMap<InstrumentId, InstrumentAny>,
    /// Instrument symbol string -> instrument ID.
    market_to_instrument: DashMap<String, InstrumentId>,
    /// CLOB pair ID -> instrument ID, copied from the HTTP client's mapping.
    clob_pair_id_to_instrument: DashMap<u32, InstrumentId>,
    /// Latest known block height; `0` is treated as "not initialized" by `submit_order`.
    block_height: Arc<AtomicU64>,
    /// Oracle prices per instrument, passed to account-state parsing.
    oracle_prices: Arc<DashMap<InstrumentId, Decimal>>,
    /// Client order ID string -> integer client ID.
    client_id_to_int: DashMap<String, u32>,
    /// Integer client ID -> client order ID string (reverse of `client_id_to_int`).
    int_to_client_id: DashMap<u32, String>,
    /// Counter for integer client IDs assigned to non-numeric client order IDs (starts at 1).
    next_client_id: AtomicU32,
    /// Wallet address used for subaccount subscriptions and order submission.
    wallet_address: String,
    /// Subaccount number used together with `wallet_address`.
    subaccount_number: u32,
    /// Set by `start`, cleared by `stop`.
    started: bool,
    /// Value reported by `is_connected`; cleared by `stop`.
    connected: bool,
    /// Set once `cache_instruments_from_http` has run.
    instruments_initialized: bool,
    /// NOTE(review): presumably the handle of the WebSocket message loop spawned in
    /// `connect` - the assignment is not visible in this part of the file.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of tasks created by `spawn_task` / `spawn_order_task`; aborted by `abort_pending_tasks`.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
142
143impl DydxExecutionClient {
144 pub fn new(
150 core: ExecutionClientCore,
151 config: DydxAdapterConfig,
152 wallet_address: String,
153 subaccount_number: u32,
154 ) -> anyhow::Result<Self> {
155 let retry_config = RetryConfig {
157 max_retries: config.max_retries,
158 initial_delay_ms: config.retry_delay_initial_ms,
159 max_delay_ms: config.retry_delay_max_ms,
160 ..Default::default()
161 };
162 let http_client = DydxHttpClient::new(
163 Some(config.base_url.clone()),
164 Some(config.timeout_secs),
165 None, config.is_testnet,
167 Some(retry_config),
168 )?;
169
170 let ws_client = if let Some(ref mnemonic) = config.mnemonic {
172 let credential = DydxCredential::from_mnemonic(
173 mnemonic,
174 subaccount_number,
175 config.authenticator_ids.clone(),
176 )?;
177 DydxWebSocketClient::new_private(
178 config.ws_url.clone(),
179 credential,
180 core.account_id,
181 Some(20),
182 )
183 } else {
184 DydxWebSocketClient::new_public(config.ws_url.clone(), Some(20))
185 };
186
187 let grpc_urls = config.get_grpc_urls();
188 let grpc_client = Arc::new(tokio::sync::RwLock::new(
189 get_runtime()
190 .block_on(async { DydxGrpcClient::new_with_fallback(&grpc_urls).await })
191 .context("failed to construct dYdX gRPC client")?,
192 ));
193
194 Ok(Self {
195 core,
196 config,
197 http_client,
198 ws_client,
199 grpc_client,
200 wallet: Arc::new(tokio::sync::RwLock::new(None)),
201 instruments: DashMap::new(),
202 market_to_instrument: DashMap::new(),
203 clob_pair_id_to_instrument: DashMap::new(),
204 block_height: Arc::new(AtomicU64::new(0)),
205 oracle_prices: Arc::new(DashMap::new()),
206 client_id_to_int: DashMap::new(),
207 int_to_client_id: DashMap::new(),
208 next_client_id: AtomicU32::new(1),
209 wallet_address,
210 subaccount_number,
211 started: false,
212 connected: false,
213 instruments_initialized: false,
214 ws_stream_handle: None,
215 pending_tasks: Mutex::new(Vec::new()),
216 })
217 }
218
219 fn generate_client_order_id_int(&self, client_order_id: &str) -> u32 {
239 if let Some(existing) = self.client_id_to_int.get(client_order_id) {
241 return *existing.value();
242 }
243
244 if let Ok(id) = client_order_id.parse::<u32>() {
246 self.client_id_to_int
247 .insert(client_order_id.to_string(), id);
248 self.int_to_client_id
249 .insert(id, client_order_id.to_string());
250 return id;
251 }
252
253 use dashmap::mapref::entry::Entry;
255
256 match self.client_id_to_int.entry(client_order_id.to_string()) {
257 Entry::Occupied(entry) => *entry.get(),
258 Entry::Vacant(vacant) => {
259 let id = self
260 .next_client_id
261 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
262 vacant.insert(id);
263 self.int_to_client_id
264 .insert(id, client_order_id.to_string());
265 id
266 }
267 }
268 }
269
270 fn get_client_order_id_int(&self, client_order_id: &str) -> Option<u32> {
274 if let Ok(id) = client_order_id.parse::<u32>() {
276 return Some(id);
277 }
278
279 self.client_id_to_int
281 .get(client_order_id)
282 .map(|entry| *entry.value())
283 }
284
285 fn get_chain_id(&self) -> ChainId {
289 self.config.get_chain_id()
290 }
291
292 fn cache_instruments_from_http(&mut self) {
299 use nautilus_model::instruments::InstrumentAny;
300
301 let instruments: Vec<InstrumentAny> = self
303 .http_client
304 .instruments_cache
305 .iter()
306 .map(|entry| entry.value().clone())
307 .collect();
308
309 tracing::debug!(
310 "Caching {} instruments in execution client",
311 instruments.len()
312 );
313
314 for instrument in instruments {
315 let instrument_id = instrument.id();
316 let symbol = instrument_id.symbol.as_str();
317
318 self.instruments.insert(instrument_id, instrument.clone());
320
321 self.market_to_instrument
323 .insert(symbol.to_string(), instrument_id);
324 }
325
326 let http_mapping = self.http_client.clob_pair_id_mapping();
329 for entry in http_mapping.iter() {
330 self.clob_pair_id_to_instrument
331 .insert(*entry.key(), *entry.value());
332 }
333
334 self.instruments_initialized = true;
335 tracing::info!(
336 "Cached {} instruments ({} CLOB pair IDs) with market mappings",
337 self.instruments.len(),
338 self.clob_pair_id_to_instrument.len()
339 );
340 }
341
342 fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
344 self.market_to_instrument
345 .get(market)
346 .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()))
347 }
348
349 fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
351 let instrument = self
352 .clob_pair_id_to_instrument
353 .get(&clob_pair_id)
354 .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()));
355
356 if instrument.is_none() {
357 self.log_missing_instrument_for_clob_pair_id(clob_pair_id);
358 }
359
360 instrument
361 }
362
363 fn log_missing_instrument_for_clob_pair_id(&self, clob_pair_id: u32) {
364 let known: Vec<(u32, String)> = self
365 .clob_pair_id_to_instrument
366 .iter()
367 .filter_map(|entry| {
368 let instrument_id = entry.value();
369 self.instruments.get(instrument_id).map(|inst_entry| {
370 (
371 *entry.key(),
372 inst_entry.value().id().symbol.as_str().to_string(),
373 )
374 })
375 })
376 .collect();
377
378 tracing::warn!(
379 "Instrument for clob_pair_id {} not found in cache. Known CLOB pair IDs and symbols: {:?}",
380 clob_pair_id,
381 known
382 );
383 }
384
385 fn spawn_task<F>(&self, label: &'static str, fut: F)
386 where
387 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
388 {
389 let handle = get_runtime().spawn(async move {
390 if let Err(e) = fut.await {
391 tracing::error!("{label}: {e:?}");
392 }
393 });
394
395 self.pending_tasks
396 .lock()
397 .expect(MUTEX_POISONED)
398 .push(handle);
399 }
400
401 fn spawn_order_task<F>(
405 &self,
406 label: &'static str,
407 strategy_id: StrategyId,
408 instrument_id: InstrumentId,
409 client_order_id: ClientOrderId,
410 fut: F,
411 ) where
412 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
413 {
414 let trader_id = self.core.trader_id;
416 let account_id = self.core.account_id;
417 let sender = get_exec_event_sender();
418
419 let handle = get_runtime().spawn(async move {
420 if let Err(e) = fut.await {
421 let error_msg = format!("{label} failed: {e:?}");
422 tracing::error!("{}", error_msg);
423
424 let ts_now = UnixNanos::default(); let event = OrderRejected::new(
426 trader_id,
427 strategy_id,
428 instrument_id,
429 client_order_id,
430 account_id,
431 error_msg.into(),
432 UUID4::new(),
433 ts_now,
434 ts_now,
435 false,
436 false,
437 );
438
439 if let Err(send_err) =
440 sender.send(ExecutionEvent::Order(OrderEventAny::Rejected(event)))
441 {
442 tracing::error!("Failed to send OrderRejected event: {send_err}");
443 }
444 }
445 });
446
447 self.pending_tasks
448 .lock()
449 .expect(MUTEX_POISONED)
450 .push(handle);
451 }
452
453 fn abort_pending_tasks(&self) {
454 let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
455 for handle in guard.drain(..) {
456 handle.abort();
457 }
458 }
459}
460
461#[async_trait(?Send)]
462impl ExecutionClient for DydxExecutionClient {
463 fn is_connected(&self) -> bool {
464 self.connected
465 }
466
467 fn client_id(&self) -> ClientId {
468 self.core.client_id
469 }
470
471 fn account_id(&self) -> AccountId {
472 self.core.account_id
473 }
474
475 fn venue(&self) -> Venue {
476 *DYDX_VENUE
477 }
478
479 fn oms_type(&self) -> OmsType {
480 self.core.oms_type
481 }
482
483 fn get_account(&self) -> Option<AccountAny> {
484 self.core.get_account()
485 }
486
487 fn generate_account_state(
488 &self,
489 balances: Vec<AccountBalance>,
490 margins: Vec<MarginBalance>,
491 reported: bool,
492 ts_event: UnixNanos,
493 ) -> anyhow::Result<()> {
494 self.core
495 .generate_account_state(balances, margins, reported, ts_event)
496 }
497
498 fn start(&mut self) -> anyhow::Result<()> {
499 if self.started {
500 tracing::warn!("dYdX execution client already started");
501 return Ok(());
502 }
503
504 tracing::info!("Starting dYdX execution client");
505 self.started = true;
506 Ok(())
507 }
508
509 fn stop(&mut self) -> anyhow::Result<()> {
510 if !self.started {
511 tracing::warn!("dYdX execution client not started");
512 return Ok(());
513 }
514
515 tracing::info!("Stopping dYdX execution client");
516 self.abort_pending_tasks();
517 self.started = false;
518 self.connected = false;
519 Ok(())
520 }
521
522 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
539 let order = cmd.order.clone();
540
541 if !self.is_connected() {
543 let reason = "Cannot submit order: execution client not connected";
544 tracing::error!("{}", reason);
545 anyhow::bail!(reason);
546 }
547
548 let current_block = self.block_height.load(Ordering::Relaxed);
550 if current_block == 0 {
551 let reason = "Block height not initialized";
552 tracing::warn!(
553 "Cannot submit order {}: {}",
554 order.client_order_id(),
555 reason
556 );
557 self.core.generate_order_rejected(
558 order.strategy_id(),
559 order.instrument_id(),
560 order.client_order_id(),
561 reason,
562 cmd.ts_init,
563 false,
564 );
565 return Ok(());
566 }
567
568 if order.is_closed() {
570 tracing::warn!("Cannot submit closed order {}", order.client_order_id());
571 return Ok(());
572 }
573
574 match order.order_type() {
576 OrderType::Market | OrderType::Limit => {
577 tracing::debug!(
578 "Submitting {} order: {}",
579 if matches!(order.order_type(), OrderType::Market) {
580 "MARKET"
581 } else {
582 "LIMIT"
583 },
584 order.client_order_id()
585 );
586 }
587 OrderType::StopMarket | OrderType::StopLimit => {
589 tracing::debug!(
590 "Submitting {} order: {}",
591 if matches!(order.order_type(), OrderType::StopMarket) {
592 "STOP_MARKET"
593 } else {
594 "STOP_LIMIT"
595 },
596 order.client_order_id()
597 );
598 }
599 OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
601 tracing::debug!(
602 "Submitting {} order: {}",
603 if matches!(order.order_type(), OrderType::MarketIfTouched) {
604 "TAKE_PROFIT_MARKET"
605 } else {
606 "TAKE_PROFIT_LIMIT"
607 },
608 order.client_order_id()
609 );
610 }
611 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit => {
613 let reason = "Trailing stop orders not supported by dYdX v4 protocol";
614 tracing::error!("{}", reason);
615 self.core.generate_order_rejected(
616 order.strategy_id(),
617 order.instrument_id(),
618 order.client_order_id(),
619 reason,
620 cmd.ts_init,
621 false,
622 );
623 return Ok(());
624 }
625 order_type => {
626 let reason = format!("Order type {order_type:?} not supported by dYdX");
627 tracing::error!("{}", reason);
628 self.core.generate_order_rejected(
629 order.strategy_id(),
630 order.instrument_id(),
631 order.client_order_id(),
632 &reason,
633 cmd.ts_init,
634 false,
635 );
636 return Ok(());
637 }
638 }
639
640 self.core.generate_order_submitted(
642 order.strategy_id(),
643 order.instrument_id(),
644 order.client_order_id(),
645 cmd.ts_init,
646 );
647
648 let grpc_client = self.grpc_client.clone();
649 let wallet = self.wallet.clone();
650 let http_client = self.http_client.clone();
651 let wallet_address = self.wallet_address.clone();
652 let subaccount_number = self.subaccount_number;
653 let client_order_id = order.client_order_id();
654 let instrument_id = order.instrument_id();
655 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
656 let chain_id = self.get_chain_id();
657 let authenticator_ids = self.config.authenticator_ids.clone();
658 #[allow(clippy::redundant_clone)]
659 let order_clone = order.clone();
660
661 let client_id_u32 = self.generate_client_order_id_int(client_order_id.as_str());
663
664 self.spawn_order_task(
665 "submit_order",
666 order.strategy_id(),
667 order.instrument_id(),
668 order.client_order_id(),
669 async move {
670 let wallet_guard = wallet.read().await;
671 let wallet_ref = wallet_guard
672 .as_ref()
673 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
674
675 let grpc_guard = grpc_client.read().await;
676 let submitter = OrderSubmitter::new(
677 (*grpc_guard).clone(),
678 http_client.clone(),
679 wallet_address,
680 subaccount_number,
681 chain_id,
682 authenticator_ids,
683 );
684
685 match order_clone.order_type() {
687 OrderType::Market => {
688 submitter
689 .submit_market_order(
690 wallet_ref,
691 instrument_id,
692 client_id_u32,
693 order_clone.order_side(),
694 order_clone.quantity(),
695 block_height,
696 )
697 .await?;
698 tracing::info!("Successfully submitted market order: {}", client_order_id);
699 }
700 OrderType::Limit => {
701 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
702 submitter
703 .submit_limit_order(
704 wallet_ref,
705 instrument_id,
706 client_id_u32,
707 order_clone.order_side(),
708 order_clone
709 .price()
710 .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
711 order_clone.quantity(),
712 order_clone.time_in_force(),
713 order_clone.is_post_only(),
714 order_clone.is_reduce_only(),
715 block_height,
716 expire_time,
717 )
718 .await?;
719 tracing::info!("Successfully submitted limit order: {}", client_order_id);
720 }
721 OrderType::StopMarket => {
722 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
723 anyhow::anyhow!("Stop market order missing trigger_price")
724 })?;
725 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
726 submitter
727 .submit_stop_market_order(
728 wallet_ref,
729 instrument_id,
730 client_id_u32,
731 order_clone.order_side(),
732 trigger_price,
733 order_clone.quantity(),
734 order_clone.is_reduce_only(),
735 expire_time,
736 )
737 .await?;
738 tracing::info!(
739 "Successfully submitted stop market order: {}",
740 client_order_id
741 );
742 }
743 OrderType::StopLimit => {
744 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
745 anyhow::anyhow!("Stop limit order missing trigger_price")
746 })?;
747 let limit_price = order_clone.price().ok_or_else(|| {
748 anyhow::anyhow!("Stop limit order missing limit price")
749 })?;
750 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
751 submitter
752 .submit_stop_limit_order(
753 wallet_ref,
754 instrument_id,
755 client_id_u32,
756 order_clone.order_side(),
757 trigger_price,
758 limit_price,
759 order_clone.quantity(),
760 order_clone.time_in_force(),
761 order_clone.is_post_only(),
762 order_clone.is_reduce_only(),
763 expire_time,
764 )
765 .await?;
766 tracing::info!(
767 "Successfully submitted stop limit order: {}",
768 client_order_id
769 );
770 }
771 OrderType::MarketIfTouched => {
773 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
774 anyhow::anyhow!("Take profit market order missing trigger_price")
775 })?;
776 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
777 submitter
778 .submit_take_profit_market_order(
779 wallet_ref,
780 instrument_id,
781 client_id_u32,
782 order_clone.order_side(),
783 trigger_price,
784 order_clone.quantity(),
785 order_clone.is_reduce_only(),
786 expire_time,
787 )
788 .await?;
789 tracing::info!(
790 "Successfully submitted take profit market order: {}",
791 client_order_id
792 );
793 }
794 OrderType::LimitIfTouched => {
796 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
797 anyhow::anyhow!("Take profit limit order missing trigger_price")
798 })?;
799 let limit_price = order_clone.price().ok_or_else(|| {
800 anyhow::anyhow!("Take profit limit order missing limit price")
801 })?;
802 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
803 submitter
804 .submit_take_profit_limit_order(
805 wallet_ref,
806 instrument_id,
807 client_id_u32,
808 order_clone.order_side(),
809 trigger_price,
810 limit_price,
811 order_clone.quantity(),
812 order_clone.time_in_force(),
813 order_clone.is_post_only(),
814 order_clone.is_reduce_only(),
815 expire_time,
816 )
817 .await?;
818 tracing::info!(
819 "Successfully submitted take profit limit order: {}",
820 client_order_id
821 );
822 }
823 _ => unreachable!("Order type already validated"),
824 }
825
826 Ok(())
827 },
828 );
829
830 Ok(())
831 }
832
833 fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
834 anyhow::bail!("Order lists not supported by dYdX")
835 }
836
837 fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
838 anyhow::bail!("Order modification not supported by dYdX")
839 }
840
841 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
858 if !self.is_connected() {
859 anyhow::bail!("Cannot cancel order: not connected");
860 }
861
862 let client_order_id = cmd.client_order_id;
863
864 let cache = self.core.cache();
866 let cache_borrow = cache.borrow();
867
868 let order = match cache_borrow.order(&client_order_id) {
869 Some(order) => order,
870 None => {
871 tracing::error!(
872 "Cannot cancel order {}: not found in cache",
873 client_order_id
874 );
875 return Ok(()); }
877 };
878
879 if order.is_closed() {
881 tracing::warn!(
882 "CancelOrder command for {} when order already {} (will not send to exchange)",
883 client_order_id,
884 order.status()
885 );
886 return Ok(());
887 }
888
889 let instrument_id = cmd.instrument_id;
891 let instrument = match cache_borrow.instrument(&instrument_id) {
892 Some(instrument) => instrument,
893 None => {
894 tracing::error!(
895 "Cannot cancel order {}: instrument {} not found in cache",
896 client_order_id,
897 instrument_id
898 );
899 return Ok(()); }
901 };
902
903 tracing::debug!(
904 "Cancelling order {} for instrument {}",
905 client_order_id,
906 instrument.id()
907 );
908
909 let grpc_client = self.grpc_client.clone();
910 let wallet = self.wallet.clone();
911 let http_client = self.http_client.clone();
912 let wallet_address = self.wallet_address.clone();
913 let subaccount_number = self.subaccount_number;
914 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
915 let chain_id = self.get_chain_id();
916 let authenticator_ids = self.config.authenticator_ids.clone();
917 let trader_id = cmd.trader_id;
918 let strategy_id = cmd.strategy_id;
919 let venue_order_id = cmd.venue_order_id;
920
921 let client_id_u32 = match self.get_client_order_id_int(client_order_id.as_str()) {
923 Some(id) => id,
924 None => {
925 tracing::error!("Client order ID {} not found in cache", client_order_id);
926 anyhow::bail!("Client order ID not found in cache")
927 }
928 };
929
930 self.spawn_task("cancel_order", async move {
931 let wallet_guard = wallet.read().await;
932 let wallet_ref = wallet_guard
933 .as_ref()
934 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
935
936 let grpc_guard = grpc_client.read().await;
937 let submitter = OrderSubmitter::new(
938 (*grpc_guard).clone(),
939 http_client.clone(),
940 wallet_address,
941 subaccount_number,
942 chain_id,
943 authenticator_ids,
944 );
945
946 match submitter
948 .cancel_order(wallet_ref, instrument_id, client_id_u32, block_height)
949 .await
950 {
951 Ok(_) => {
952 tracing::info!("Successfully cancelled order: {}", client_order_id);
953 }
954 Err(e) => {
955 tracing::error!("Failed to cancel order {}: {:?}", client_order_id, e);
956
957 let sender = get_exec_event_sender();
958 let ts_now = UnixNanos::default();
959 let event = OrderCancelRejected::new(
960 trader_id,
961 strategy_id,
962 instrument_id,
963 client_order_id,
964 format!("Cancel order failed: {e:?}").into(),
965 UUID4::new(),
966 ts_now,
967 ts_now,
968 false,
969 venue_order_id,
970 None, );
972 sender
973 .send(ExecutionEvent::Order(OrderEventAny::CancelRejected(event)))
974 .unwrap();
975 }
976 }
977
978 Ok(())
979 });
980
981 Ok(())
982 }
983
984 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
985 if !self.is_connected() {
986 anyhow::bail!("Cannot cancel orders: not connected");
987 }
988
989 let cache = self.core.cache().borrow();
991 let mut open_orders: Vec<_> = cache
992 .orders_open(None, None, None, None)
993 .into_iter()
994 .collect();
995
996 let instrument_id = cmd.instrument_id;
997 open_orders.retain(|order| order.instrument_id() == instrument_id);
998
999 if cmd.order_side != OrderSide::NoOrderSide {
1001 let order_side = cmd.order_side;
1002 open_orders.retain(|order| order.order_side() == order_side);
1003 }
1004
1005 let mut short_term_orders = Vec::new();
1009 let mut long_term_orders = Vec::new();
1010
1011 for order in &open_orders {
1012 match order.time_in_force() {
1013 TimeInForce::Ioc | TimeInForce::Fok => short_term_orders.push(order),
1014 TimeInForce::Gtc
1015 | TimeInForce::Gtd
1016 | TimeInForce::Day
1017 | TimeInForce::AtTheOpen
1018 | TimeInForce::AtTheClose => long_term_orders.push(order),
1019 }
1020 }
1021
1022 tracing::info!(
1023 "Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={}, order_side={:?}",
1024 open_orders.len(),
1025 short_term_orders.len(),
1026 long_term_orders.len(),
1027 instrument_id,
1028 cmd.order_side
1029 );
1030
1031 let grpc_client = self.grpc_client.clone();
1033 let wallet = self.wallet.clone();
1034 let http_client = self.http_client.clone();
1035 let wallet_address = self.wallet_address.clone();
1036 let subaccount_number = self.subaccount_number;
1037 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1038 let chain_id = self.get_chain_id();
1039 let authenticator_ids = self.config.authenticator_ids.clone();
1040
1041 let mut orders_to_cancel = Vec::new();
1043 for order in &open_orders {
1044 let client_order_id = order.client_order_id();
1045 if let Some(client_id_u32) = self.get_client_order_id_int(client_order_id.as_str()) {
1046 orders_to_cancel.push((instrument_id, client_id_u32));
1047 } else {
1048 tracing::warn!(
1049 "Cannot cancel order {}: client_order_id not found in cache",
1050 client_order_id
1051 );
1052 }
1053 }
1054
1055 self.spawn_task("cancel_all_orders", async move {
1056 let wallet_guard = wallet.read().await;
1057 let wallet_ref = wallet_guard
1058 .as_ref()
1059 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1060
1061 let grpc_guard = grpc_client.read().await;
1062 let submitter = OrderSubmitter::new(
1063 (*grpc_guard).clone(),
1064 http_client.clone(),
1065 wallet_address,
1066 subaccount_number,
1067 chain_id,
1068 authenticator_ids,
1069 );
1070
1071 match submitter
1073 .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1074 .await
1075 {
1076 Ok(_) => {
1077 tracing::info!("Successfully cancelled {} orders", orders_to_cancel.len());
1078 }
1079 Err(e) => {
1080 tracing::error!("Batch cancel failed: {:?}", e);
1081 }
1082 }
1083
1084 Ok(())
1085 });
1086
1087 Ok(())
1088 }
1089
1090 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
1091 if cmd.cancels.is_empty() {
1092 return Ok(());
1093 }
1094
1095 if !self.is_connected() {
1096 anyhow::bail!("Cannot cancel orders: not connected");
1097 }
1098
1099 let mut orders_to_cancel = Vec::with_capacity(cmd.cancels.len());
1101 for cancel in &cmd.cancels {
1102 let client_id_str = cancel.client_order_id.as_str();
1103 let client_id_u32 = match self.get_client_order_id_int(client_id_str) {
1104 Some(id) => id,
1105 None => {
1106 tracing::warn!(
1107 "No u32 mapping found for client_order_id={}, skipping cancel",
1108 client_id_str
1109 );
1110 continue;
1111 }
1112 };
1113 orders_to_cancel.push((cancel.instrument_id, client_id_u32));
1114 }
1115
1116 if orders_to_cancel.is_empty() {
1117 tracing::warn!("No valid orders to cancel in batch");
1118 return Ok(());
1119 }
1120
1121 let grpc_client = self.grpc_client.clone();
1122 let wallet = self.wallet.clone();
1123 let http_client = self.http_client.clone();
1124 let wallet_address = self.wallet_address.clone();
1125 let subaccount_number = self.subaccount_number;
1126 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1127 let chain_id = self.get_chain_id();
1128 let authenticator_ids = self.config.authenticator_ids.clone();
1129
1130 tracing::info!(
1131 "Batch cancelling {} orders: {:?}",
1132 orders_to_cancel.len(),
1133 orders_to_cancel
1134 );
1135
1136 self.spawn_task("batch_cancel_orders", async move {
1137 let wallet_guard = wallet.read().await;
1138 let wallet_ref = wallet_guard
1139 .as_ref()
1140 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1141
1142 let grpc_guard = grpc_client.read().await;
1143 let submitter = OrderSubmitter::new(
1144 (*grpc_guard).clone(),
1145 http_client.clone(),
1146 wallet_address,
1147 subaccount_number,
1148 chain_id,
1149 authenticator_ids,
1150 );
1151
1152 match submitter
1153 .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1154 .await
1155 {
1156 Ok(()) => {
1157 tracing::info!(
1158 "Successfully batch cancelled {} orders",
1159 orders_to_cancel.len()
1160 );
1161 }
1162 Err(e) => {
1163 tracing::error!("Batch cancel failed: {:?}", e);
1164 }
1165 }
1166
1167 Ok(())
1168 });
1169
1170 Ok(())
1171 }
1172
1173 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
1174 Ok(())
1175 }
1176
1177 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
1178 Ok(())
1179 }
1180
1181 async fn connect(&mut self) -> anyhow::Result<()> {
1182 if self.connected {
1183 tracing::warn!("dYdX execution client already connected");
1184 return Ok(());
1185 }
1186
1187 tracing::info!("Connecting to dYdX");
1188
1189 tracing::debug!("Loading instruments from HTTP API");
1192 self.http_client.fetch_and_cache_instruments().await?;
1193 tracing::info!(
1194 "Loaded {} instruments from HTTP",
1195 self.http_client.instruments_cache.len()
1196 );
1197
1198 self.cache_instruments_from_http();
1200
1201 if let Some(mnemonic) = &self.config.mnemonic {
1203 let wallet = Wallet::from_mnemonic(mnemonic)?;
1204 *self.wallet.write().await = Some(wallet);
1205 tracing::debug!("Wallet initialized");
1206 }
1207
1208 self.ws_client.connect().await?;
1210 tracing::debug!("WebSocket connected");
1211
1212 self.ws_client.subscribe_block_height().await?;
1214 tracing::debug!("Subscribed to block height updates");
1215
1216 self.ws_client.subscribe_markets().await?;
1218 tracing::debug!("Subscribed to markets");
1219
1220 if self.config.mnemonic.is_some() {
1222 self.ws_client
1223 .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
1224 .await?;
1225 tracing::debug!(
1226 "Subscribed to subaccount updates: {}/{}",
1227 self.wallet_address,
1228 self.subaccount_number
1229 );
1230
1231 if let Some(mut rx) = self.ws_client.take_receiver() {
1234 let account_id = self.core.account_id;
1236 let instruments = self.instruments.clone();
1237 let oracle_prices = self.oracle_prices.clone();
1238 let clob_pair_id_to_instrument = self.clob_pair_id_to_instrument.clone();
1239 let block_height = self.block_height.clone();
1240
1241 let handle = get_runtime().spawn(async move {
1242 while let Some(msg) = rx.recv().await {
1243 match msg {
1244 NautilusWsMessage::Order(report) => {
1245 tracing::debug!("Received order update: {:?}", report.order_status);
1246 dispatch_execution_report(ExecutionReport::Order(report));
1247 }
1248 NautilusWsMessage::Fill(report) => {
1249 tracing::debug!("Received fill update");
1250 dispatch_execution_report(ExecutionReport::Fill(report));
1251 }
1252 NautilusWsMessage::Position(report) => {
1253 tracing::debug!("Received position update");
1254 let sender = get_exec_event_sender();
1256 let exec_report =
1257 NautilusExecutionReport::Position(Box::new(*report));
1258 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1259 tracing::warn!("Failed to send position status report: {e}");
1260 }
1261 }
1262 NautilusWsMessage::AccountState(state) => {
1263 tracing::debug!("Received account state update");
1264 dispatch_account_state(*state);
1265 }
1266 NautilusWsMessage::SubaccountSubscribed(msg) => {
1267 tracing::debug!(
1268 "Parsing subaccount subscription with full context"
1269 );
1270
1271 let inst_map: std::collections::HashMap<_, _> = instruments
1273 .iter()
1274 .map(|entry| (*entry.key(), entry.value().clone()))
1275 .collect();
1276
1277 let oracle_map: std::collections::HashMap<_, _> = oracle_prices
1279 .iter()
1280 .map(|entry| (*entry.key(), *entry.value()))
1281 .collect();
1282
1283 let ts_init =
1284 nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1285 let ts_event = ts_init;
1286
1287 match crate::http::parse::parse_account_state(
1288 &msg.contents.subaccount,
1289 account_id,
1290 &inst_map,
1291 &oracle_map,
1292 ts_event,
1293 ts_init,
1294 ) {
1295 Ok(account_state) => {
1296 tracing::info!(
1297 "Parsed account state: {} balance(s), {} margin(s)",
1298 account_state.balances.len(),
1299 account_state.margins.len()
1300 );
1301 dispatch_account_state(account_state);
1302 }
1303 Err(e) => {
1304 tracing::error!("Failed to parse account state: {e}");
1305 }
1306 }
1307
1308 if let Some(ref positions) =
1310 msg.contents.subaccount.open_perpetual_positions
1311 {
1312 tracing::debug!(
1313 "Parsing {} position(s) from subscription",
1314 positions.len()
1315 );
1316
1317 for (market, ws_position) in positions {
1318 match crate::websocket::parse::parse_ws_position_report(
1319 ws_position,
1320 &instruments,
1321 account_id,
1322 ts_init,
1323 ) {
1324 Ok(report) => {
1325 tracing::debug!(
1326 "Parsed position report: {} {} {} {}",
1327 report.instrument_id,
1328 report.position_side,
1329 report.quantity,
1330 market
1331 );
1332 let sender = get_exec_event_sender();
1333 let exec_report = NautilusExecutionReport::Position(
1334 Box::new(report),
1335 );
1336 if let Err(e) =
1337 sender.send(ExecutionEvent::Report(exec_report))
1338 {
1339 tracing::warn!(
1340 "Failed to send position status report: {e}"
1341 );
1342 }
1343 }
1344 Err(e) => {
1345 tracing::error!(
1346 "Failed to parse WebSocket position for {}: {e}",
1347 market
1348 );
1349 }
1350 }
1351 }
1352 }
1353 }
1354 NautilusWsMessage::SubaccountsChannelData(data) => {
1355 tracing::debug!("Processing subaccounts channel data");
1356 let ts_init =
1357 nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1358
1359 if let Some(ref orders) = data.contents.orders {
1361 for ws_order in orders {
1362 match crate::websocket::parse::parse_ws_order_report(
1363 ws_order,
1364 &clob_pair_id_to_instrument,
1365 &instruments,
1366 account_id,
1367 ts_init,
1368 ) {
1369 Ok(report) => {
1370 tracing::debug!(
1371 "Parsed order report: {} {} {} @ {}",
1372 report.instrument_id,
1373 report.order_side,
1374 report.order_status,
1375 report.quantity
1376 );
1377 let sender = get_exec_event_sender();
1378 let exec_report =
1379 NautilusExecutionReport::OrderStatus(Box::new(
1380 report,
1381 ));
1382 if let Err(e) =
1383 sender.send(ExecutionEvent::Report(exec_report))
1384 {
1385 tracing::warn!(
1386 "Failed to send order status report: {e}"
1387 );
1388 }
1389 }
1390 Err(e) => {
1391 tracing::error!(
1392 "Failed to parse WebSocket order: {e}"
1393 );
1394 }
1395 }
1396 }
1397 }
1398
1399 if let Some(ref fills) = data.contents.fills {
1401 for ws_fill in fills {
1402 match crate::websocket::parse::parse_ws_fill_report(
1403 ws_fill,
1404 &instruments,
1405 account_id,
1406 ts_init,
1407 ) {
1408 Ok(report) => {
1409 tracing::debug!(
1410 "Parsed fill report: {} {} {} @ {}",
1411 report.instrument_id,
1412 report.venue_order_id,
1413 report.last_qty,
1414 report.last_px
1415 );
1416 let sender = get_exec_event_sender();
1417 let exec_report =
1418 NautilusExecutionReport::Fill(Box::new(report));
1419 if let Err(e) =
1420 sender.send(ExecutionEvent::Report(exec_report))
1421 {
1422 tracing::warn!(
1423 "Failed to send fill report: {e}"
1424 );
1425 }
1426 }
1427 Err(e) => {
1428 tracing::error!(
1429 "Failed to parse WebSocket fill: {e}"
1430 );
1431 }
1432 }
1433 }
1434 }
1435 }
1436 NautilusWsMessage::OraclePrices(oracle_prices_map) => {
1437 tracing::debug!(
1438 "Processing oracle price updates for {} markets",
1439 oracle_prices_map.len()
1440 );
1441
1442 for (market_symbol, oracle_data) in &oracle_prices_map {
1444 match oracle_data.oracle_price.parse::<rust_decimal::Decimal>()
1446 {
1447 Ok(price) => {
1448 let symbol_with_perp = format!("{market_symbol}-PERP");
1451
1452 if let Some(entry) = instruments.iter().find(|entry| {
1454 entry.value().id().symbol.as_str()
1455 == symbol_with_perp
1456 }) {
1457 let instrument_id = *entry.key();
1458 oracle_prices.insert(instrument_id, price);
1459 tracing::trace!(
1460 "Updated oracle price for {}: {}",
1461 instrument_id,
1462 price
1463 );
1464 } else {
1465 tracing::debug!(
1466 "No instrument found for market symbol '{}' (tried '{}')",
1467 market_symbol,
1468 symbol_with_perp
1469 );
1470 }
1471 }
1472 Err(e) => {
1473 tracing::warn!(
1474 "Failed to parse oracle price for {}: {}",
1475 market_symbol,
1476 e
1477 );
1478 }
1479 }
1480 }
1481 }
1482 NautilusWsMessage::BlockHeight(height) => {
1483 tracing::debug!("Block height update: {}", height);
1484 block_height.store(height, std::sync::atomic::Ordering::Relaxed);
1485 }
1486 NautilusWsMessage::Error(err) => {
1487 tracing::error!("WebSocket error: {:?}", err);
1488 }
1489 NautilusWsMessage::Reconnected => {
1490 tracing::info!("WebSocket reconnected");
1491 }
1492 _ => {
1493 }
1495 }
1496 }
1497 tracing::info!("WebSocket message processing task ended");
1498 });
1499
1500 self.ws_stream_handle = Some(handle);
1501 tracing::debug!("Spawned WebSocket message processing task");
1502 }
1503 }
1504
1505 self.connected = true;
1506 tracing::info!(client_id = %self.core.client_id, "Connected");
1507 Ok(())
1508 }
1509
1510 async fn disconnect(&mut self) -> anyhow::Result<()> {
1511 if !self.connected {
1512 tracing::warn!("dYdX execution client not connected");
1513 return Ok(());
1514 }
1515
1516 tracing::info!("Disconnecting from dYdX");
1517
1518 if self.config.mnemonic.is_some() {
1520 let _ = self
1521 .ws_client
1522 .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
1523 .await
1524 .map_err(|e| tracing::warn!("Failed to unsubscribe from subaccount: {e}"));
1525 }
1526
1527 let _ = self
1529 .ws_client
1530 .unsubscribe_markets()
1531 .await
1532 .map_err(|e| tracing::warn!("Failed to unsubscribe from markets: {e}"));
1533
1534 let _ = self
1536 .ws_client
1537 .unsubscribe_block_height()
1538 .await
1539 .map_err(|e| tracing::warn!("Failed to unsubscribe from block height: {e}"));
1540
1541 self.ws_client.disconnect().await?;
1543
1544 if let Some(handle) = self.ws_stream_handle.take() {
1546 handle.abort();
1547 tracing::debug!("Aborted WebSocket message processing task");
1548 }
1549
1550 self.abort_pending_tasks();
1552
1553 self.connected = false;
1554 tracing::info!(client_id = %self.core.client_id, "Disconnected");
1555 Ok(())
1556 }
1557
1558 async fn generate_order_status_report(
1559 &self,
1560 cmd: &GenerateOrderStatusReport,
1561 ) -> anyhow::Result<Option<OrderStatusReport>> {
1562 let response = self
1564 .http_client
1565 .inner
1566 .get_orders(
1567 &self.wallet_address,
1568 self.subaccount_number,
1569 None, Some(1), )
1572 .await
1573 .context("failed to fetch order from dYdX API")?;
1574
1575 if response.is_empty() {
1576 return Ok(None);
1577 }
1578
1579 let order = &response[0];
1580 let ts_init = UnixNanos::default();
1581
1582 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1583 Some(inst) => inst,
1584 None => return Ok(None),
1585 };
1586
1587 let report = crate::http::parse::parse_order_status_report(
1588 order,
1589 &instrument,
1590 self.core.account_id,
1591 ts_init,
1592 )
1593 .context("failed to parse order status report")?;
1594
1595 if let Some(client_order_id) = cmd.client_order_id
1596 && report.client_order_id != Some(client_order_id)
1597 {
1598 return Ok(None);
1599 }
1600
1601 if let Some(venue_order_id) = cmd.venue_order_id
1602 && report.venue_order_id.as_str() != venue_order_id.as_str()
1603 {
1604 return Ok(None);
1605 }
1606
1607 if let Some(instrument_id) = cmd.instrument_id
1608 && report.instrument_id != instrument_id
1609 {
1610 return Ok(None);
1611 }
1612
1613 Ok(Some(report))
1614 }
1615
1616 async fn generate_order_status_reports(
1617 &self,
1618 cmd: &GenerateOrderStatusReports,
1619 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1620 let response = self
1622 .http_client
1623 .inner
1624 .get_orders(
1625 &self.wallet_address,
1626 self.subaccount_number,
1627 None, None, )
1630 .await
1631 .context("failed to fetch orders from dYdX API")?;
1632
1633 let mut reports = Vec::new();
1634 let ts_init = UnixNanos::default();
1635
1636 for order in response {
1637 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1638 Some(inst) => inst,
1639 None => continue,
1640 };
1641
1642 if let Some(filter_id) = cmd.instrument_id
1643 && instrument.id() != filter_id
1644 {
1645 continue;
1646 }
1647
1648 let report = match crate::http::parse::parse_order_status_report(
1649 &order,
1650 &instrument,
1651 self.core.account_id,
1652 ts_init,
1653 ) {
1654 Ok(r) => r,
1655 Err(e) => {
1656 tracing::warn!("Failed to parse order status report: {e}");
1657 continue;
1658 }
1659 };
1660
1661 reports.push(report);
1662 }
1663
1664 if cmd.open_only {
1666 reports.retain(|r| r.order_status.is_open());
1667 }
1668
1669 if let Some(start) = cmd.start {
1671 reports.retain(|r| r.ts_last >= start);
1672 }
1673 if let Some(end) = cmd.end {
1674 reports.retain(|r| r.ts_last <= end);
1675 }
1676
1677 Ok(reports)
1678 }
1679
1680 async fn generate_fill_reports(
1681 &self,
1682 cmd: GenerateFillReports,
1683 ) -> anyhow::Result<Vec<FillReport>> {
1684 let response = self
1685 .http_client
1686 .inner
1687 .get_fills(
1688 &self.wallet_address,
1689 self.subaccount_number,
1690 None, None, )
1693 .await
1694 .context("failed to fetch fills from dYdX API")?;
1695
1696 let mut reports = Vec::new();
1697 let ts_init = UnixNanos::default();
1698
1699 for fill in response.fills {
1700 let instrument = match self.get_instrument_by_market(&fill.market) {
1701 Some(inst) => inst,
1702 None => {
1703 tracing::warn!("Unknown market in fill: {}", fill.market);
1704 continue;
1705 }
1706 };
1707
1708 if let Some(filter_id) = cmd.instrument_id
1709 && instrument.id() != filter_id
1710 {
1711 continue;
1712 }
1713
1714 let report = match crate::http::parse::parse_fill_report(
1715 &fill,
1716 &instrument,
1717 self.core.account_id,
1718 ts_init,
1719 ) {
1720 Ok(r) => r,
1721 Err(e) => {
1722 tracing::warn!("Failed to parse fill report: {e}");
1723 continue;
1724 }
1725 };
1726
1727 reports.push(report);
1728 }
1729
1730 if let Some(venue_order_id) = cmd.venue_order_id {
1731 reports.retain(|r| r.venue_order_id.as_str() == venue_order_id.as_str());
1732 }
1733
1734 Ok(reports)
1735 }
1736
1737 async fn generate_position_status_reports(
1738 &self,
1739 cmd: &GeneratePositionStatusReports,
1740 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1741 let response = self
1743 .http_client
1744 .inner
1745 .get_subaccount(&self.wallet_address, self.subaccount_number)
1746 .await
1747 .context("failed to fetch subaccount from dYdX API")?;
1748
1749 let mut reports = Vec::new();
1750 let ts_init = UnixNanos::default();
1751
1752 for (market_ticker, perp_position) in &response.subaccount.open_perpetual_positions {
1753 let instrument = match self.get_instrument_by_market(market_ticker) {
1754 Some(inst) => inst,
1755 None => {
1756 tracing::warn!("Unknown market in position: {}", market_ticker);
1757 continue;
1758 }
1759 };
1760
1761 if let Some(filter_id) = cmd.instrument_id
1762 && instrument.id() != filter_id
1763 {
1764 continue;
1765 }
1766
1767 let report = match crate::http::parse::parse_position_status_report(
1768 perp_position,
1769 &instrument,
1770 self.core.account_id,
1771 ts_init,
1772 ) {
1773 Ok(r) => r,
1774 Err(e) => {
1775 tracing::warn!("Failed to parse position status report: {e}");
1776 continue;
1777 }
1778 };
1779
1780 reports.push(report);
1781 }
1782
1783 Ok(reports)
1784 }
1785
1786 async fn generate_mass_status(
1787 &self,
1788 lookback_mins: Option<u64>,
1789 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1790 let ts_init = UnixNanos::default();
1791
1792 let orders_response = self
1794 .http_client
1795 .inner
1796 .get_orders(&self.wallet_address, self.subaccount_number, None, None)
1797 .await
1798 .context("failed to fetch orders for mass status")?;
1799
1800 let subaccount_response = self
1802 .http_client
1803 .inner
1804 .get_subaccount(&self.wallet_address, self.subaccount_number)
1805 .await
1806 .context("failed to fetch subaccount for mass status")?;
1807
1808 let fills_response = self
1810 .http_client
1811 .inner
1812 .get_fills(&self.wallet_address, self.subaccount_number, None, None)
1813 .await
1814 .context("failed to fetch fills for mass status")?;
1815
1816 let mut order_reports = Vec::new();
1818 let mut orders_filtered = 0usize;
1819 for order in orders_response {
1820 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1821 Some(inst) => inst,
1822 None => {
1823 orders_filtered += 1;
1824 continue;
1825 }
1826 };
1827
1828 match crate::http::parse::parse_order_status_report(
1829 &order,
1830 &instrument,
1831 self.core.account_id,
1832 ts_init,
1833 ) {
1834 Ok(r) => order_reports.push(r),
1835 Err(e) => {
1836 tracing::warn!("Failed to parse order status report: {e}");
1837 orders_filtered += 1;
1838 }
1839 }
1840 }
1841
1842 let mut position_reports = Vec::new();
1844 for (market_ticker, perp_position) in
1845 &subaccount_response.subaccount.open_perpetual_positions
1846 {
1847 let instrument = match self.get_instrument_by_market(market_ticker) {
1848 Some(inst) => inst,
1849 None => continue,
1850 };
1851
1852 match crate::http::parse::parse_position_status_report(
1853 perp_position,
1854 &instrument,
1855 self.core.account_id,
1856 ts_init,
1857 ) {
1858 Ok(r) => position_reports.push(r),
1859 Err(e) => {
1860 tracing::warn!("Failed to parse position status report: {e}");
1861 }
1862 }
1863 }
1864
1865 let mut fill_reports = Vec::new();
1867 let mut fills_filtered = 0usize;
1868 for fill in fills_response.fills {
1869 let instrument = match self.get_instrument_by_market(&fill.market) {
1870 Some(inst) => inst,
1871 None => {
1872 fills_filtered += 1;
1873 continue;
1874 }
1875 };
1876
1877 match crate::http::parse::parse_fill_report(
1878 &fill,
1879 &instrument,
1880 self.core.account_id,
1881 ts_init,
1882 ) {
1883 Ok(r) => fill_reports.push(r),
1884 Err(e) => {
1885 tracing::warn!("Failed to parse fill report: {e}");
1886 fills_filtered += 1;
1887 }
1888 }
1889 }
1890
1891 if lookback_mins.is_some() {
1892 tracing::debug!(
1893 "lookback_mins={:?} filtering not yet implemented. Returning all: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
1894 lookback_mins,
1895 order_reports.len(),
1896 orders_filtered,
1897 position_reports.len(),
1898 fill_reports.len(),
1899 fills_filtered
1900 );
1901 } else {
1902 tracing::info!(
1903 "Generated mass status: {} orders, {} positions, {} fills",
1904 order_reports.len(),
1905 position_reports.len(),
1906 fill_reports.len()
1907 );
1908 }
1909
1910 let mut mass_status = ExecutionMassStatus::new(
1912 self.core.client_id,
1913 self.core.account_id,
1914 self.core.venue,
1915 ts_init,
1916 None, );
1918
1919 mass_status.add_order_reports(order_reports);
1920 mass_status.add_position_reports(position_reports);
1921 mass_status.add_fill_reports(fill_reports);
1922
1923 Ok(Some(mass_status))
1924 }
1925}
1926
1927fn dispatch_account_state(state: AccountState) {
1932 use std::any::Any;
1933 msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
1934}
1935
1936fn dispatch_execution_report(report: ExecutionReport) {
1951 let sender = get_exec_event_sender();
1952 match report {
1953 ExecutionReport::Order(order_report) => {
1954 tracing::debug!(
1955 "Dispatching order report: status={:?}, venue_order_id={:?}, client_order_id={:?}",
1956 order_report.order_status,
1957 order_report.venue_order_id,
1958 order_report.client_order_id
1959 );
1960 let exec_report = NautilusExecutionReport::OrderStatus(order_report);
1961 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1962 tracing::warn!("Failed to send order status report: {e}");
1963 }
1964 }
1965 ExecutionReport::Fill(fill_report) => {
1966 tracing::debug!(
1967 "Dispatching fill report: venue_order_id={}, trade_id={}",
1968 fill_report.venue_order_id,
1969 fill_report.trade_id
1970 );
1971 let exec_report = NautilusExecutionReport::Fill(fill_report);
1972 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1973 tracing::warn!("Failed to send fill report: {e}");
1974 }
1975 }
1976 }
1977}