1use std::sync::{
42 Arc, Mutex,
43 atomic::{AtomicU32, AtomicU64, Ordering},
44};
45
46use anyhow::Context;
47use async_trait::async_trait;
48use dashmap::DashMap;
49use nautilus_common::{
50 clients::ExecutionClient,
51 live::{runner::get_exec_event_sender, runtime::get_runtime},
52 messages::{
53 ExecutionEvent, ExecutionReport as NautilusExecutionReport,
54 execution::{
55 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
56 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
57 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
58 },
59 },
60 msgbus,
61};
62use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
63use nautilus_live::ExecutionClientCore;
64use nautilus_model::{
65 accounts::AccountAny,
66 enums::{OmsType, OrderSide, OrderType, TimeInForce},
67 events::{AccountState, OrderCancelRejected, OrderEventAny, OrderRejected},
68 identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue},
69 instruments::{Instrument, InstrumentAny},
70 orders::Order,
71 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
72 types::{AccountBalance, MarginBalance},
73};
74use nautilus_network::retry::RetryConfig;
75use rust_decimal::Decimal;
76use tokio::task::JoinHandle;
77
78use crate::{
79 common::{consts::DYDX_VENUE, credential::DydxCredential, parse::nanos_to_secs_i64},
80 config::DydxAdapterConfig,
81 execution::submitter::OrderSubmitter,
82 grpc::{DydxGrpcClient, Wallet, types::ChainId},
83 http::client::DydxHttpClient,
84 websocket::{client::DydxWebSocketClient, enums::NautilusWsMessage},
85};
86
87pub mod submitter;
88
89pub const MAX_CLIENT_ID: u32 = u32::MAX;
95
96enum ExecutionReport {
101 Order(Box<OrderStatusReport>),
102 Fill(Box<FillReport>),
103}
104
105#[derive(Debug)]
120pub struct DydxExecutionClient {
121 core: ExecutionClientCore,
122 config: DydxAdapterConfig,
123 http_client: DydxHttpClient,
124 ws_client: DydxWebSocketClient,
125 grpc_client: Arc<tokio::sync::RwLock<DydxGrpcClient>>,
126 wallet: Arc<tokio::sync::RwLock<Option<Wallet>>>,
127 instruments: DashMap<InstrumentId, InstrumentAny>,
128 market_to_instrument: DashMap<String, InstrumentId>,
129 clob_pair_id_to_instrument: DashMap<u32, InstrumentId>,
130 block_height: Arc<AtomicU64>,
131 oracle_prices: Arc<DashMap<InstrumentId, Decimal>>,
132 client_id_to_int: DashMap<String, u32>,
133 int_to_client_id: DashMap<u32, String>,
134 next_client_id: AtomicU32,
135 wallet_address: String,
136 subaccount_number: u32,
137 started: bool,
138 connected: bool,
139 instruments_initialized: bool,
140 ws_stream_handle: Option<JoinHandle<()>>,
141 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
142}
143
144impl DydxExecutionClient {
145 pub fn new(
151 core: ExecutionClientCore,
152 config: DydxAdapterConfig,
153 wallet_address: String,
154 subaccount_number: u32,
155 ) -> anyhow::Result<Self> {
156 let retry_config = RetryConfig {
158 max_retries: config.max_retries,
159 initial_delay_ms: config.retry_delay_initial_ms,
160 max_delay_ms: config.retry_delay_max_ms,
161 ..Default::default()
162 };
163 let http_client = DydxHttpClient::new(
164 Some(config.base_url.clone()),
165 Some(config.timeout_secs),
166 None, config.is_testnet,
168 Some(retry_config),
169 )?;
170
171 let ws_client = if let Some(ref mnemonic) = config.mnemonic {
173 let credential = DydxCredential::from_mnemonic(
174 mnemonic,
175 subaccount_number,
176 config.authenticator_ids.clone(),
177 )?;
178 DydxWebSocketClient::new_private(
179 config.ws_url.clone(),
180 credential,
181 core.account_id,
182 Some(20),
183 )
184 } else {
185 DydxWebSocketClient::new_public(config.ws_url.clone(), Some(20))
186 };
187
188 let grpc_urls = config.get_grpc_urls();
189 let grpc_client = Arc::new(tokio::sync::RwLock::new(
190 get_runtime()
191 .block_on(async { DydxGrpcClient::new_with_fallback(&grpc_urls).await })
192 .context("failed to construct dYdX gRPC client")?,
193 ));
194
195 Ok(Self {
196 core,
197 config,
198 http_client,
199 ws_client,
200 grpc_client,
201 wallet: Arc::new(tokio::sync::RwLock::new(None)),
202 instruments: DashMap::new(),
203 market_to_instrument: DashMap::new(),
204 clob_pair_id_to_instrument: DashMap::new(),
205 block_height: Arc::new(AtomicU64::new(0)),
206 oracle_prices: Arc::new(DashMap::new()),
207 client_id_to_int: DashMap::new(),
208 int_to_client_id: DashMap::new(),
209 next_client_id: AtomicU32::new(1),
210 wallet_address,
211 subaccount_number,
212 started: false,
213 connected: false,
214 instruments_initialized: false,
215 ws_stream_handle: None,
216 pending_tasks: Mutex::new(Vec::new()),
217 })
218 }
219
220 fn generate_client_order_id_int(&self, client_order_id: &str) -> u32 {
240 use dashmap::mapref::entry::Entry;
241
242 if let Some(existing) = self.client_id_to_int.get(client_order_id) {
244 return *existing.value();
245 }
246
247 if let Ok(id) = client_order_id.parse::<u32>() {
249 self.client_id_to_int
250 .insert(client_order_id.to_string(), id);
251 self.int_to_client_id
252 .insert(id, client_order_id.to_string());
253 return id;
254 }
255
256 match self.client_id_to_int.entry(client_order_id.to_string()) {
258 Entry::Occupied(entry) => *entry.get(),
259 Entry::Vacant(vacant) => {
260 let id = self
261 .next_client_id
262 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
263 vacant.insert(id);
264 self.int_to_client_id
265 .insert(id, client_order_id.to_string());
266 id
267 }
268 }
269 }
270
271 fn get_client_order_id_int(&self, client_order_id: &str) -> Option<u32> {
275 if let Ok(id) = client_order_id.parse::<u32>() {
277 return Some(id);
278 }
279
280 self.client_id_to_int
282 .get(client_order_id)
283 .map(|entry| *entry.value())
284 }
285
286 fn get_chain_id(&self) -> ChainId {
290 self.config.get_chain_id()
291 }
292
293 fn cache_instruments_from_http(&mut self) {
300 let instruments: Vec<InstrumentAny> = self
302 .http_client
303 .instruments_cache
304 .iter()
305 .map(|entry| entry.value().clone())
306 .collect();
307
308 log::debug!(
309 "Caching {} instruments in execution client",
310 instruments.len()
311 );
312
313 for instrument in instruments {
314 let instrument_id = instrument.id();
315 let symbol = instrument_id.symbol.as_str();
316
317 self.instruments.insert(instrument_id, instrument.clone());
319
320 self.market_to_instrument
322 .insert(symbol.to_string(), instrument_id);
323 }
324
325 let http_mapping = self.http_client.clob_pair_id_mapping();
328 for entry in http_mapping.iter() {
329 self.clob_pair_id_to_instrument
330 .insert(*entry.key(), *entry.value());
331 }
332
333 self.instruments_initialized = true;
334 log::info!(
335 "Cached {} instruments ({} CLOB pair IDs) with market mappings",
336 self.instruments.len(),
337 self.clob_pair_id_to_instrument.len()
338 );
339 }
340
341 fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
343 self.market_to_instrument
344 .get(market)
345 .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()))
346 }
347
348 fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
350 let instrument = self
351 .clob_pair_id_to_instrument
352 .get(&clob_pair_id)
353 .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()));
354
355 if instrument.is_none() {
356 self.log_missing_instrument_for_clob_pair_id(clob_pair_id);
357 }
358
359 instrument
360 }
361
362 fn log_missing_instrument_for_clob_pair_id(&self, clob_pair_id: u32) {
363 let known: Vec<(u32, String)> = self
364 .clob_pair_id_to_instrument
365 .iter()
366 .filter_map(|entry| {
367 let instrument_id = entry.value();
368 self.instruments.get(instrument_id).map(|inst_entry| {
369 (
370 *entry.key(),
371 inst_entry.value().id().symbol.as_str().to_string(),
372 )
373 })
374 })
375 .collect();
376
377 log::warn!(
378 "Instrument for clob_pair_id {clob_pair_id} not found in cache. Known CLOB pair IDs and symbols: {known:?}"
379 );
380 }
381
382 fn spawn_task<F>(&self, label: &'static str, fut: F)
383 where
384 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
385 {
386 let handle = get_runtime().spawn(async move {
387 if let Err(e) = fut.await {
388 log::error!("{label}: {e:?}");
389 }
390 });
391
392 self.pending_tasks
393 .lock()
394 .expect(MUTEX_POISONED)
395 .push(handle);
396 }
397
398 fn spawn_order_task<F>(
402 &self,
403 label: &'static str,
404 strategy_id: StrategyId,
405 instrument_id: InstrumentId,
406 client_order_id: ClientOrderId,
407 fut: F,
408 ) where
409 F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
410 {
411 let trader_id = self.core.trader_id;
413 let account_id = self.core.account_id;
414 let sender = get_exec_event_sender();
415
416 let handle = get_runtime().spawn(async move {
417 if let Err(e) = fut.await {
418 let error_msg = format!("{label} failed: {e:?}");
419 log::error!("{error_msg}");
420
421 let ts_now = UnixNanos::default(); let event = OrderRejected::new(
423 trader_id,
424 strategy_id,
425 instrument_id,
426 client_order_id,
427 account_id,
428 error_msg.into(),
429 UUID4::new(),
430 ts_now,
431 ts_now,
432 false,
433 false,
434 );
435
436 if let Err(send_err) =
437 sender.send(ExecutionEvent::Order(OrderEventAny::Rejected(event)))
438 {
439 log::error!("Failed to send OrderRejected event: {send_err}");
440 }
441 }
442 });
443
444 self.pending_tasks
445 .lock()
446 .expect(MUTEX_POISONED)
447 .push(handle);
448 }
449
450 fn abort_pending_tasks(&self) {
451 let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
452 for handle in guard.drain(..) {
453 handle.abort();
454 }
455 }
456}
457
458#[async_trait(?Send)]
459impl ExecutionClient for DydxExecutionClient {
460 fn is_connected(&self) -> bool {
461 self.connected
462 }
463
464 fn client_id(&self) -> ClientId {
465 self.core.client_id
466 }
467
468 fn account_id(&self) -> AccountId {
469 self.core.account_id
470 }
471
472 fn venue(&self) -> Venue {
473 *DYDX_VENUE
474 }
475
476 fn oms_type(&self) -> OmsType {
477 self.core.oms_type
478 }
479
480 fn get_account(&self) -> Option<AccountAny> {
481 self.core.get_account()
482 }
483
484 fn generate_account_state(
485 &self,
486 balances: Vec<AccountBalance>,
487 margins: Vec<MarginBalance>,
488 reported: bool,
489 ts_event: UnixNanos,
490 ) -> anyhow::Result<()> {
491 self.core
492 .generate_account_state(balances, margins, reported, ts_event)
493 }
494
495 fn start(&mut self) -> anyhow::Result<()> {
496 if self.started {
497 log::warn!("dYdX execution client already started");
498 return Ok(());
499 }
500
501 log::info!("Starting dYdX execution client");
502 self.started = true;
503 Ok(())
504 }
505
506 fn stop(&mut self) -> anyhow::Result<()> {
507 if !self.started {
508 log::warn!("dYdX execution client not started");
509 return Ok(());
510 }
511
512 log::info!("Stopping dYdX execution client");
513 self.abort_pending_tasks();
514 self.started = false;
515 self.connected = false;
516 Ok(())
517 }
518
519 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
536 let order = cmd.order.clone();
537
538 if !self.is_connected() {
540 let reason = "Cannot submit order: execution client not connected";
541 log::error!("{reason}");
542 anyhow::bail!(reason);
543 }
544
545 let current_block = self.block_height.load(Ordering::Relaxed);
547 if current_block == 0 {
548 let reason = "Block height not initialized";
549 log::warn!(
550 "Cannot submit order {}: {}",
551 order.client_order_id(),
552 reason
553 );
554 self.core.generate_order_rejected(
555 order.strategy_id(),
556 order.instrument_id(),
557 order.client_order_id(),
558 reason,
559 cmd.ts_init,
560 false,
561 );
562 return Ok(());
563 }
564
565 if order.is_closed() {
567 log::warn!("Cannot submit closed order {}", order.client_order_id());
568 return Ok(());
569 }
570
571 match order.order_type() {
573 OrderType::Market | OrderType::Limit => {
574 log::debug!(
575 "Submitting {} order: {}",
576 if matches!(order.order_type(), OrderType::Market) {
577 "MARKET"
578 } else {
579 "LIMIT"
580 },
581 order.client_order_id()
582 );
583 }
584 OrderType::StopMarket | OrderType::StopLimit => {
586 log::debug!(
587 "Submitting {} order: {}",
588 if matches!(order.order_type(), OrderType::StopMarket) {
589 "STOP_MARKET"
590 } else {
591 "STOP_LIMIT"
592 },
593 order.client_order_id()
594 );
595 }
596 OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
598 log::debug!(
599 "Submitting {} order: {}",
600 if matches!(order.order_type(), OrderType::MarketIfTouched) {
601 "TAKE_PROFIT_MARKET"
602 } else {
603 "TAKE_PROFIT_LIMIT"
604 },
605 order.client_order_id()
606 );
607 }
608 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit => {
610 let reason = "Trailing stop orders not supported by dYdX v4 protocol";
611 log::error!("{reason}");
612 self.core.generate_order_rejected(
613 order.strategy_id(),
614 order.instrument_id(),
615 order.client_order_id(),
616 reason,
617 cmd.ts_init,
618 false,
619 );
620 return Ok(());
621 }
622 order_type => {
623 let reason = format!("Order type {order_type:?} not supported by dYdX");
624 log::error!("{reason}");
625 self.core.generate_order_rejected(
626 order.strategy_id(),
627 order.instrument_id(),
628 order.client_order_id(),
629 &reason,
630 cmd.ts_init,
631 false,
632 );
633 return Ok(());
634 }
635 }
636
637 self.core.generate_order_submitted(
639 order.strategy_id(),
640 order.instrument_id(),
641 order.client_order_id(),
642 cmd.ts_init,
643 );
644
645 let grpc_client = self.grpc_client.clone();
646 let wallet = self.wallet.clone();
647 let http_client = self.http_client.clone();
648 let wallet_address = self.wallet_address.clone();
649 let subaccount_number = self.subaccount_number;
650 let client_order_id = order.client_order_id();
651 let instrument_id = order.instrument_id();
652 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
653 let chain_id = self.get_chain_id();
654 let authenticator_ids = self.config.authenticator_ids.clone();
655 #[allow(clippy::redundant_clone)]
656 let order_clone = order.clone();
657
658 let client_id_u32 = self.generate_client_order_id_int(client_order_id.as_str());
660
661 self.spawn_order_task(
662 "submit_order",
663 order.strategy_id(),
664 order.instrument_id(),
665 order.client_order_id(),
666 async move {
667 let wallet_guard = wallet.read().await;
668 let wallet_ref = wallet_guard
669 .as_ref()
670 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
671
672 let grpc_guard = grpc_client.read().await;
673 let submitter = OrderSubmitter::new(
674 (*grpc_guard).clone(),
675 http_client.clone(),
676 wallet_address,
677 subaccount_number,
678 chain_id,
679 authenticator_ids,
680 );
681
682 match order_clone.order_type() {
684 OrderType::Market => {
685 submitter
686 .submit_market_order(
687 wallet_ref,
688 instrument_id,
689 client_id_u32,
690 order_clone.order_side(),
691 order_clone.quantity(),
692 block_height,
693 )
694 .await?;
695 log::info!("Successfully submitted market order: {client_order_id}");
696 }
697 OrderType::Limit => {
698 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
699 submitter
700 .submit_limit_order(
701 wallet_ref,
702 instrument_id,
703 client_id_u32,
704 order_clone.order_side(),
705 order_clone
706 .price()
707 .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
708 order_clone.quantity(),
709 order_clone.time_in_force(),
710 order_clone.is_post_only(),
711 order_clone.is_reduce_only(),
712 block_height,
713 expire_time,
714 )
715 .await?;
716 log::info!("Successfully submitted limit order: {client_order_id}");
717 }
718 OrderType::StopMarket => {
719 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
720 anyhow::anyhow!("Stop market order missing trigger_price")
721 })?;
722 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
723 submitter
724 .submit_stop_market_order(
725 wallet_ref,
726 instrument_id,
727 client_id_u32,
728 order_clone.order_side(),
729 trigger_price,
730 order_clone.quantity(),
731 order_clone.is_reduce_only(),
732 expire_time,
733 )
734 .await?;
735 log::info!("Successfully submitted stop market order: {client_order_id}");
736 }
737 OrderType::StopLimit => {
738 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
739 anyhow::anyhow!("Stop limit order missing trigger_price")
740 })?;
741 let limit_price = order_clone.price().ok_or_else(|| {
742 anyhow::anyhow!("Stop limit order missing limit price")
743 })?;
744 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
745 submitter
746 .submit_stop_limit_order(
747 wallet_ref,
748 instrument_id,
749 client_id_u32,
750 order_clone.order_side(),
751 trigger_price,
752 limit_price,
753 order_clone.quantity(),
754 order_clone.time_in_force(),
755 order_clone.is_post_only(),
756 order_clone.is_reduce_only(),
757 expire_time,
758 )
759 .await?;
760 log::info!("Successfully submitted stop limit order: {client_order_id}");
761 }
762 OrderType::MarketIfTouched => {
764 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
765 anyhow::anyhow!("Take profit market order missing trigger_price")
766 })?;
767 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
768 submitter
769 .submit_take_profit_market_order(
770 wallet_ref,
771 instrument_id,
772 client_id_u32,
773 order_clone.order_side(),
774 trigger_price,
775 order_clone.quantity(),
776 order_clone.is_reduce_only(),
777 expire_time,
778 )
779 .await?;
780 log::info!(
781 "Successfully submitted take profit market order: {client_order_id}"
782 );
783 }
784 OrderType::LimitIfTouched => {
786 let trigger_price = order_clone.trigger_price().ok_or_else(|| {
787 anyhow::anyhow!("Take profit limit order missing trigger_price")
788 })?;
789 let limit_price = order_clone.price().ok_or_else(|| {
790 anyhow::anyhow!("Take profit limit order missing limit price")
791 })?;
792 let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
793 submitter
794 .submit_take_profit_limit_order(
795 wallet_ref,
796 instrument_id,
797 client_id_u32,
798 order_clone.order_side(),
799 trigger_price,
800 limit_price,
801 order_clone.quantity(),
802 order_clone.time_in_force(),
803 order_clone.is_post_only(),
804 order_clone.is_reduce_only(),
805 expire_time,
806 )
807 .await?;
808 log::info!(
809 "Successfully submitted take profit limit order: {client_order_id}"
810 );
811 }
812 _ => unreachable!("Order type already validated"),
813 }
814
815 Ok(())
816 },
817 );
818
819 Ok(())
820 }
821
822 fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
823 anyhow::bail!("Order lists not supported by dYdX")
824 }
825
826 fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
827 anyhow::bail!("Order modification not supported by dYdX")
828 }
829
830 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
847 if !self.is_connected() {
848 anyhow::bail!("Cannot cancel order: not connected");
849 }
850
851 let client_order_id = cmd.client_order_id;
852
853 let cache = self.core.cache();
855 let cache_borrow = cache.borrow();
856
857 let order = match cache_borrow.order(&client_order_id) {
858 Some(order) => order,
859 None => {
860 log::error!("Cannot cancel order {client_order_id}: not found in cache");
861 return Ok(()); }
863 };
864
865 if order.is_closed() {
867 log::warn!(
868 "CancelOrder command for {} when order already {} (will not send to exchange)",
869 client_order_id,
870 order.status()
871 );
872 return Ok(());
873 }
874
875 let instrument_id = cmd.instrument_id;
877 let instrument = match cache_borrow.instrument(&instrument_id) {
878 Some(instrument) => instrument,
879 None => {
880 log::error!(
881 "Cannot cancel order {client_order_id}: instrument {instrument_id} not found in cache"
882 );
883 return Ok(()); }
885 };
886
887 log::debug!(
888 "Cancelling order {} for instrument {}",
889 client_order_id,
890 instrument.id()
891 );
892
893 let grpc_client = self.grpc_client.clone();
894 let wallet = self.wallet.clone();
895 let http_client = self.http_client.clone();
896 let wallet_address = self.wallet_address.clone();
897 let subaccount_number = self.subaccount_number;
898 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
899 let chain_id = self.get_chain_id();
900 let authenticator_ids = self.config.authenticator_ids.clone();
901 let trader_id = cmd.trader_id;
902 let strategy_id = cmd.strategy_id;
903 let venue_order_id = cmd.venue_order_id;
904
905 let client_id_u32 = match self.get_client_order_id_int(client_order_id.as_str()) {
907 Some(id) => id,
908 None => {
909 log::error!("Client order ID {client_order_id} not found in cache");
910 anyhow::bail!("Client order ID not found in cache")
911 }
912 };
913
914 self.spawn_task("cancel_order", async move {
915 let wallet_guard = wallet.read().await;
916 let wallet_ref = wallet_guard
917 .as_ref()
918 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
919
920 let grpc_guard = grpc_client.read().await;
921 let submitter = OrderSubmitter::new(
922 (*grpc_guard).clone(),
923 http_client.clone(),
924 wallet_address,
925 subaccount_number,
926 chain_id,
927 authenticator_ids,
928 );
929
930 match submitter
932 .cancel_order(wallet_ref, instrument_id, client_id_u32, block_height)
933 .await
934 {
935 Ok(()) => {
936 log::info!("Successfully cancelled order: {client_order_id}");
937 }
938 Err(e) => {
939 log::error!("Failed to cancel order {client_order_id}: {e:?}");
940
941 let sender = get_exec_event_sender();
942 let ts_now = UnixNanos::default();
943 let event = OrderCancelRejected::new(
944 trader_id,
945 strategy_id,
946 instrument_id,
947 client_order_id,
948 format!("Cancel order failed: {e:?}").into(),
949 UUID4::new(),
950 ts_now,
951 ts_now,
952 false,
953 venue_order_id,
954 None, );
956 sender
957 .send(ExecutionEvent::Order(OrderEventAny::CancelRejected(event)))
958 .unwrap();
959 }
960 }
961
962 Ok(())
963 });
964
965 Ok(())
966 }
967
968 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
969 if !self.is_connected() {
970 anyhow::bail!("Cannot cancel orders: not connected");
971 }
972
973 let cache = self.core.cache().borrow();
975 let mut open_orders: Vec<_> = cache
976 .orders_open(None, None, None, None)
977 .into_iter()
978 .collect();
979
980 let instrument_id = cmd.instrument_id;
981 open_orders.retain(|order| order.instrument_id() == instrument_id);
982
983 if cmd.order_side != OrderSide::NoOrderSide {
985 let order_side = cmd.order_side;
986 open_orders.retain(|order| order.order_side() == order_side);
987 }
988
989 let mut short_term_orders = Vec::new();
993 let mut long_term_orders = Vec::new();
994
995 for order in &open_orders {
996 match order.time_in_force() {
997 TimeInForce::Ioc | TimeInForce::Fok => short_term_orders.push(order),
998 TimeInForce::Gtc
999 | TimeInForce::Gtd
1000 | TimeInForce::Day
1001 | TimeInForce::AtTheOpen
1002 | TimeInForce::AtTheClose => long_term_orders.push(order),
1003 }
1004 }
1005
1006 log::info!(
1007 "Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={}, order_side={:?}",
1008 open_orders.len(),
1009 short_term_orders.len(),
1010 long_term_orders.len(),
1011 instrument_id,
1012 cmd.order_side
1013 );
1014
1015 let grpc_client = self.grpc_client.clone();
1017 let wallet = self.wallet.clone();
1018 let http_client = self.http_client.clone();
1019 let wallet_address = self.wallet_address.clone();
1020 let subaccount_number = self.subaccount_number;
1021 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1022 let chain_id = self.get_chain_id();
1023 let authenticator_ids = self.config.authenticator_ids.clone();
1024
1025 let mut orders_to_cancel = Vec::new();
1027 for order in &open_orders {
1028 let client_order_id = order.client_order_id();
1029 if let Some(client_id_u32) = self.get_client_order_id_int(client_order_id.as_str()) {
1030 orders_to_cancel.push((instrument_id, client_id_u32));
1031 } else {
1032 log::warn!(
1033 "Cannot cancel order {client_order_id}: client_order_id not found in cache"
1034 );
1035 }
1036 }
1037
1038 self.spawn_task("cancel_all_orders", async move {
1039 let wallet_guard = wallet.read().await;
1040 let wallet_ref = wallet_guard
1041 .as_ref()
1042 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1043
1044 let grpc_guard = grpc_client.read().await;
1045 let submitter = OrderSubmitter::new(
1046 (*grpc_guard).clone(),
1047 http_client.clone(),
1048 wallet_address,
1049 subaccount_number,
1050 chain_id,
1051 authenticator_ids,
1052 );
1053
1054 match submitter
1056 .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1057 .await
1058 {
1059 Ok(()) => {
1060 log::info!("Successfully cancelled {} orders", orders_to_cancel.len());
1061 }
1062 Err(e) => {
1063 log::error!("Batch cancel failed: {e:?}");
1064 }
1065 }
1066
1067 Ok(())
1068 });
1069
1070 Ok(())
1071 }
1072
1073 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
1074 if cmd.cancels.is_empty() {
1075 return Ok(());
1076 }
1077
1078 if !self.is_connected() {
1079 anyhow::bail!("Cannot cancel orders: not connected");
1080 }
1081
1082 let mut orders_to_cancel = Vec::with_capacity(cmd.cancels.len());
1084 for cancel in &cmd.cancels {
1085 let client_id_str = cancel.client_order_id.as_str();
1086 let client_id_u32 = match self.get_client_order_id_int(client_id_str) {
1087 Some(id) => id,
1088 None => {
1089 log::warn!(
1090 "No u32 mapping found for client_order_id={client_id_str}, skipping cancel"
1091 );
1092 continue;
1093 }
1094 };
1095 orders_to_cancel.push((cancel.instrument_id, client_id_u32));
1096 }
1097
1098 if orders_to_cancel.is_empty() {
1099 log::warn!("No valid orders to cancel in batch");
1100 return Ok(());
1101 }
1102
1103 let grpc_client = self.grpc_client.clone();
1104 let wallet = self.wallet.clone();
1105 let http_client = self.http_client.clone();
1106 let wallet_address = self.wallet_address.clone();
1107 let subaccount_number = self.subaccount_number;
1108 let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1109 let chain_id = self.get_chain_id();
1110 let authenticator_ids = self.config.authenticator_ids.clone();
1111
1112 log::info!(
1113 "Batch cancelling {} orders: {:?}",
1114 orders_to_cancel.len(),
1115 orders_to_cancel
1116 );
1117
1118 self.spawn_task("batch_cancel_orders", async move {
1119 let wallet_guard = wallet.read().await;
1120 let wallet_ref = wallet_guard
1121 .as_ref()
1122 .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1123
1124 let grpc_guard = grpc_client.read().await;
1125 let submitter = OrderSubmitter::new(
1126 (*grpc_guard).clone(),
1127 http_client.clone(),
1128 wallet_address,
1129 subaccount_number,
1130 chain_id,
1131 authenticator_ids,
1132 );
1133
1134 match submitter
1135 .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1136 .await
1137 {
1138 Ok(()) => {
1139 log::info!(
1140 "Successfully batch cancelled {} orders",
1141 orders_to_cancel.len()
1142 );
1143 }
1144 Err(e) => {
1145 log::error!("Batch cancel failed: {e:?}");
1146 }
1147 }
1148
1149 Ok(())
1150 });
1151
1152 Ok(())
1153 }
1154
1155 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
1156 Ok(())
1157 }
1158
1159 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
1160 Ok(())
1161 }
1162
1163 async fn connect(&mut self) -> anyhow::Result<()> {
1164 if self.connected {
1165 log::warn!("dYdX execution client already connected");
1166 return Ok(());
1167 }
1168
1169 log::info!("Connecting to dYdX");
1170
1171 log::debug!("Loading instruments from HTTP API");
1174 self.http_client.fetch_and_cache_instruments().await?;
1175 log::info!(
1176 "Loaded {} instruments from HTTP",
1177 self.http_client.instruments_cache.len()
1178 );
1179
1180 self.cache_instruments_from_http();
1182
1183 if let Some(mnemonic) = &self.config.mnemonic {
1185 let wallet = Wallet::from_mnemonic(mnemonic)?;
1186 *self.wallet.write().await = Some(wallet);
1187 log::debug!("Wallet initialized");
1188 }
1189
1190 self.ws_client.connect().await?;
1192 log::debug!("WebSocket connected");
1193
1194 self.ws_client.subscribe_block_height().await?;
1196 log::debug!("Subscribed to block height updates");
1197
1198 self.ws_client.subscribe_markets().await?;
1200 log::debug!("Subscribed to markets");
1201
1202 if self.config.mnemonic.is_some() {
1204 self.ws_client
1205 .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
1206 .await?;
1207 log::debug!(
1208 "Subscribed to subaccount updates: {}/{}",
1209 self.wallet_address,
1210 self.subaccount_number
1211 );
1212
1213 if let Some(mut rx) = self.ws_client.take_receiver() {
1216 let account_id = self.core.account_id;
1218 let instruments = self.instruments.clone();
1219 let oracle_prices = self.oracle_prices.clone();
1220 let clob_pair_id_to_instrument = self.clob_pair_id_to_instrument.clone();
1221 let block_height = self.block_height.clone();
1222
1223 let handle = get_runtime().spawn(async move {
1224 while let Some(msg) = rx.recv().await {
1225 match msg {
1226 NautilusWsMessage::Order(report) => {
1227 log::debug!("Received order update: {:?}", report.order_status);
1228 dispatch_execution_report(ExecutionReport::Order(report));
1229 }
1230 NautilusWsMessage::Fill(report) => {
1231 log::debug!("Received fill update");
1232 dispatch_execution_report(ExecutionReport::Fill(report));
1233 }
1234 NautilusWsMessage::Position(report) => {
1235 log::debug!("Received position update");
1236 let sender = get_exec_event_sender();
1238 let exec_report =
1239 NautilusExecutionReport::Position(Box::new(*report));
1240 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1241 log::warn!("Failed to send position status report: {e}");
1242 }
1243 }
1244 NautilusWsMessage::AccountState(state) => {
1245 log::debug!("Received account state update");
1246 dispatch_account_state(*state);
1247 }
1248 NautilusWsMessage::SubaccountSubscribed(msg) => {
1249 log::debug!(
1250 "Parsing subaccount subscription with full context"
1251 );
1252
1253 let inst_map: std::collections::HashMap<_, _> = instruments
1255 .iter()
1256 .map(|entry| (*entry.key(), entry.value().clone()))
1257 .collect();
1258
1259 let oracle_map: std::collections::HashMap<_, _> = oracle_prices
1261 .iter()
1262 .map(|entry| (*entry.key(), *entry.value()))
1263 .collect();
1264
1265 let ts_init =
1266 nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1267 let ts_event = ts_init;
1268
1269 match crate::http::parse::parse_account_state(
1270 &msg.contents.subaccount,
1271 account_id,
1272 &inst_map,
1273 &oracle_map,
1274 ts_event,
1275 ts_init,
1276 ) {
1277 Ok(account_state) => {
1278 log::info!(
1279 "Parsed account state: {} balance(s), {} margin(s)",
1280 account_state.balances.len(),
1281 account_state.margins.len()
1282 );
1283 dispatch_account_state(account_state);
1284 }
1285 Err(e) => {
1286 log::error!("Failed to parse account state: {e}");
1287 }
1288 }
1289
1290 if let Some(ref positions) =
1292 msg.contents.subaccount.open_perpetual_positions
1293 {
1294 log::debug!(
1295 "Parsing {} position(s) from subscription",
1296 positions.len()
1297 );
1298
1299 for (market, ws_position) in positions {
1300 match crate::websocket::parse::parse_ws_position_report(
1301 ws_position,
1302 &instruments,
1303 account_id,
1304 ts_init,
1305 ) {
1306 Ok(report) => {
1307 log::debug!(
1308 "Parsed position report: {} {} {} {}",
1309 report.instrument_id,
1310 report.position_side,
1311 report.quantity,
1312 market
1313 );
1314 let sender = get_exec_event_sender();
1315 let exec_report = NautilusExecutionReport::Position(
1316 Box::new(report),
1317 );
1318 if let Err(e) =
1319 sender.send(ExecutionEvent::Report(exec_report))
1320 {
1321 log::warn!(
1322 "Failed to send position status report: {e}"
1323 );
1324 }
1325 }
1326 Err(e) => {
1327 log::error!(
1328 "Failed to parse WebSocket position for {market}: {e}"
1329 );
1330 }
1331 }
1332 }
1333 }
1334 }
1335 NautilusWsMessage::SubaccountsChannelData(data) => {
1336 log::debug!("Processing subaccounts channel data");
1337 let ts_init =
1338 nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1339
1340 if let Some(ref orders) = data.contents.orders {
1342 for ws_order in orders {
1343 match crate::websocket::parse::parse_ws_order_report(
1344 ws_order,
1345 &clob_pair_id_to_instrument,
1346 &instruments,
1347 account_id,
1348 ts_init,
1349 ) {
1350 Ok(report) => {
1351 log::debug!(
1352 "Parsed order report: {} {} {} @ {}",
1353 report.instrument_id,
1354 report.order_side,
1355 report.order_status,
1356 report.quantity
1357 );
1358 let sender = get_exec_event_sender();
1359 let exec_report =
1360 NautilusExecutionReport::Order(Box::new(
1361 report,
1362 ));
1363 if let Err(e) =
1364 sender.send(ExecutionEvent::Report(exec_report))
1365 {
1366 log::warn!(
1367 "Failed to send order status report: {e}"
1368 );
1369 }
1370 }
1371 Err(e) => {
1372 log::error!(
1373 "Failed to parse WebSocket order: {e}"
1374 );
1375 }
1376 }
1377 }
1378 }
1379
1380 if let Some(ref fills) = data.contents.fills {
1382 for ws_fill in fills {
1383 match crate::websocket::parse::parse_ws_fill_report(
1384 ws_fill,
1385 &instruments,
1386 account_id,
1387 ts_init,
1388 ) {
1389 Ok(report) => {
1390 log::debug!(
1391 "Parsed fill report: {} {} {} @ {}",
1392 report.instrument_id,
1393 report.venue_order_id,
1394 report.last_qty,
1395 report.last_px
1396 );
1397 let sender = get_exec_event_sender();
1398 let exec_report =
1399 NautilusExecutionReport::Fill(Box::new(report));
1400 if let Err(e) =
1401 sender.send(ExecutionEvent::Report(exec_report))
1402 {
1403 log::warn!(
1404 "Failed to send fill report: {e}"
1405 );
1406 }
1407 }
1408 Err(e) => {
1409 log::error!(
1410 "Failed to parse WebSocket fill: {e}"
1411 );
1412 }
1413 }
1414 }
1415 }
1416 }
1417 NautilusWsMessage::OraclePrices(oracle_prices_map) => {
1418 log::debug!(
1419 "Processing oracle price updates for {} markets",
1420 oracle_prices_map.len()
1421 );
1422
1423 for (market_symbol, oracle_data) in &oracle_prices_map {
1425 match oracle_data.oracle_price.parse::<Decimal>()
1427 {
1428 Ok(price) => {
1429 let symbol_with_perp = format!("{market_symbol}-PERP");
1432
1433 if let Some(entry) = instruments.iter().find(|entry| {
1435 entry.value().id().symbol.as_str()
1436 == symbol_with_perp
1437 }) {
1438 let instrument_id = *entry.key();
1439 oracle_prices.insert(instrument_id, price);
1440 log::trace!(
1441 "Updated oracle price for {instrument_id}: {price}"
1442 );
1443 } else {
1444 log::debug!(
1445 "No instrument found for market symbol '{market_symbol}' (tried '{symbol_with_perp}')"
1446 );
1447 }
1448 }
1449 Err(e) => {
1450 log::warn!(
1451 "Failed to parse oracle price for {market_symbol}: {e}"
1452 );
1453 }
1454 }
1455 }
1456 }
1457 NautilusWsMessage::BlockHeight(height) => {
1458 log::debug!("Block height update: {height}");
1459 block_height.store(height, std::sync::atomic::Ordering::Relaxed);
1460 }
1461 NautilusWsMessage::Error(err) => {
1462 log::error!("WebSocket error: {err:?}");
1463 }
1464 NautilusWsMessage::Reconnected => {
1465 log::info!("WebSocket reconnected");
1466 }
1467 _ => {
1468 }
1470 }
1471 }
1472 log::info!("WebSocket message processing task ended");
1473 });
1474
1475 self.ws_stream_handle = Some(handle);
1476 log::debug!("Spawned WebSocket message processing task");
1477 }
1478 }
1479
1480 self.connected = true;
1481 log::info!("Connected: client_id={}", self.core.client_id);
1482 Ok(())
1483 }
1484
1485 async fn disconnect(&mut self) -> anyhow::Result<()> {
1486 if !self.connected {
1487 log::warn!("dYdX execution client not connected");
1488 return Ok(());
1489 }
1490
1491 log::info!("Disconnecting from dYdX");
1492
1493 if self.config.mnemonic.is_some() {
1495 let _ = self
1496 .ws_client
1497 .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
1498 .await
1499 .map_err(|e| log::warn!("Failed to unsubscribe from subaccount: {e}"));
1500 }
1501
1502 let _ = self
1504 .ws_client
1505 .unsubscribe_markets()
1506 .await
1507 .map_err(|e| log::warn!("Failed to unsubscribe from markets: {e}"));
1508
1509 let _ = self
1511 .ws_client
1512 .unsubscribe_block_height()
1513 .await
1514 .map_err(|e| log::warn!("Failed to unsubscribe from block height: {e}"));
1515
1516 self.ws_client.disconnect().await?;
1518
1519 if let Some(handle) = self.ws_stream_handle.take() {
1521 handle.abort();
1522 log::debug!("Aborted WebSocket message processing task");
1523 }
1524
1525 self.abort_pending_tasks();
1527
1528 self.connected = false;
1529 log::info!("Disconnected: client_id={}", self.core.client_id);
1530 Ok(())
1531 }
1532
1533 async fn generate_order_status_report(
1534 &self,
1535 cmd: &GenerateOrderStatusReport,
1536 ) -> anyhow::Result<Option<OrderStatusReport>> {
1537 let response = self
1539 .http_client
1540 .inner
1541 .get_orders(
1542 &self.wallet_address,
1543 self.subaccount_number,
1544 None, Some(1), )
1547 .await
1548 .context("failed to fetch order from dYdX API")?;
1549
1550 if response.is_empty() {
1551 return Ok(None);
1552 }
1553
1554 let order = &response[0];
1555 let ts_init = UnixNanos::default();
1556
1557 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1558 Some(inst) => inst,
1559 None => return Ok(None),
1560 };
1561
1562 let report = crate::http::parse::parse_order_status_report(
1563 order,
1564 &instrument,
1565 self.core.account_id,
1566 ts_init,
1567 )
1568 .context("failed to parse order status report")?;
1569
1570 if let Some(client_order_id) = cmd.client_order_id
1571 && report.client_order_id != Some(client_order_id)
1572 {
1573 return Ok(None);
1574 }
1575
1576 if let Some(venue_order_id) = cmd.venue_order_id
1577 && report.venue_order_id.as_str() != venue_order_id.as_str()
1578 {
1579 return Ok(None);
1580 }
1581
1582 if let Some(instrument_id) = cmd.instrument_id
1583 && report.instrument_id != instrument_id
1584 {
1585 return Ok(None);
1586 }
1587
1588 Ok(Some(report))
1589 }
1590
1591 async fn generate_order_status_reports(
1592 &self,
1593 cmd: &GenerateOrderStatusReports,
1594 ) -> anyhow::Result<Vec<OrderStatusReport>> {
1595 let response = self
1597 .http_client
1598 .inner
1599 .get_orders(
1600 &self.wallet_address,
1601 self.subaccount_number,
1602 None, None, )
1605 .await
1606 .context("failed to fetch orders from dYdX API")?;
1607
1608 let mut reports = Vec::new();
1609 let ts_init = UnixNanos::default();
1610
1611 for order in response {
1612 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1613 Some(inst) => inst,
1614 None => continue,
1615 };
1616
1617 if let Some(filter_id) = cmd.instrument_id
1618 && instrument.id() != filter_id
1619 {
1620 continue;
1621 }
1622
1623 let report = match crate::http::parse::parse_order_status_report(
1624 &order,
1625 &instrument,
1626 self.core.account_id,
1627 ts_init,
1628 ) {
1629 Ok(r) => r,
1630 Err(e) => {
1631 log::warn!("Failed to parse order status report: {e}");
1632 continue;
1633 }
1634 };
1635
1636 reports.push(report);
1637 }
1638
1639 if cmd.open_only {
1641 reports.retain(|r| r.order_status.is_open());
1642 }
1643
1644 if let Some(start) = cmd.start {
1646 reports.retain(|r| r.ts_last >= start);
1647 }
1648 if let Some(end) = cmd.end {
1649 reports.retain(|r| r.ts_last <= end);
1650 }
1651
1652 Ok(reports)
1653 }
1654
1655 async fn generate_fill_reports(
1656 &self,
1657 cmd: GenerateFillReports,
1658 ) -> anyhow::Result<Vec<FillReport>> {
1659 let response = self
1660 .http_client
1661 .inner
1662 .get_fills(
1663 &self.wallet_address,
1664 self.subaccount_number,
1665 None, None, )
1668 .await
1669 .context("failed to fetch fills from dYdX API")?;
1670
1671 let mut reports = Vec::new();
1672 let ts_init = UnixNanos::default();
1673
1674 for fill in response.fills {
1675 let instrument = match self.get_instrument_by_market(&fill.market) {
1676 Some(inst) => inst,
1677 None => {
1678 log::warn!("Unknown market in fill: {}", fill.market);
1679 continue;
1680 }
1681 };
1682
1683 if let Some(filter_id) = cmd.instrument_id
1684 && instrument.id() != filter_id
1685 {
1686 continue;
1687 }
1688
1689 let report = match crate::http::parse::parse_fill_report(
1690 &fill,
1691 &instrument,
1692 self.core.account_id,
1693 ts_init,
1694 ) {
1695 Ok(r) => r,
1696 Err(e) => {
1697 log::warn!("Failed to parse fill report: {e}");
1698 continue;
1699 }
1700 };
1701
1702 reports.push(report);
1703 }
1704
1705 if let Some(venue_order_id) = cmd.venue_order_id {
1706 reports.retain(|r| r.venue_order_id.as_str() == venue_order_id.as_str());
1707 }
1708
1709 Ok(reports)
1710 }
1711
1712 async fn generate_position_status_reports(
1713 &self,
1714 cmd: &GeneratePositionStatusReports,
1715 ) -> anyhow::Result<Vec<PositionStatusReport>> {
1716 let response = self
1718 .http_client
1719 .inner
1720 .get_subaccount(&self.wallet_address, self.subaccount_number)
1721 .await
1722 .context("failed to fetch subaccount from dYdX API")?;
1723
1724 let mut reports = Vec::new();
1725 let ts_init = UnixNanos::default();
1726
1727 for (market_ticker, perp_position) in &response.subaccount.open_perpetual_positions {
1728 let instrument = match self.get_instrument_by_market(market_ticker) {
1729 Some(inst) => inst,
1730 None => {
1731 log::warn!("Unknown market in position: {market_ticker}");
1732 continue;
1733 }
1734 };
1735
1736 if let Some(filter_id) = cmd.instrument_id
1737 && instrument.id() != filter_id
1738 {
1739 continue;
1740 }
1741
1742 let report = match crate::http::parse::parse_position_status_report(
1743 perp_position,
1744 &instrument,
1745 self.core.account_id,
1746 ts_init,
1747 ) {
1748 Ok(r) => r,
1749 Err(e) => {
1750 log::warn!("Failed to parse position status report: {e}");
1751 continue;
1752 }
1753 };
1754
1755 reports.push(report);
1756 }
1757
1758 Ok(reports)
1759 }
1760
1761 async fn generate_mass_status(
1762 &self,
1763 lookback_mins: Option<u64>,
1764 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1765 let ts_init = UnixNanos::default();
1766
1767 let orders_response = self
1769 .http_client
1770 .inner
1771 .get_orders(&self.wallet_address, self.subaccount_number, None, None)
1772 .await
1773 .context("failed to fetch orders for mass status")?;
1774
1775 let subaccount_response = self
1777 .http_client
1778 .inner
1779 .get_subaccount(&self.wallet_address, self.subaccount_number)
1780 .await
1781 .context("failed to fetch subaccount for mass status")?;
1782
1783 let fills_response = self
1785 .http_client
1786 .inner
1787 .get_fills(&self.wallet_address, self.subaccount_number, None, None)
1788 .await
1789 .context("failed to fetch fills for mass status")?;
1790
1791 let mut order_reports = Vec::new();
1793 let mut orders_filtered = 0usize;
1794 for order in orders_response {
1795 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1796 Some(inst) => inst,
1797 None => {
1798 orders_filtered += 1;
1799 continue;
1800 }
1801 };
1802
1803 match crate::http::parse::parse_order_status_report(
1804 &order,
1805 &instrument,
1806 self.core.account_id,
1807 ts_init,
1808 ) {
1809 Ok(r) => order_reports.push(r),
1810 Err(e) => {
1811 log::warn!("Failed to parse order status report: {e}");
1812 orders_filtered += 1;
1813 }
1814 }
1815 }
1816
1817 let mut position_reports = Vec::new();
1819 for (market_ticker, perp_position) in
1820 &subaccount_response.subaccount.open_perpetual_positions
1821 {
1822 let instrument = match self.get_instrument_by_market(market_ticker) {
1823 Some(inst) => inst,
1824 None => continue,
1825 };
1826
1827 match crate::http::parse::parse_position_status_report(
1828 perp_position,
1829 &instrument,
1830 self.core.account_id,
1831 ts_init,
1832 ) {
1833 Ok(r) => position_reports.push(r),
1834 Err(e) => {
1835 log::warn!("Failed to parse position status report: {e}");
1836 }
1837 }
1838 }
1839
1840 let mut fill_reports = Vec::new();
1842 let mut fills_filtered = 0usize;
1843 for fill in fills_response.fills {
1844 let instrument = match self.get_instrument_by_market(&fill.market) {
1845 Some(inst) => inst,
1846 None => {
1847 fills_filtered += 1;
1848 continue;
1849 }
1850 };
1851
1852 match crate::http::parse::parse_fill_report(
1853 &fill,
1854 &instrument,
1855 self.core.account_id,
1856 ts_init,
1857 ) {
1858 Ok(r) => fill_reports.push(r),
1859 Err(e) => {
1860 log::warn!("Failed to parse fill report: {e}");
1861 fills_filtered += 1;
1862 }
1863 }
1864 }
1865
1866 if lookback_mins.is_some() {
1867 log::debug!(
1868 "lookback_mins={:?} filtering not yet implemented. Returning all: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
1869 lookback_mins,
1870 order_reports.len(),
1871 orders_filtered,
1872 position_reports.len(),
1873 fill_reports.len(),
1874 fills_filtered
1875 );
1876 } else {
1877 log::info!(
1878 "Generated mass status: {} orders, {} positions, {} fills",
1879 order_reports.len(),
1880 position_reports.len(),
1881 fill_reports.len()
1882 );
1883 }
1884
1885 let mut mass_status = ExecutionMassStatus::new(
1887 self.core.client_id,
1888 self.core.account_id,
1889 self.core.venue,
1890 ts_init,
1891 None, );
1893
1894 mass_status.add_order_reports(order_reports);
1895 mass_status.add_position_reports(position_reports);
1896 mass_status.add_fill_reports(fill_reports);
1897
1898 Ok(Some(mass_status))
1899 }
1900}
1901
1902fn dispatch_account_state(state: AccountState) {
1907 use std::any::Any;
1908 msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
1909}
1910
1911fn dispatch_execution_report(report: ExecutionReport) {
1926 let sender = get_exec_event_sender();
1927 match report {
1928 ExecutionReport::Order(order_report) => {
1929 log::debug!(
1930 "Dispatching order report: status={:?}, venue_order_id={:?}, client_order_id={:?}",
1931 order_report.order_status,
1932 order_report.venue_order_id,
1933 order_report.client_order_id
1934 );
1935 let exec_report = NautilusExecutionReport::Order(order_report);
1936 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1937 log::warn!("Failed to send order status report: {e}");
1938 }
1939 }
1940 ExecutionReport::Fill(fill_report) => {
1941 log::debug!(
1942 "Dispatching fill report: venue_order_id={}, trade_id={}",
1943 fill_report.venue_order_id,
1944 fill_report.trade_id
1945 );
1946 let exec_report = NautilusExecutionReport::Fill(fill_report);
1947 if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1948 log::warn!("Failed to send fill report: {e}");
1949 }
1950 }
1951 }
1952}