1use std::{
42 sync::{Arc, Mutex},
43 time::{Duration, Instant},
44};
45
46use anyhow::Context;
47use async_trait::async_trait;
48use dashmap::DashMap;
49use futures_util::{Stream, StreamExt, pin_mut};
50use nautilus_common::{
51 clients::ExecutionClient,
52 live::{get_runtime, runner::get_exec_event_sender},
53 messages::execution::{
54 BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
55 GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
56 ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
57 },
58};
59use nautilus_core::{
60 MUTEX_POISONED, UUID4, UnixNanos,
61 time::{AtomicTime, get_atomic_clock_realtime},
62};
63use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
64use nautilus_model::{
65 accounts::AccountAny,
66 enums::{AccountType, OmsType, OrderSide, OrderType, TimeInForce},
67 events::AccountState,
68 identifiers::{
69 AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
70 },
71 instruments::{Instrument, InstrumentAny},
72 orders::Order,
73 reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
74 types::{AccountBalance, Currency, MarginBalance, Money},
75};
76use nautilus_network::retry::RetryConfig;
77use rust_decimal::Decimal;
78use tokio::task::JoinHandle;
79
80use crate::{
81 common::{
82 consts::DYDX_VENUE, credential::DydxCredential, instrument_cache::InstrumentCache,
83 parse::nanos_to_secs_i64,
84 },
85 config::DydxAdapterConfig,
86 execution::{
87 broadcaster::TxBroadcaster,
88 encoder::ClientOrderIdEncoder,
89 order_builder::OrderMessageBuilder,
90 tx_manager::TransactionManager,
91 types::{LimitOrderParams, OrderContext},
92 },
93 grpc::{DydxGrpcClient, SHORT_TERM_ORDER_MAXIMUM_LIFETIME, types::ChainId},
94 http::{
95 client::DydxHttpClient,
96 parse::{
97 parse_account_state, parse_fill_report, parse_order_status_report,
98 parse_position_status_report,
99 },
100 },
101 websocket::{
102 client::DydxWebSocketClient,
103 enums::NautilusWsMessage,
104 parse::{parse_ws_fill_report, parse_ws_order_report, parse_ws_position_report},
105 },
106};
107
108pub mod block_time;
109pub mod broadcaster;
110pub mod encoder;
111pub mod order_builder;
112pub mod submitter;
113pub mod tx_manager;
114pub mod types;
115pub mod wallet;
116
117use block_time::BlockTimeMonitor;
118
119pub const MAX_CLIENT_ID: u32 = u32::MAX;
126
127#[derive(Debug)]
142pub struct DydxExecutionClient {
143 core: ExecutionClientCore,
144 clock: &'static AtomicTime,
145 config: DydxAdapterConfig,
146 emitter: ExecutionEventEmitter,
147 http_client: DydxHttpClient,
148 ws_client: DydxWebSocketClient,
149 grpc_client: Arc<tokio::sync::RwLock<Option<DydxGrpcClient>>>,
150 instrument_cache: Arc<InstrumentCache>,
151 block_time_monitor: Arc<BlockTimeMonitor>,
153 oracle_prices: Arc<DashMap<InstrumentId, Decimal>>,
154 encoder: Arc<ClientOrderIdEncoder>,
157 order_contexts: Arc<DashMap<u32, OrderContext>>,
158 order_id_map: Arc<DashMap<String, (u32, u32)>>,
161 wallet_address: String,
162 subaccount_number: u32,
163 tx_manager: Option<Arc<TransactionManager>>,
164 broadcaster: Option<Arc<TxBroadcaster>>,
167 order_builder: Option<Arc<OrderMessageBuilder>>,
170 ws_stream_handle: Option<JoinHandle<()>>,
171 pending_tasks: Mutex<Vec<JoinHandle<()>>>,
172}
173
174impl DydxExecutionClient {
175 pub fn new(
181 core: ExecutionClientCore,
182 config: DydxAdapterConfig,
183 wallet_address: String,
184 subaccount_number: u32,
185 ) -> anyhow::Result<Self> {
186 let trader_id = core.trader_id;
187 let account_id = core.account_id;
188 let clock = get_atomic_clock_realtime();
189 let emitter =
190 ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
191
192 let retry_config = RetryConfig {
193 max_retries: config.max_retries,
194 initial_delay_ms: config.retry_delay_initial_ms,
195 max_delay_ms: config.retry_delay_max_ms,
196 ..Default::default()
197 };
198 let http_client = DydxHttpClient::new(
199 Some(config.base_url.clone()),
200 Some(config.timeout_secs),
201 None, config.is_testnet,
203 Some(retry_config),
204 )?;
205
206 let instrument_cache = http_client.instrument_cache().clone();
208
209 let credential = DydxCredential::resolve(
211 config.private_key.clone(),
212 config.is_testnet,
213 config.authenticator_ids.clone(),
214 )?
215 .ok_or_else(|| anyhow::anyhow!("Credentials required for execution client"))?;
216
217 let ws_client = DydxWebSocketClient::new_private_with_cache(
219 config.ws_url.clone(),
220 credential,
221 core.account_id,
222 instrument_cache.clone(),
223 Some(20),
224 );
225
226 let grpc_client = Arc::new(tokio::sync::RwLock::new(None));
227
228 Ok(Self {
229 core,
230 clock,
231 config,
232 emitter,
233 http_client,
234 ws_client,
235 grpc_client,
236 instrument_cache,
237 block_time_monitor: Arc::new(BlockTimeMonitor::new()),
238 oracle_prices: Arc::new(DashMap::new()),
239 encoder: Arc::new(ClientOrderIdEncoder::new()),
240 order_contexts: Arc::new(DashMap::new()),
241 order_id_map: Arc::new(DashMap::new()),
242 wallet_address,
243 subaccount_number,
244 tx_manager: None,
245 broadcaster: None,
246 order_builder: None,
247 ws_stream_handle: None,
248 pending_tasks: Mutex::new(Vec::new()),
249 })
250 }
251
252 fn resolve_private_key(config: &DydxAdapterConfig) -> anyhow::Result<String> {
256 let private_key_env = if config.is_testnet {
257 "DYDX_TESTNET_PRIVATE_KEY"
258 } else {
259 "DYDX_PRIVATE_KEY"
260 };
261
262 if let Some(ref pk) = config.private_key
264 && !pk.trim().is_empty()
265 {
266 return Ok(pk.clone());
267 }
268
269 if let Some(pk) = std::env::var(private_key_env)
271 .ok()
272 .filter(|s| !s.trim().is_empty())
273 {
274 return Ok(pk);
275 }
276
277 anyhow::bail!("{private_key_env} not found in config or environment")
278 }
279
280 fn register_order_context(&self, client_id_u32: u32, context: OrderContext) {
282 self.order_contexts.insert(client_id_u32, context);
283 }
284
285 fn get_order_context(&self, client_id_u32: u32) -> Option<OrderContext> {
289 self.order_contexts
290 .get(&client_id_u32)
291 .map(|r| r.value().clone())
292 }
293
294 fn get_chain_id(&self) -> ChainId {
298 self.config.get_chain_id()
299 }
300
301 fn spawn_ws_stream_handler(
303 &mut self,
304 stream: impl Stream<Item = NautilusWsMessage> + Send + 'static,
305 ) {
306 if self.ws_stream_handle.is_some() {
307 return;
308 }
309
310 log::debug!("Starting execution WebSocket message processing task");
311
312 let account_id = self.core.account_id;
314 let instrument_cache = self.instrument_cache.clone();
315 let oracle_prices = self.oracle_prices.clone();
316 let encoder = self.encoder.clone();
317 let order_contexts = self.order_contexts.clone();
318 let order_id_map = self.order_id_map.clone();
319 let block_time_monitor = self.block_time_monitor.clone();
320 let emitter = self.emitter.clone();
321 let clock = self.clock;
322
323 let handle = get_runtime().spawn(async move {
324 log::debug!("Execution WebSocket message loop started");
325 pin_mut!(stream);
326 while let Some(msg) = stream.next().await {
327 match msg {
328 NautilusWsMessage::Order(report) => {
329 log::debug!("Received order update: {:?}", report.order_status);
330 emitter.send_order_status_report(*report);
331 }
332 NautilusWsMessage::Fill(report) => {
333 log::debug!("Received fill update");
334 emitter.send_fill_report(*report);
335 }
336 NautilusWsMessage::Position(report) => {
337 log::debug!("Received position update");
338 emitter.send_position_report(*report);
339 }
340 NautilusWsMessage::AccountState(state) => {
341 log::debug!("Received account state update");
342 emitter.send_account_state(*state);
343 }
344 NautilusWsMessage::SubaccountSubscribed(msg) => {
345
346 log::debug!("Parsing subaccount subscription with full context");
347
348 let inst_map = instrument_cache.to_instrument_id_map();
350
351 let oracle_map: std::collections::HashMap<_, _> = oracle_prices
353 .iter()
354 .map(|entry| (*entry.key(), *entry.value()))
355 .collect();
356
357 let ts_init = clock.get_time_ns();
358 let ts_event = ts_init;
359
360 if let Some(ref subaccount) = msg.contents.subaccount {
361 match parse_account_state(
362 subaccount,
363 account_id,
364 &inst_map,
365 &oracle_map,
366 ts_event,
367 ts_init,
368 ) {
369 Ok(account_state) => {
370 log::debug!(
371 "Parsed account state: {} balance(s), {} margin(s)",
372 account_state.balances.len(),
373 account_state.margins.len()
374 );
375 emitter.send_account_state(account_state);
376 }
377 Err(e) => {
378 log::error!("Failed to parse account state: {e}");
379 }
380 }
381
382 if let Some(ref positions) =
384 subaccount.open_perpetual_positions
385 {
386 log::debug!(
387 "Parsing {} position(s) from subscription",
388 positions.len()
389 );
390
391 for (market, ws_position) in positions {
392 match parse_ws_position_report(
393 ws_position,
394 &instrument_cache,
395 account_id,
396 ts_init,
397 ) {
398 Ok(report) => {
399 log::debug!(
400 "Parsed position report: {} {} {} {}",
401 report.instrument_id,
402 report.position_side,
403 report.quantity,
404 market
405 );
406 emitter.send_position_report(report);
407 }
408 Err(e) => {
409 log::error!(
410 "Failed to parse WebSocket position for {market}: {e}"
411 );
412 }
413 }
414 }
415 }
416 } else {
417 log::warn!("Subaccount subscription without initial state (new/empty subaccount)");
418
419 let currency = Currency::get_or_create_crypto_with_context("USDC", None);
421 let zero = Money::zero(currency);
422 let balance = AccountBalance::new_checked(zero, zero, zero)
423 .expect("zero balance should always be valid");
424 let account_state = AccountState::new(
425 account_id,
426 AccountType::Margin,
427 vec![balance],
428 vec![],
429 true,
430 UUID4::new(),
431 ts_init,
432 ts_init,
433 None,
434 );
435 emitter.send_account_state(account_state);
436 }
437 }
438 NautilusWsMessage::SubaccountsChannelData(data) => {
439 log::debug!(
440 "Processing subaccounts channel data (orders={:?}, fills={:?})",
441 data.contents.orders.as_ref().map(|o| o.len()),
442 data.contents.fills.as_ref().map(|f| f.len())
443 );
444 let ts_init = clock.get_time_ns();
445
446 let mut terminal_orders: Vec<(u32, u32, String)> = Vec::new();
448
449 let mut pending_order_reports = Vec::new();
453 if let Some(ref orders) = data.contents.orders {
454 log::info!(
455 "Processing {} orders from SubaccountsChannelData",
456 orders.len()
457 );
458 for ws_order in orders {
459 log::info!(
460 "Parsing WS order: clob_pair_id={}, status={:?}, client_id={}",
461 ws_order.clob_pair_id,
462 ws_order.status,
463 ws_order.client_id
464 );
465
466 if let Ok(client_id_u32) = ws_order.client_id.parse::<u32>() {
469 let client_meta = ws_order.client_metadata
470 .as_ref()
471 .and_then(|s| s.parse::<u32>().ok())
472 .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
473 order_id_map.insert(ws_order.id.clone(), (client_id_u32, client_meta));
474 }
475
476 match parse_ws_order_report(
477 ws_order,
478 &instrument_cache,
479 &order_contexts,
480 &encoder,
481 account_id,
482 ts_init,
483 ) {
484 Ok(report) => {
485 if !report.order_status.is_open()
487 && let Ok(cid) = ws_order.client_id.parse::<u32>()
488 {
489 let meta = ws_order.client_metadata
490 .as_ref()
491 .and_then(|s| s.parse::<u32>().ok())
492 .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
493 terminal_orders.push((cid, meta, ws_order.id.clone()));
494 }
495 log::info!(
496 "Parsed order report: {} {} {:?} qty={} client_order_id={:?}",
497 report.instrument_id,
498 report.order_side,
499 report.order_status,
500 report.quantity,
501 report.client_order_id
502 );
503 pending_order_reports.push(report);
504 }
505 Err(e) => {
506 log::error!("Failed to parse WebSocket order: {e}");
507 }
508 }
509 }
510 }
511
512 if let Some(ref fills) = data.contents.fills {
515 for ws_fill in fills {
516 match parse_ws_fill_report(
517 ws_fill,
518 &instrument_cache,
519 &order_id_map,
520 &order_contexts,
521 &encoder,
522 account_id,
523 ts_init,
524 ) {
525 Ok(report) => {
526 log::info!(
527 "Parsed fill report: {} {} {} @ {} client_order_id={:?}",
528 report.instrument_id,
529 report.venue_order_id,
530 report.last_qty,
531 report.last_px,
532 report.client_order_id
533 );
534 emitter.send_fill_report(report);
535 }
536 Err(e) => {
537 log::error!("Failed to parse WebSocket fill: {e}");
538 }
539 }
540 }
541 }
542
543 for report in pending_order_reports {
545 emitter.send_order_status_report(report);
546 }
547
548 for (client_id, client_metadata, order_id) in terminal_orders {
551 order_contexts.remove(&client_id);
552 encoder.remove(client_id, client_metadata);
553 order_id_map.remove(&order_id);
554 }
555 }
556 NautilusWsMessage::MarkPrice(mark_price) => {
557 let price_dec = Decimal::from(mark_price.value);
558 oracle_prices.insert(mark_price.instrument_id, price_dec);
559 log::trace!(
560 "Updated oracle price for {}: {price_dec}",
561 mark_price.instrument_id
562 );
563 }
564 NautilusWsMessage::IndexPrice(_) => {
565 }
567 NautilusWsMessage::BlockHeight { height, time } => {
568 log::debug!("Block height update: {height} at {time}");
569 block_time_monitor.record_block(height, time);
570 }
571 NautilusWsMessage::Error(err) => {
572 log::error!("WebSocket error: {err:?}");
573 }
574 NautilusWsMessage::Reconnected => {
575 log::info!("WebSocket reconnected");
576 }
577 NautilusWsMessage::FundingRate(_) => {
578 }
580 _ => {
581 }
583 }
584 }
585 log::debug!("WebSocket message processing task ended");
586 });
587
588 self.ws_stream_handle = Some(handle);
589 log::info!("WebSocket stream handler started");
590 }
591
592 fn mark_instruments_initialized(&mut self) {
597 let count = self.instrument_cache.len();
598 self.core.set_instruments_initialized();
599 log::debug!("Instruments initialized: {count} instruments in shared cache");
600 }
601
602 fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
604 self.instrument_cache.get_by_market(market)
605 }
606
607 fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
609 let instrument = self.instrument_cache.get_by_clob_id(clob_pair_id);
610
611 if instrument.is_none() {
612 self.instrument_cache.log_missing_clob_pair_id(clob_pair_id);
613 }
614
615 instrument
616 }
617
618 fn get_execution_components(
622 &self,
623 ) -> anyhow::Result<(
624 Arc<TransactionManager>,
625 Arc<TxBroadcaster>,
626 Arc<OrderMessageBuilder>,
627 )> {
628 let tx_manager = self
629 .tx_manager
630 .as_ref()
631 .ok_or_else(|| {
632 anyhow::anyhow!("TransactionManager not initialized - call connect() first")
633 })?
634 .clone();
635 let broadcaster = self
636 .broadcaster
637 .as_ref()
638 .ok_or_else(|| anyhow::anyhow!("TxBroadcaster not initialized - call connect() first"))?
639 .clone();
640 let order_builder = self
641 .order_builder
642 .as_ref()
643 .ok_or_else(|| {
644 anyhow::anyhow!("OrderMessageBuilder not initialized - call connect() first")
645 })?
646 .clone();
647 Ok((tx_manager, broadcaster, order_builder))
648 }
649
650 fn spawn_task<F>(&self, label: &'static str, fut: F)
651 where
652 F: Future<Output = anyhow::Result<()>> + Send + 'static,
653 {
654 let handle = get_runtime().spawn(async move {
655 if let Err(e) = fut.await {
656 log::error!("{label}: {e:?}");
657 }
658 });
659
660 self.pending_tasks
661 .lock()
662 .expect(MUTEX_POISONED)
663 .push(handle);
664 }
665
666 fn spawn_order_task<F>(
670 &self,
671 label: &'static str,
672 strategy_id: StrategyId,
673 instrument_id: InstrumentId,
674 client_order_id: ClientOrderId,
675 fut: F,
676 ) where
677 F: Future<Output = anyhow::Result<()>> + Send + 'static,
678 {
679 let emitter = self.emitter.clone();
680 let clock = self.clock;
681
682 let handle = get_runtime().spawn(async move {
683 if let Err(e) = fut.await {
684 let error_msg = format!("{label} failed: {e:?}");
685 log::error!("{error_msg}");
686
687 let ts_event = clock.get_time_ns();
688 emitter.emit_order_rejected_event(
689 strategy_id,
690 instrument_id,
691 client_order_id,
692 &error_msg,
693 ts_event,
694 false,
695 );
696 }
697 });
698
699 self.pending_tasks
700 .lock()
701 .expect(MUTEX_POISONED)
702 .push(handle);
703 }
704
705 fn abort_pending_tasks(&self) {
706 let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
707 for handle in guard.drain(..) {
708 handle.abort();
709 }
710 }
711
712 fn send_modify_rejected(
714 &self,
715 strategy_id: StrategyId,
716 instrument_id: InstrumentId,
717 client_order_id: ClientOrderId,
718 venue_order_id: Option<VenueOrderId>,
719 reason: &str,
720 ) {
721 let ts_event = self.clock.get_time_ns();
722 self.emitter.emit_order_modify_rejected_event(
723 strategy_id,
724 instrument_id,
725 client_order_id,
726 venue_order_id,
727 reason,
728 ts_event,
729 );
730 }
731
732 async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
742 let account_id = self.core.account_id;
743
744 if self.core.cache().account(&account_id).is_some() {
745 log::info!("Account {account_id} registered");
746 return Ok(());
747 }
748
749 let start = Instant::now();
750 let timeout = Duration::from_secs_f64(timeout_secs);
751 let interval = Duration::from_millis(10);
752
753 loop {
754 tokio::time::sleep(interval).await;
755
756 if self.core.cache().account(&account_id).is_some() {
757 log::info!("Account {account_id} registered");
758 return Ok(());
759 }
760
761 if start.elapsed() >= timeout {
762 anyhow::bail!(
763 "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
764 );
765 }
766 }
767 }
768}
769
770#[async_trait(?Send)]
771impl ExecutionClient for DydxExecutionClient {
772 fn is_connected(&self) -> bool {
773 self.core.is_connected()
774 }
775
776 fn client_id(&self) -> ClientId {
777 self.core.client_id
778 }
779
780 fn account_id(&self) -> AccountId {
781 self.core.account_id
782 }
783
784 fn venue(&self) -> Venue {
785 *DYDX_VENUE
786 }
787
788 fn oms_type(&self) -> OmsType {
789 self.core.oms_type
790 }
791
792 fn get_account(&self) -> Option<AccountAny> {
793 self.core.cache().account(&self.core.account_id).cloned()
794 }
795
796 fn generate_account_state(
797 &self,
798 balances: Vec<AccountBalance>,
799 margins: Vec<MarginBalance>,
800 reported: bool,
801 ts_event: UnixNanos,
802 ) -> anyhow::Result<()> {
803 self.emitter
804 .emit_account_state(balances, margins, reported, ts_event);
805 Ok(())
806 }
807
808 fn start(&mut self) -> anyhow::Result<()> {
809 if self.core.is_started() {
810 log::warn!("dYdX execution client already started");
811 return Ok(());
812 }
813
814 let sender = get_exec_event_sender();
815 self.emitter.set_sender(sender);
816 log::info!("Starting dYdX execution client");
817 self.core.set_started();
818 Ok(())
819 }
820
821 fn stop(&mut self) -> anyhow::Result<()> {
822 if self.core.is_stopped() {
823 log::warn!("dYdX execution client not started");
824 return Ok(());
825 }
826
827 log::info!("Stopping dYdX execution client");
828 self.abort_pending_tasks();
829 self.core.set_stopped();
830 self.core.set_disconnected();
831 Ok(())
832 }
833
834 fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
851 if !self.is_connected() {
853 let reason = "Cannot submit order: execution client not connected";
854 log::error!("{reason}");
855 anyhow::bail!(reason);
856 }
857
858 let current_block = self.block_time_monitor.current_block_height();
860 let order = self
861 .core
862 .cache()
863 .order(&cmd.client_order_id)
864 .cloned()
865 .ok_or_else(|| {
866 anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
867 })?;
868
869 let client_order_id = order.client_order_id();
870 let instrument_id = order.instrument_id();
871 let strategy_id = order.strategy_id();
872
873 if current_block == 0 {
874 let reason = "Block height not initialized";
875 log::warn!("Cannot submit order {client_order_id}: {reason}");
876 let ts_event = self.clock.get_time_ns();
877 self.emitter.emit_order_rejected_event(
878 strategy_id,
879 instrument_id,
880 client_order_id,
881 reason,
882 ts_event,
883 false,
884 );
885 return Ok(());
886 }
887
888 if order.is_closed() {
890 log::warn!("Cannot submit closed order {client_order_id}");
891 return Ok(());
892 }
893
894 match order.order_type() {
896 OrderType::Market
897 | OrderType::Limit
898 | OrderType::StopMarket
899 | OrderType::StopLimit
900 | OrderType::MarketIfTouched
901 | OrderType::LimitIfTouched => {}
902 OrderType::TrailingStopMarket | OrderType::TrailingStopLimit => {
904 let reason = "Trailing stop orders not supported by dYdX v4 protocol";
905 log::error!("{reason}");
906 let ts_event = self.clock.get_time_ns();
907 self.emitter.emit_order_rejected_event(
908 strategy_id,
909 instrument_id,
910 client_order_id,
911 reason,
912 ts_event,
913 false,
914 );
915 return Ok(());
916 }
917 order_type => {
918 let reason = format!("Order type {order_type:?} not supported by dYdX");
919 log::error!("{reason}");
920 let ts_event = self.clock.get_time_ns();
921 self.emitter.emit_order_rejected_event(
922 strategy_id,
923 instrument_id,
924 client_order_id,
925 &reason,
926 ts_event,
927 false,
928 );
929 return Ok(());
930 }
931 }
932
933 self.emitter.emit_order_submitted(&order);
934
935 let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
937 Ok(components) => components,
938 Err(e) => {
939 log::error!("Failed to get execution components: {e}");
940 let ts_event = self.clock.get_time_ns();
941 self.emitter.emit_order_rejected_event(
942 strategy_id,
943 instrument_id,
944 client_order_id,
945 &e.to_string(),
946 ts_event,
947 false,
948 );
949 return Ok(());
950 }
951 };
952
953 let block_height = self.block_time_monitor.current_block_height() as u32;
954
955 let encoded = match self.encoder.encode(client_order_id) {
957 Ok(enc) => enc,
958 Err(e) => {
959 log::error!("Failed to generate client order ID: {e}");
960 let ts_event = self.clock.get_time_ns();
961 self.emitter.emit_order_rejected_event(
962 strategy_id,
963 instrument_id,
964 client_order_id,
965 &e.to_string(),
966 ts_event,
967 false,
968 );
969 return Ok(());
970 }
971 };
972 let client_id_u32 = encoded.client_id;
973 let client_metadata = encoded.client_metadata;
974
975 log::info!(
976 "[SUBMIT_ORDER] Nautilus '{}' -> dYdX u32={} meta={:#x} | instrument={} side={:?} qty={} type={:?}",
977 client_order_id,
978 client_id_u32,
979 client_metadata,
980 instrument_id,
981 order.order_side(),
982 order.quantity(),
983 order.order_type()
984 );
985
986 let expire_time = order.expire_time().map(nanos_to_secs_i64);
988
989 let order_flags = match order.order_type() {
991 OrderType::StopMarket
993 | OrderType::StopLimit
994 | OrderType::MarketIfTouched
995 | OrderType::LimitIfTouched => types::ORDER_FLAG_CONDITIONAL,
996 OrderType::Market => types::ORDER_FLAG_SHORT_TERM,
998 OrderType::Limit => {
1000 let lifetime = types::OrderLifetime::from_time_in_force(
1001 order.time_in_force(),
1002 expire_time,
1003 false,
1004 order_builder.max_short_term_secs(),
1005 );
1006 lifetime.order_flags()
1007 }
1008 _ => types::ORDER_FLAG_LONG_TERM,
1010 };
1011
1012 let ts_submitted = self.clock.get_time_ns();
1014 let trader_id = order.trader_id();
1015 self.register_order_context(
1016 client_id_u32,
1017 OrderContext {
1018 client_order_id,
1019 trader_id,
1020 strategy_id,
1021 instrument_id,
1022 submitted_at: ts_submitted,
1023 order_flags,
1024 },
1025 );
1026
1027 self.spawn_order_task(
1028 "submit_order",
1029 strategy_id,
1030 instrument_id,
1031 client_order_id,
1032 async move {
1033 let (msg, order_type_str) = match order.order_type() {
1035 OrderType::Market => {
1036 let msg = order_builder.build_market_order(
1037 instrument_id,
1038 client_id_u32,
1039 client_metadata,
1040 order.order_side(),
1041 order.quantity(),
1042 block_height,
1043 )?;
1044 (msg, "market")
1045 }
1046 OrderType::Limit => {
1047 let msg = order_builder.build_limit_order(
1049 instrument_id,
1050 client_id_u32,
1051 client_metadata,
1052 order.order_side(),
1053 order
1054 .price()
1055 .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
1056 order.quantity(),
1057 order.time_in_force(),
1058 order.is_post_only(),
1059 order.is_reduce_only(),
1060 block_height,
1061 expire_time, )?;
1063 (msg, "limit")
1064 }
1065 OrderType::StopMarket => {
1068 let trigger_price = order.trigger_price().ok_or_else(|| {
1069 anyhow::anyhow!("Stop market order missing trigger_price")
1070 })?;
1071 let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1072 let msg = order_builder.build_stop_market_order(
1073 instrument_id,
1074 client_id_u32,
1075 client_metadata,
1076 order.order_side(),
1077 trigger_price,
1078 order.quantity(),
1079 order.is_reduce_only(),
1080 cond_expire,
1081 )?;
1082 (msg, "stop_market")
1083 }
1084 OrderType::StopLimit => {
1085 let trigger_price = order.trigger_price().ok_or_else(|| {
1086 anyhow::anyhow!("Stop limit order missing trigger_price")
1087 })?;
1088 let limit_price = order.price().ok_or_else(|| {
1089 anyhow::anyhow!("Stop limit order missing limit price")
1090 })?;
1091 let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1092 let msg = order_builder.build_stop_limit_order(
1093 instrument_id,
1094 client_id_u32,
1095 client_metadata,
1096 order.order_side(),
1097 trigger_price,
1098 limit_price,
1099 order.quantity(),
1100 order.time_in_force(),
1101 order.is_post_only(),
1102 order.is_reduce_only(),
1103 cond_expire,
1104 )?;
1105 (msg, "stop_limit")
1106 }
1107 OrderType::MarketIfTouched => {
1109 let trigger_price = order.trigger_price().ok_or_else(|| {
1110 anyhow::anyhow!("Take profit market order missing trigger_price")
1111 })?;
1112 let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1113 let msg = order_builder.build_take_profit_market_order(
1114 instrument_id,
1115 client_id_u32,
1116 client_metadata,
1117 order.order_side(),
1118 trigger_price,
1119 order.quantity(),
1120 order.is_reduce_only(),
1121 cond_expire,
1122 )?;
1123 (msg, "take_profit_market")
1124 }
1125 OrderType::LimitIfTouched => {
1127 let trigger_price = order.trigger_price().ok_or_else(|| {
1128 anyhow::anyhow!("Take profit limit order missing trigger_price")
1129 })?;
1130 let limit_price = order.price().ok_or_else(|| {
1131 anyhow::anyhow!("Take profit limit order missing limit price")
1132 })?;
1133 let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1134 let msg = order_builder.build_take_profit_limit_order(
1135 instrument_id,
1136 client_id_u32,
1137 client_metadata,
1138 order.order_side(),
1139 trigger_price,
1140 limit_price,
1141 order.quantity(),
1142 order.time_in_force(),
1143 order.is_post_only(),
1144 order.is_reduce_only(),
1145 cond_expire,
1146 )?;
1147 (msg, "take_profit_limit")
1148 }
1149 _ => unreachable!("Order type already validated"),
1150 };
1151
1152 let operation = format!("Submit {order_type_str} order {client_order_id}");
1155 if order_flags == types::ORDER_FLAG_SHORT_TERM {
1156 broadcaster
1157 .broadcast_short_term(&tx_manager, vec![msg], &operation)
1158 .await?;
1159 } else {
1160 broadcaster
1161 .broadcast_with_retry(&tx_manager, vec![msg], &operation)
1162 .await?;
1163 }
1164 log::debug!("Successfully submitted {order_type_str} order: {client_order_id}");
1165
1166 Ok(())
1167 },
1168 );
1169
1170 Ok(())
1171 }
1172
1173 fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
1174 let orders = self.core.get_orders_for_list(&cmd.order_list)?;
1175 let order_count = orders.len();
1176
1177 if !self.is_connected() {
1179 let reason = "Cannot submit order list: execution client not connected";
1180 log::error!("{reason}");
1181 anyhow::bail!(reason);
1182 }
1183
1184 let current_block = self.block_time_monitor.current_block_height();
1186 if current_block == 0 {
1187 let reason = "Block height not initialized";
1188 log::warn!("Cannot submit order list: {reason}");
1189 let ts_event = self.clock.get_time_ns();
1191 for order in &orders {
1192 self.emitter.emit_order_rejected_event(
1193 order.strategy_id(),
1194 order.instrument_id(),
1195 order.client_order_id(),
1196 reason,
1197 ts_event,
1198 false,
1199 );
1200 }
1201 return Ok(());
1202 }
1203
1204 let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1206 Ok(components) => components,
1207 Err(e) => {
1208 log::error!("Failed to get execution components for batch: {e}");
1209 let ts_event = self.clock.get_time_ns();
1211 for order in &orders {
1212 self.emitter.emit_order_rejected_event(
1213 order.strategy_id(),
1214 order.instrument_id(),
1215 order.client_order_id(),
1216 &e.to_string(),
1217 ts_event,
1218 false,
1219 );
1220 }
1221 return Ok(());
1222 }
1223 };
1224
1225 let mut order_params: Vec<LimitOrderParams> = Vec::with_capacity(order_count);
1227 let mut order_info: Vec<(ClientOrderId, InstrumentId, StrategyId)> =
1228 Vec::with_capacity(order_count);
1229
1230 for order in &orders {
1231 if order.order_type() != OrderType::Limit {
1233 log::warn!(
1234 "Order {} has type {:?}, falling back to individual submission",
1235 order.client_order_id(),
1236 order.order_type()
1237 );
1238 let submit_cmd = SubmitOrder::new(
1240 cmd.trader_id,
1241 cmd.client_id,
1242 cmd.strategy_id,
1243 order.instrument_id(),
1244 order.client_order_id(),
1245 order.init_event().clone(),
1246 cmd.exec_algorithm_id,
1247 cmd.position_id,
1248 cmd.params.clone(),
1249 UUID4::new(),
1250 cmd.ts_init,
1251 );
1252 if let Err(e) = self.submit_order(&submit_cmd) {
1253 log::error!(
1254 "Failed to submit order {} from order list: {e}",
1255 order.client_order_id()
1256 );
1257 }
1258 continue;
1259 }
1260
1261 let Some(price) = order.price() else {
1263 let ts_event = self.clock.get_time_ns();
1264 self.emitter.emit_order_rejected_event(
1265 order.strategy_id(),
1266 order.instrument_id(),
1267 order.client_order_id(),
1268 "Limit order missing price",
1269 ts_event,
1270 false,
1271 );
1272 continue;
1273 };
1274
1275 let encoded = match self.encoder.encode(order.client_order_id()) {
1277 Ok(enc) => enc,
1278 Err(e) => {
1279 log::error!("Failed to generate client order ID: {e}");
1280 let ts_event = self.clock.get_time_ns();
1281 self.emitter.emit_order_rejected_event(
1282 order.strategy_id(),
1283 order.instrument_id(),
1284 order.client_order_id(),
1285 &e.to_string(),
1286 ts_event,
1287 false,
1288 );
1289 continue;
1290 }
1291 };
1292 let client_id_u32 = encoded.client_id;
1293 let client_metadata = encoded.client_metadata;
1294
1295 self.emitter.emit_order_submitted(order);
1297
1298 let expire_time_secs = order.expire_time().map(nanos_to_secs_i64);
1300 let lifetime = types::OrderLifetime::from_time_in_force(
1301 order.time_in_force(),
1302 expire_time_secs,
1303 false,
1304 order_builder.max_short_term_secs(),
1305 );
1306
1307 let ts_submitted = self.clock.get_time_ns();
1309 self.register_order_context(
1310 client_id_u32,
1311 OrderContext {
1312 client_order_id: order.client_order_id(),
1313 trader_id: order.trader_id(),
1314 strategy_id: order.strategy_id(),
1315 instrument_id: order.instrument_id(),
1316 submitted_at: ts_submitted,
1317 order_flags: lifetime.order_flags(),
1318 },
1319 );
1320
1321 order_params.push(LimitOrderParams {
1323 instrument_id: order.instrument_id(),
1324 client_order_id: client_id_u32,
1325 client_metadata,
1326 side: order.order_side(),
1327 price,
1328 quantity: order.quantity(),
1329 time_in_force: order.time_in_force(),
1330 post_only: order.is_post_only(),
1331 reduce_only: order.is_reduce_only(),
1332 expire_time_ns: order.expire_time(),
1333 });
1334 order_info.push((
1335 order.client_order_id(),
1336 order.instrument_id(),
1337 order.strategy_id(),
1338 ));
1339 }
1340
1341 if order_params.is_empty() {
1343 return Ok(());
1344 }
1345
1346 let has_short_term = order_params
1350 .iter()
1351 .any(|params| order_builder.is_short_term_order(params));
1352
1353 let block_height = current_block as u32;
1354 let emitter = self.emitter.clone();
1355 let clock = self.clock;
1356
1357 if has_short_term {
1358 log::debug!(
1360 "Submitting {} short-term limit orders concurrently (sequence not consumed)",
1361 order_params.len()
1362 );
1363
1364 let order_count = order_params.len();
1365 let handle = get_runtime().spawn(async move {
1366 let mut handles = Vec::with_capacity(order_count);
1369
1370 for (params, (client_order_id, instrument_id, strategy_id)) in
1371 order_params.into_iter().zip(order_info.into_iter())
1372 {
1373 let tx_manager = tx_manager.clone();
1374 let broadcaster = broadcaster.clone();
1375 let order_builder = order_builder.clone();
1376 let emitter = emitter.clone();
1377
1378 let handle = get_runtime().spawn(async move {
1379 let msg = match order_builder
1381 .build_limit_order_from_params(¶ms, block_height)
1382 {
1383 Ok(m) => m,
1384 Err(e) => {
1385 let error_msg = format!("Failed to build order message: {e:?}");
1386 log::error!("{error_msg}");
1387 let ts_event = clock.get_time_ns();
1388 emitter.emit_order_rejected_event(
1389 strategy_id,
1390 instrument_id,
1391 client_order_id,
1392 &error_msg,
1393 ts_event,
1394 false,
1395 );
1396 return;
1397 }
1398 };
1399
1400 let operation = format!("Submit short-term order {client_order_id}");
1402 if let Err(e) = broadcaster
1403 .broadcast_short_term(&tx_manager, vec![msg], &operation)
1404 .await
1405 {
1406 let error_msg = format!("Order submission failed: {e:?}");
1407 log::error!("{error_msg}");
1408 let ts_event = clock.get_time_ns();
1409 emitter.emit_order_rejected_event(
1410 strategy_id,
1411 instrument_id,
1412 client_order_id,
1413 &error_msg,
1414 ts_event,
1415 false,
1416 );
1417 }
1418 });
1419
1420 handles.push(handle);
1421 }
1422
1423 for handle in handles {
1425 let _ = handle.await;
1426 }
1427 });
1428
1429 self.pending_tasks
1431 .lock()
1432 .expect(MUTEX_POISONED)
1433 .push(handle);
1434 } else {
1435 log::info!(
1437 "Batch submitting {} long-term limit orders in single transaction",
1438 order_params.len()
1439 );
1440
1441 let handle = get_runtime().spawn(async move {
1442 let msgs: Result<Vec<_>, _> = order_params
1444 .iter()
1445 .map(|params| order_builder.build_limit_order_from_params(params, block_height))
1446 .collect();
1447
1448 let msgs = match msgs {
1449 Ok(m) => m,
1450 Err(e) => {
1451 let error_msg = format!("Failed to build batch order messages: {e:?}");
1452 log::error!("{error_msg}");
1453 let ts_event = clock.get_time_ns();
1455 for (client_order_id, instrument_id, strategy_id) in order_info {
1456 emitter.emit_order_rejected_event(
1457 strategy_id,
1458 instrument_id,
1459 client_order_id,
1460 &error_msg,
1461 ts_event,
1462 false,
1463 );
1464 }
1465 return;
1466 }
1467 };
1468
1469 let operation = format!("Submit batch of {} limit orders", msgs.len());
1471 if let Err(e) = broadcaster
1472 .broadcast_with_retry(&tx_manager, msgs, &operation)
1473 .await
1474 {
1475 let error_msg = format!("Batch order submission failed: {e:?}");
1476 log::error!("{error_msg}");
1477
1478 let ts_event = clock.get_time_ns();
1480 for (client_order_id, instrument_id, strategy_id) in order_info {
1481 emitter.emit_order_rejected_event(
1482 strategy_id,
1483 instrument_id,
1484 client_order_id,
1485 &error_msg,
1486 ts_event,
1487 false,
1488 );
1489 }
1490 }
1491 });
1492
1493 self.pending_tasks
1495 .lock()
1496 .expect(MUTEX_POISONED)
1497 .push(handle);
1498 }
1499
1500 Ok(())
1501 }
1502
1503 fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
1507 let reason = "dYdX does not support order modification. Use cancel and resubmit instead.";
1508 log::error!("{reason}");
1509
1510 self.send_modify_rejected(
1511 cmd.strategy_id,
1512 cmd.instrument_id,
1513 cmd.client_order_id,
1514 cmd.venue_order_id,
1515 reason,
1516 );
1517 Ok(())
1518 }
1519
1520 fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
1539 if !self.is_connected() {
1540 anyhow::bail!("Cannot cancel order: not connected");
1541 }
1542
1543 let client_order_id = cmd.client_order_id;
1544 let instrument_id = cmd.instrument_id;
1545 let strategy_id = cmd.strategy_id;
1546 let venue_order_id = cmd.venue_order_id;
1547
1548 let (order_time_in_force, order_expire_time) = {
1549 let cache = self.core.cache();
1550
1551 let order = match cache.order(&client_order_id) {
1552 Some(order) => order,
1553 None => {
1554 log::error!("Cannot cancel order {client_order_id}: not found in cache");
1555 return Ok(()); }
1557 };
1558
1559 if order.is_closed() {
1561 log::warn!(
1562 "CancelOrder command for {} when order already {} (will not send to exchange)",
1563 client_order_id,
1564 order.status()
1565 );
1566 return Ok(());
1567 }
1568
1569 if cache.instrument(&instrument_id).is_none() {
1571 log::error!(
1572 "Cannot cancel order {client_order_id}: instrument {instrument_id} not found in cache"
1573 );
1574 return Ok(()); }
1576
1577 (
1579 order.time_in_force(),
1580 order.expire_time().map(nanos_to_secs_i64),
1581 )
1582 }; log::debug!("Cancelling order {client_order_id} for instrument {instrument_id}");
1585
1586 let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1588 Ok(components) => components,
1589 Err(e) => {
1590 log::error!("Failed to get execution components for cancel: {e}");
1591 return Ok(());
1592 }
1593 };
1594
1595 let block_height = self.block_time_monitor.current_block_height() as u32;
1596
1597 let encoded = match self.encoder.get(&client_order_id) {
1599 Some(enc) => enc,
1600 None => {
1601 log::error!("Client order ID {client_order_id} not found in cache");
1602 anyhow::bail!("Client order ID not found in cache")
1603 }
1604 };
1605 let client_id_u32 = encoded.client_id;
1606
1607 log::info!(
1608 "[CANCEL_ORDER] Nautilus '{client_order_id}' -> dYdX u32={client_id_u32} | instrument={instrument_id}"
1609 );
1610
1611 let order_flags = self.get_order_context(client_id_u32).map_or_else(
1614 || {
1615 log::warn!(
1617 "Order context not found for {client_order_id}, deriving flags from order"
1618 );
1619 types::OrderLifetime::from_time_in_force(
1620 order_time_in_force, order_expire_time, false,
1623 order_builder.max_short_term_secs(),
1624 )
1625 .order_flags()
1626 },
1627 |ctx| ctx.order_flags,
1628 );
1629
1630 let clock = self.clock;
1631 let emitter = self.emitter.clone();
1632
1633 self.spawn_task("cancel_order", async move {
1634 let cancel_msg = match order_builder.build_cancel_order_with_flags(
1636 instrument_id,
1637 client_id_u32,
1638 order_flags,
1639 block_height,
1640 ) {
1641 Ok(msg) => msg,
1642 Err(e) => {
1643 log::error!("Failed to build cancel message for {client_order_id}: {e:?}");
1644 let ts_event = clock.get_time_ns();
1645 emitter.emit_order_cancel_rejected_event(
1646 strategy_id,
1647 instrument_id,
1648 client_order_id,
1649 venue_order_id,
1650 &format!("Cancel build failed: {e:?}"),
1651 ts_event,
1652 );
1653 return Ok(());
1654 }
1655 };
1656
1657 let cancel_op = format!("Cancel order {client_order_id}");
1659 let result = if order_flags == types::ORDER_FLAG_SHORT_TERM {
1660 broadcaster
1661 .broadcast_short_term(&tx_manager, vec![cancel_msg], &cancel_op)
1662 .await
1663 } else {
1664 broadcaster
1665 .broadcast_with_retry(&tx_manager, vec![cancel_msg], &cancel_op)
1666 .await
1667 };
1668 match result {
1669 Ok(_) => {
1670 log::debug!("Successfully cancelled order: {client_order_id}");
1671 }
1672 Err(e) => {
1673 log::error!("Failed to cancel order {client_order_id}: {e:?}");
1674
1675 let ts_event = clock.get_time_ns();
1676 emitter.emit_order_cancel_rejected_event(
1677 strategy_id,
1678 instrument_id,
1679 client_order_id,
1680 venue_order_id,
1681 &format!("Cancel order failed: {e:?}"),
1682 ts_event,
1683 );
1684 }
1685 }
1686
1687 Ok(())
1688 });
1689
1690 Ok(())
1691 }
1692
1693 fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
1694 if !self.is_connected() {
1695 anyhow::bail!("Cannot cancel orders: not connected");
1696 }
1697
1698 let instrument_id = cmd.instrument_id;
1699 let order_side_filter = cmd.order_side;
1700
1701 let order_data: Vec<(ClientOrderId, TimeInForce, Option<UnixNanos>)> = {
1704 let cache = self.core.cache();
1705 cache
1706 .orders_open(None, None, None, None, None)
1707 .into_iter()
1708 .filter(|order| order.instrument_id() == instrument_id)
1709 .filter(|order| {
1710 order_side_filter == OrderSide::NoOrderSide
1711 || order.order_side() == order_side_filter
1712 })
1713 .map(|order| {
1714 (
1715 order.client_order_id(),
1716 order.time_in_force(),
1717 order.expire_time(),
1718 )
1719 })
1720 .collect()
1721 }; let short_term_count = order_data
1725 .iter()
1726 .filter(|(_, tif, _)| matches!(tif, TimeInForce::Ioc | TimeInForce::Fok))
1727 .count();
1728 let long_term_count = order_data.len() - short_term_count;
1729
1730 log::debug!(
1731 "Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={instrument_id}, order_side={order_side_filter:?}",
1732 order_data.len(),
1733 short_term_count,
1734 long_term_count
1735 );
1736
1737 let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1739 Ok(components) => components,
1740 Err(e) => {
1741 log::error!("Failed to get execution components for cancel_all: {e}");
1742 return Ok(());
1743 }
1744 };
1745
1746 let block_height = self.block_time_monitor.current_block_height() as u32;
1747
1748 let mut orders_to_cancel = Vec::new();
1751 for (client_order_id, time_in_force, expire_time) in &order_data {
1752 if let Some(encoded) = self.encoder.get(client_order_id) {
1753 let client_id_u32 = encoded.client_id;
1754 let order_flags = self.get_order_context(client_id_u32).map_or_else(
1756 || {
1757 log::warn!(
1759 "Order context not found for {client_order_id}, deriving flags from order"
1760 );
1761 let expire_secs = expire_time.map(nanos_to_secs_i64);
1762 types::OrderLifetime::from_time_in_force(
1763 *time_in_force,
1764 expire_secs,
1765 false,
1766 order_builder.max_short_term_secs(),
1767 )
1768 .order_flags()
1769 },
1770 |ctx| ctx.order_flags,
1771 );
1772 orders_to_cancel.push((instrument_id, client_id_u32, order_flags));
1773 } else {
1774 log::warn!(
1775 "Cannot cancel order {client_order_id}: client_order_id not found in cache"
1776 );
1777 }
1778 }
1779
1780 if orders_to_cancel.is_empty() {
1781 return Ok(());
1782 }
1783
1784 let has_short_term = orders_to_cancel
1787 .iter()
1788 .any(|(_, _, flags)| *flags == types::ORDER_FLAG_SHORT_TERM);
1789
1790 if has_short_term {
1791 log::info!(
1793 "Cancelling {} orders individually (short-term cancels cannot be batched)",
1794 orders_to_cancel.len()
1795 );
1796
1797 self.spawn_task("cancel_all_orders", async move {
1798 let mut handles = Vec::with_capacity(orders_to_cancel.len());
1799
1800 for (inst_id, client_id, order_flags) in orders_to_cancel {
1801 let tx_manager = tx_manager.clone();
1802 let broadcaster = broadcaster.clone();
1803 let order_builder = order_builder.clone();
1804
1805 let handle = get_runtime().spawn(async move {
1806 let msg = match order_builder.build_cancel_order_with_flags(
1808 inst_id,
1809 client_id,
1810 order_flags,
1811 block_height,
1812 ) {
1813 Ok(m) => m,
1814 Err(e) => {
1815 log::error!(
1816 "Failed to build cancel message for client_id={client_id}: {e:?}"
1817 );
1818 return;
1819 }
1820 };
1821
1822 let cancel_op = format!("Cancel order {client_id}");
1824 let result = if order_flags == types::ORDER_FLAG_SHORT_TERM {
1825 broadcaster
1826 .broadcast_short_term(&tx_manager, vec![msg], &cancel_op)
1827 .await
1828 } else {
1829 broadcaster
1830 .broadcast_with_retry(&tx_manager, vec![msg], &cancel_op)
1831 .await
1832 };
1833 if let Err(e) = result {
1834 log::error!("Failed to cancel order client_id={client_id}: {e:?}");
1835 }
1836 });
1837
1838 handles.push(handle);
1839 }
1840
1841 for handle in handles {
1843 let _ = handle.await;
1844 }
1845
1846 Ok(())
1847 });
1848 } else {
1849 log::info!(
1851 "Batch cancelling {} long-term orders in single transaction",
1852 orders_to_cancel.len()
1853 );
1854
1855 self.spawn_task("cancel_all_orders", async move {
1856 let msgs: Result<Vec<_>, _> = orders_to_cancel
1858 .iter()
1859 .map(|(inst_id, client_id, order_flags)| {
1860 order_builder.build_cancel_order_with_flags(
1861 *inst_id,
1862 *client_id,
1863 *order_flags,
1864 block_height,
1865 )
1866 })
1867 .collect();
1868
1869 let msgs = match msgs {
1870 Ok(m) => m,
1871 Err(e) => {
1872 log::error!("Failed to build cancel messages: {e:?}");
1873 return Ok(());
1874 }
1875 };
1876
1877 if msgs.is_empty() {
1878 return Ok(());
1879 }
1880
1881 match broadcaster
1883 .broadcast_with_retry(
1884 &tx_manager,
1885 msgs,
1886 &format!("Cancel {} orders", orders_to_cancel.len()),
1887 )
1888 .await
1889 {
1890 Ok(_) => {
1891 log::debug!("Successfully cancelled {} orders", orders_to_cancel.len());
1892 }
1893 Err(e) => {
1894 log::error!("Batch cancel failed: {e:?}");
1895 }
1896 }
1897
1898 Ok(())
1899 });
1900 }
1901
1902 Ok(())
1903 }
1904
1905 fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
1906 if cmd.cancels.is_empty() {
1907 return Ok(());
1908 }
1909
1910 if !self.is_connected() {
1911 anyhow::bail!("Cannot cancel orders: not connected");
1912 }
1913
1914 let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1916 Ok(components) => components,
1917 Err(e) => {
1918 log::error!("Failed to get execution components for batch cancel: {e}");
1919 return Ok(());
1920 }
1921 };
1922
1923 let cache = self.core.cache();
1925
1926 let mut orders_to_cancel = Vec::with_capacity(cmd.cancels.len());
1927 for cancel in &cmd.cancels {
1928 let client_order_id = cancel.client_order_id;
1929 let encoded = match self.encoder.get(&client_order_id) {
1930 Some(enc) => enc,
1931 None => {
1932 log::warn!(
1933 "No u32 mapping found for client_order_id={client_order_id}, skipping cancel"
1934 );
1935 continue;
1936 }
1937 };
1938 let client_id_u32 = encoded.client_id;
1939
1940 let order_flags = self.get_order_context(client_id_u32).map_or_else(
1942 || {
1943 log::warn!(
1945 "Order context not found for {client_order_id}, deriving flags from order"
1946 );
1947 match cache.order(&client_order_id) {
1948 Some(order) => {
1949 let expire_time = order.expire_time().map(nanos_to_secs_i64);
1950 types::OrderLifetime::from_time_in_force(
1951 order.time_in_force(),
1952 expire_time,
1953 false,
1954 order_builder.max_short_term_secs(),
1955 )
1956 .order_flags()
1957 }
1958 None => types::ORDER_FLAG_LONG_TERM, }
1960 },
1961 |ctx| ctx.order_flags,
1962 );
1963
1964 orders_to_cancel.push((cancel.instrument_id, client_id_u32, order_flags));
1965 }
1966 drop(cache);
1967
1968 if orders_to_cancel.is_empty() {
1969 log::warn!("No valid orders to cancel in batch");
1970 return Ok(());
1971 }
1972
1973 let block_height = self.block_time_monitor.current_block_height() as u32;
1974
1975 let has_short_term = orders_to_cancel
1978 .iter()
1979 .any(|(_, _, flags)| *flags == types::ORDER_FLAG_SHORT_TERM);
1980
1981 if has_short_term {
1982 log::info!(
1984 "Cancelling {} orders individually (short-term cancels cannot be batched)",
1985 orders_to_cancel.len()
1986 );
1987
1988 self.spawn_task("batch_cancel_orders", async move {
1989 let mut handles = Vec::with_capacity(orders_to_cancel.len());
1990
1991 for (inst_id, client_id, order_flags) in orders_to_cancel {
1992 let tx_manager = tx_manager.clone();
1993 let broadcaster = broadcaster.clone();
1994 let order_builder = order_builder.clone();
1995
1996 let handle = get_runtime().spawn(async move {
1997 let msg = match order_builder.build_cancel_order_with_flags(
1999 inst_id,
2000 client_id,
2001 order_flags,
2002 block_height,
2003 ) {
2004 Ok(m) => m,
2005 Err(e) => {
2006 log::error!(
2007 "Failed to build cancel message for client_id={client_id}: {e:?}"
2008 );
2009 return;
2010 }
2011 };
2012
2013 let cancel_op = format!("Cancel order {client_id}");
2015 let result = if order_flags == types::ORDER_FLAG_SHORT_TERM {
2016 broadcaster
2017 .broadcast_short_term(&tx_manager, vec![msg], &cancel_op)
2018 .await
2019 } else {
2020 broadcaster
2021 .broadcast_with_retry(&tx_manager, vec![msg], &cancel_op)
2022 .await
2023 };
2024 if let Err(e) = result {
2025 log::error!("Failed to cancel order client_id={client_id}: {e:?}");
2026 }
2027 });
2028
2029 handles.push(handle);
2030 }
2031
2032 for handle in handles {
2034 let _ = handle.await;
2035 }
2036
2037 Ok(())
2038 });
2039 } else {
2040 log::debug!(
2042 "Batch cancelling {} long-term orders: {:?}",
2043 orders_to_cancel.len(),
2044 orders_to_cancel
2045 );
2046
2047 self.spawn_task("batch_cancel_orders", async move {
2048 let cancel_msgs = match order_builder
2050 .build_cancel_orders_batch_with_flags(&orders_to_cancel, block_height)
2051 {
2052 Ok(msgs) => msgs,
2053 Err(e) => {
2054 log::error!("Failed to build batch cancel messages: {e:?}");
2055 return Ok(());
2056 }
2057 };
2058
2059 match broadcaster
2061 .broadcast_with_retry(&tx_manager, cancel_msgs, "BatchCancelOrders")
2062 .await
2063 {
2064 Ok(tx_hash) => {
2065 log::debug!(
2066 "Successfully batch cancelled {} orders, tx_hash: {}",
2067 orders_to_cancel.len(),
2068 tx_hash
2069 );
2070 }
2071 Err(e) => {
2072 log::error!("Batch cancel failed: {e:?}");
2073 }
2074 }
2075
2076 Ok(())
2077 });
2078 }
2079
2080 Ok(())
2081 }
2082
2083 fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
2084 Ok(())
2085 }
2086
2087 fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
2088 Ok(())
2089 }
2090
2091 async fn connect(&mut self) -> anyhow::Result<()> {
2092 if self.core.is_connected() {
2093 log::warn!("dYdX execution client already connected");
2094 return Ok(());
2095 }
2096
2097 log::info!("Connecting to dYdX");
2098
2099 log::debug!("Loading instruments from HTTP API");
2100 self.http_client.fetch_and_cache_instruments().await?;
2101 log::debug!(
2102 "Loaded {} instruments from HTTP into shared cache",
2103 self.http_client.cached_instruments_count()
2104 );
2105 self.mark_instruments_initialized();
2106
2107 let grpc_urls = self.config.get_grpc_urls();
2109 let mut grpc_client = DydxGrpcClient::new_with_fallback(&grpc_urls)
2110 .await
2111 .context("failed to construct dYdX gRPC client")?;
2112 log::debug!("gRPC client initialized");
2113
2114 let initial_height = grpc_client
2116 .latest_block_height()
2117 .await
2118 .context("failed to fetch initial block height")?;
2119 self.block_time_monitor
2121 .record_block(initial_height.0 as u64, chrono::Utc::now());
2122 log::info!("Initial block height: {}", initial_height.0);
2123
2124 *self.grpc_client.write().await = Some(grpc_client.clone());
2125
2126 let private_key =
2128 Self::resolve_private_key(&self.config).context("failed to resolve private key")?;
2129 let tx_manager = Arc::new(
2130 TransactionManager::new(
2131 grpc_client.clone(),
2132 &private_key,
2133 self.wallet_address.clone(),
2134 self.get_chain_id(),
2135 )
2136 .context("failed to create TransactionManager")?,
2137 );
2138
2139 tx_manager
2140 .resolve_authenticators()
2141 .await
2142 .context("failed to resolve authenticators")?;
2143
2144 tx_manager
2147 .initialize_sequence()
2148 .await
2149 .context("failed to initialize sequence")?;
2150
2151 self.tx_manager = Some(tx_manager);
2152 self.broadcaster = Some(Arc::new(TxBroadcaster::new(grpc_client)));
2153 self.order_builder = Some(Arc::new(OrderMessageBuilder::new(
2154 self.http_client.clone(),
2155 self.wallet_address.clone(),
2156 self.subaccount_number,
2157 self.block_time_monitor.clone(),
2158 )));
2159 log::debug!(
2160 "OrderMessageBuilder initialized (block_time_monitor ready: {}, max_short_term: {:.1}s)",
2161 self.block_time_monitor.is_ready(),
2162 SHORT_TERM_ORDER_MAXIMUM_LIFETIME as f64
2163 * self.block_time_monitor.seconds_per_block_or_default()
2164 );
2165
2166 self.ws_client.connect().await?;
2168 log::debug!("WebSocket connected");
2169
2170 self.ws_client.subscribe_block_height().await?;
2172 log::debug!("Subscribed to block height updates");
2173
2174 self.ws_client.subscribe_markets().await?;
2176 log::debug!("Subscribed to markets");
2177
2178 log::info!(
2180 "Using wallet address for queries: {} (subaccount {})",
2181 self.wallet_address,
2182 self.subaccount_number
2183 );
2184 self.ws_client
2185 .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
2186 .await?;
2187 log::debug!(
2188 "Subscribed to subaccount updates: {}/{}",
2189 self.wallet_address,
2190 self.subaccount_number
2191 );
2192
2193 let stream = self.ws_client.stream();
2194 self.spawn_ws_stream_handler(stream);
2195
2196 self.await_account_registered(30.0).await?;
2200
2201 self.core.set_connected();
2202 log::info!("Connected: client_id={}", self.core.client_id);
2203 Ok(())
2204 }
2205
2206 async fn disconnect(&mut self) -> anyhow::Result<()> {
2207 if self.core.is_disconnected() {
2208 log::warn!("dYdX execution client not connected");
2209 return Ok(());
2210 }
2211
2212 log::info!("Disconnecting from dYdX");
2213
2214 let _ = self
2216 .ws_client
2217 .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
2218 .await
2219 .map_err(|e| log::warn!("Failed to unsubscribe from subaccount: {e}"));
2220
2221 let _ = self
2223 .ws_client
2224 .unsubscribe_markets()
2225 .await
2226 .map_err(|e| log::warn!("Failed to unsubscribe from markets: {e}"));
2227
2228 let _ = self
2230 .ws_client
2231 .unsubscribe_block_height()
2232 .await
2233 .map_err(|e| log::warn!("Failed to unsubscribe from block height: {e}"));
2234
2235 self.ws_client.disconnect().await?;
2237
2238 if let Some(handle) = self.ws_stream_handle.take() {
2240 handle.abort();
2241 log::debug!("Aborted WebSocket message processing task");
2242 }
2243
2244 self.abort_pending_tasks();
2246
2247 self.core.set_disconnected();
2248 log::info!("Disconnected: client_id={}", self.core.client_id);
2249 Ok(())
2250 }
2251
2252 async fn generate_order_status_report(
2253 &self,
2254 cmd: &GenerateOrderStatusReport,
2255 ) -> anyhow::Result<Option<OrderStatusReport>> {
2256 let response = self
2258 .http_client
2259 .inner
2260 .get_orders(
2261 &self.wallet_address,
2262 self.subaccount_number,
2263 None, Some(1), )
2266 .await
2267 .context("failed to fetch order from dYdX API")?;
2268
2269 if response.is_empty() {
2270 return Ok(None);
2271 }
2272
2273 let order = &response[0];
2274 let ts_init = UnixNanos::default();
2275
2276 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
2277 Some(inst) => inst,
2278 None => return Ok(None),
2279 };
2280
2281 let report = parse_order_status_report(order, &instrument, self.core.account_id, ts_init)
2282 .context("failed to parse order status report")?;
2283
2284 if let Some(client_order_id) = cmd.client_order_id
2285 && report.client_order_id != Some(client_order_id)
2286 {
2287 return Ok(None);
2288 }
2289
2290 if let Some(venue_order_id) = cmd.venue_order_id
2291 && report.venue_order_id.as_str() != venue_order_id.as_str()
2292 {
2293 return Ok(None);
2294 }
2295
2296 if let Some(instrument_id) = cmd.instrument_id
2297 && report.instrument_id != instrument_id
2298 {
2299 return Ok(None);
2300 }
2301
2302 Ok(Some(report))
2303 }
2304
2305 async fn generate_order_status_reports(
2306 &self,
2307 cmd: &GenerateOrderStatusReports,
2308 ) -> anyhow::Result<Vec<OrderStatusReport>> {
2309 let response = self
2311 .http_client
2312 .inner
2313 .get_orders(
2314 &self.wallet_address,
2315 self.subaccount_number,
2316 None, None, )
2319 .await
2320 .context("failed to fetch orders from dYdX API")?;
2321
2322 let mut reports = Vec::new();
2323 let ts_init = UnixNanos::default();
2324
2325 for order in response {
2326 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
2327 Some(inst) => inst,
2328 None => continue,
2329 };
2330
2331 if let Some(filter_id) = cmd.instrument_id
2332 && instrument.id() != filter_id
2333 {
2334 continue;
2335 }
2336
2337 let report =
2338 match parse_order_status_report(&order, &instrument, self.core.account_id, ts_init)
2339 {
2340 Ok(r) => r,
2341 Err(e) => {
2342 log::warn!("Failed to parse order status report: {e}");
2343 continue;
2344 }
2345 };
2346
2347 reports.push(report);
2348 }
2349
2350 if cmd.open_only {
2352 reports.retain(|r| r.order_status.is_open());
2353 }
2354
2355 if let Some(start) = cmd.start {
2357 reports.retain(|r| r.ts_last >= start);
2358 }
2359 if let Some(end) = cmd.end {
2360 reports.retain(|r| r.ts_last <= end);
2361 }
2362
2363 Ok(reports)
2364 }
2365
2366 async fn generate_fill_reports(
2367 &self,
2368 cmd: GenerateFillReports,
2369 ) -> anyhow::Result<Vec<FillReport>> {
2370 let response = self
2371 .http_client
2372 .inner
2373 .get_fills(
2374 &self.wallet_address,
2375 self.subaccount_number,
2376 None, None, )
2379 .await
2380 .context("failed to fetch fills from dYdX API")?;
2381
2382 let mut reports = Vec::new();
2383 let ts_init = UnixNanos::default();
2384
2385 for fill in response.fills {
2386 let instrument = match self.get_instrument_by_market(&fill.market) {
2387 Some(inst) => inst,
2388 None => {
2389 log::warn!("Unknown market in fill: {}", fill.market);
2390 continue;
2391 }
2392 };
2393
2394 if let Some(filter_id) = cmd.instrument_id
2395 && instrument.id() != filter_id
2396 {
2397 continue;
2398 }
2399
2400 let report = match parse_fill_report(&fill, &instrument, self.core.account_id, ts_init)
2401 {
2402 Ok(r) => r,
2403 Err(e) => {
2404 log::warn!("Failed to parse fill report: {e}");
2405 continue;
2406 }
2407 };
2408
2409 reports.push(report);
2410 }
2411
2412 if let Some(venue_order_id) = cmd.venue_order_id {
2413 reports.retain(|r| r.venue_order_id.as_str() == venue_order_id.as_str());
2414 }
2415
2416 Ok(reports)
2417 }
2418
2419 async fn generate_position_status_reports(
2420 &self,
2421 cmd: &GeneratePositionStatusReports,
2422 ) -> anyhow::Result<Vec<PositionStatusReport>> {
2423 let response = self
2425 .http_client
2426 .inner
2427 .get_subaccount(&self.wallet_address, self.subaccount_number)
2428 .await
2429 .context("failed to fetch subaccount from dYdX API")?;
2430
2431 let mut reports = Vec::new();
2432 let ts_init = UnixNanos::default();
2433
2434 for (market_ticker, perp_position) in &response.subaccount.open_perpetual_positions {
2435 let instrument = match self.get_instrument_by_market(market_ticker) {
2436 Some(inst) => inst,
2437 None => {
2438 log::warn!("Unknown market in position: {market_ticker}");
2439 continue;
2440 }
2441 };
2442
2443 if let Some(filter_id) = cmd.instrument_id
2444 && instrument.id() != filter_id
2445 {
2446 continue;
2447 }
2448
2449 let report = match parse_position_status_report(
2450 perp_position,
2451 &instrument,
2452 self.core.account_id,
2453 ts_init,
2454 ) {
2455 Ok(r) => r,
2456 Err(e) => {
2457 log::warn!("Failed to parse position status report: {e}");
2458 continue;
2459 }
2460 };
2461
2462 reports.push(report);
2463 }
2464
2465 Ok(reports)
2466 }
2467
2468 async fn generate_mass_status(
2469 &self,
2470 lookback_mins: Option<u64>,
2471 ) -> anyhow::Result<Option<ExecutionMassStatus>> {
2472 let ts_init = UnixNanos::default();
2473
2474 let orders_response = self
2476 .http_client
2477 .inner
2478 .get_orders(&self.wallet_address, self.subaccount_number, None, None)
2479 .await
2480 .context("failed to fetch orders for mass status")?;
2481
2482 let subaccount_response = self
2484 .http_client
2485 .inner
2486 .get_subaccount(&self.wallet_address, self.subaccount_number)
2487 .await
2488 .context("failed to fetch subaccount for mass status")?;
2489
2490 let fills_response = self
2492 .http_client
2493 .inner
2494 .get_fills(&self.wallet_address, self.subaccount_number, None, None)
2495 .await
2496 .context("failed to fetch fills for mass status")?;
2497
2498 let mut order_reports = Vec::new();
2500 let mut orders_filtered = 0usize;
2501 for order in orders_response {
2502 let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
2503 Some(inst) => inst,
2504 None => {
2505 orders_filtered += 1;
2506 continue;
2507 }
2508 };
2509
2510 match parse_order_status_report(&order, &instrument, self.core.account_id, ts_init) {
2511 Ok(mut r) => {
2512 if !order.client_id.is_empty()
2514 && let Ok(client_id_u32) = order.client_id.parse::<u32>()
2515 && let Some(decoded) =
2516 self.encoder.decode(client_id_u32, order.client_metadata)
2517 {
2518 log::debug!(
2519 "Decoded reconciliation order: dYdX client_id={} meta={:#x} -> '{}'",
2520 client_id_u32,
2521 order.client_metadata,
2522 decoded,
2523 );
2524 r.client_order_id = Some(decoded);
2525 }
2526 order_reports.push(r);
2527 }
2528 Err(e) => {
2529 log::warn!("Failed to parse order status report: {e}");
2530 orders_filtered += 1;
2531 }
2532 }
2533 }
2534
2535 let mut position_reports = Vec::new();
2537 for (market_ticker, perp_position) in
2538 &subaccount_response.subaccount.open_perpetual_positions
2539 {
2540 let instrument = match self.get_instrument_by_market(market_ticker) {
2541 Some(inst) => inst,
2542 None => continue,
2543 };
2544
2545 match parse_position_status_report(
2546 perp_position,
2547 &instrument,
2548 self.core.account_id,
2549 ts_init,
2550 ) {
2551 Ok(r) => position_reports.push(r),
2552 Err(e) => {
2553 log::warn!("Failed to parse position status report: {e}");
2554 }
2555 }
2556 }
2557
2558 let mut fill_reports = Vec::new();
2560 let mut fills_filtered = 0usize;
2561 for fill in fills_response.fills {
2562 let instrument = match self.get_instrument_by_market(&fill.market) {
2563 Some(inst) => inst,
2564 None => {
2565 fills_filtered += 1;
2566 continue;
2567 }
2568 };
2569
2570 match parse_fill_report(&fill, &instrument, self.core.account_id, ts_init) {
2571 Ok(r) => fill_reports.push(r),
2572 Err(e) => {
2573 log::warn!("Failed to parse fill report: {e}");
2574 fills_filtered += 1;
2575 }
2576 }
2577 }
2578
2579 if let Some(mins) = lookback_mins {
2581 let now_ns = get_atomic_clock_realtime().get_time_ns();
2582 let cutoff_ns = now_ns.as_u64().saturating_sub(mins * 60 * 1_000_000_000);
2583 let cutoff = UnixNanos::from(cutoff_ns);
2584
2585 let orders_before = order_reports.len();
2586 order_reports.retain(|r| r.ts_last >= cutoff);
2587 let orders_removed = orders_before - order_reports.len();
2588
2589 let fills_before = fill_reports.len();
2590 fill_reports.retain(|r| r.ts_event >= cutoff);
2591 let fills_removed = fills_before - fill_reports.len();
2592
2593 log::info!(
2594 "Lookback filter ({}min): orders {}->{} (removed {}), fills {}->{} (removed {}), positions {} (unfiltered)",
2595 mins,
2596 orders_before,
2597 order_reports.len(),
2598 orders_removed,
2599 fills_before,
2600 fill_reports.len(),
2601 fills_removed,
2602 position_reports.len(),
2603 );
2604 } else {
2605 log::debug!(
2606 "Generated mass status: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
2607 order_reports.len(),
2608 orders_filtered,
2609 position_reports.len(),
2610 fill_reports.len(),
2611 fills_filtered,
2612 );
2613 }
2614
2615 let mut mass_status = ExecutionMassStatus::new(
2617 self.core.client_id,
2618 self.core.account_id,
2619 self.core.venue,
2620 ts_init,
2621 None, );
2623
2624 mass_status.add_order_reports(order_reports);
2625 mass_status.add_position_reports(position_reports);
2626 mass_status.add_fill_reports(fill_reports);
2627
2628 Ok(Some(mass_status))
2629 }
2630}