Skip to main content

nautilus_dydx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the dYdX adapter.
17//!
18//! This module provides the execution client for submitting orders, cancellations,
19//! and managing positions on dYdX v4.
20//!
21//! # Order Types
22//!
23//! dYdX supports the following order types:
24//!
25//! - **Market**: Execute immediately at best available price.
26//! - **Limit**: Execute at specified price or better.
27//! - **Stop Market**: Triggered when price crosses stop price, then executes as market order.
28//! - **Stop Limit**: Triggered when price crosses stop price, then places limit order.
29//! - **Take Profit Market**: Close position at profit target, executes as market order.
30//! - **Take Profit Limit**: Close position at profit target, places limit order.
31//!
32//! See <https://docs.dydx.xyz/concepts/trading/orders#types> for details.
33//!
34//! # Order Lifetimes
35//!
36//! Orders can be short-term (expire by block height) or long-term/stateful (expire by timestamp).
37//! Conditional orders (Stop/TakeProfit) are always stateful.
38//!
39//! See <https://docs.dydx.xyz/concepts/trading/orders#short-term-vs-long-term> for details.
40
41use std::{
42    sync::{Arc, Mutex},
43    time::{Duration, Instant},
44};
45
46use anyhow::Context;
47use async_trait::async_trait;
48use dashmap::DashMap;
49use futures_util::{Stream, StreamExt, pin_mut};
50use nautilus_common::{
51    clients::ExecutionClient,
52    live::{get_runtime, runner::get_exec_event_sender},
53    messages::execution::{
54        BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
55        GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
56        ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
57    },
58};
59use nautilus_core::{
60    MUTEX_POISONED, UUID4, UnixNanos,
61    time::{AtomicTime, get_atomic_clock_realtime},
62};
63use nautilus_live::{ExecutionClientCore, ExecutionEventEmitter};
64use nautilus_model::{
65    accounts::AccountAny,
66    enums::{AccountType, OmsType, OrderSide, OrderType, TimeInForce},
67    events::AccountState,
68    identifiers::{
69        AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue, VenueOrderId,
70    },
71    instruments::{Instrument, InstrumentAny},
72    orders::Order,
73    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
74    types::{AccountBalance, Currency, MarginBalance, Money},
75};
76use nautilus_network::retry::RetryConfig;
77use rust_decimal::Decimal;
78use tokio::task::JoinHandle;
79
80use crate::{
81    common::{
82        consts::DYDX_VENUE, credential::DydxCredential, instrument_cache::InstrumentCache,
83        parse::nanos_to_secs_i64,
84    },
85    config::DydxAdapterConfig,
86    execution::{
87        broadcaster::TxBroadcaster,
88        encoder::ClientOrderIdEncoder,
89        order_builder::OrderMessageBuilder,
90        tx_manager::TransactionManager,
91        types::{LimitOrderParams, OrderContext},
92    },
93    grpc::{DydxGrpcClient, SHORT_TERM_ORDER_MAXIMUM_LIFETIME, types::ChainId},
94    http::{
95        client::DydxHttpClient,
96        parse::{
97            parse_account_state, parse_fill_report, parse_order_status_report,
98            parse_position_status_report,
99        },
100    },
101    websocket::{
102        client::DydxWebSocketClient,
103        enums::NautilusWsMessage,
104        parse::{parse_ws_fill_report, parse_ws_order_report, parse_ws_position_report},
105    },
106};
107
108pub mod block_time;
109pub mod broadcaster;
110pub mod encoder;
111pub mod order_builder;
112pub mod submitter;
113pub mod tx_manager;
114pub mod types;
115pub mod wallet;
116
117use block_time::BlockTimeMonitor;
118
119/// Maximum client order ID value for dYdX (informational - not enforced by adapter).
120///
121/// Maximum client order ID value for dYdX (informational).
122///
123/// dYdX protocol accepts u32 client IDs. The `ClientOrderIdEncoder` uses sequential
124/// allocation starting from 1, with overflow protection near `u32::MAX - 1000`.
125pub const MAX_CLIENT_ID: u32 = u32::MAX;
126
127/// Live execution client for the dYdX v4 exchange adapter.
128///
129/// Supports Market, Limit, Stop Market, Stop Limit, Take Profit Market (MarketIfTouched),
130/// and Take Profit Limit (LimitIfTouched) orders via gRPC. Trailing stops are NOT supported
131/// by the dYdX v4 protocol. dYdX requires u32 client IDs - strings are hashed to fit.
132///
133/// # Architecture
134///
135/// The client follows a two-layer execution model:
136/// 1. **Synchronous validation** - Immediate checks and event generation.
137/// 2. **Async submission** - Non-blocking gRPC calls via `TransactionManager`, `TxBroadcaster`, and `OrderMessageBuilder`.
138///
139/// This matches the pattern used in OKX and other exchange adapters, ensuring
140/// consistent behavior across the Nautilus ecosystem.
141#[derive(Debug)]
142pub struct DydxExecutionClient {
143    core: ExecutionClientCore,
144    clock: &'static AtomicTime,
145    config: DydxAdapterConfig,
146    emitter: ExecutionEventEmitter,
147    http_client: DydxHttpClient,
148    ws_client: DydxWebSocketClient,
149    grpc_client: Arc<tokio::sync::RwLock<Option<DydxGrpcClient>>>,
150    instrument_cache: Arc<InstrumentCache>,
151    /// Block time monitor for tracking rolling average block times and expiration estimation.
152    block_time_monitor: Arc<BlockTimeMonitor>,
153    oracle_prices: Arc<DashMap<InstrumentId, Decimal>>,
154    /// Bidirectional encoder for ClientOrderId ↔ u32 mapping.
155    /// Wrapped in Arc for sharing with async WebSocket handler.
156    encoder: Arc<ClientOrderIdEncoder>,
157    order_contexts: Arc<DashMap<u32, OrderContext>>,
158    /// Reverse mapping: dYdX order ID → (client_id, client_metadata) for fill correlation.
159    /// Fills don't carry client_id, so we map via order_id from order updates.
160    order_id_map: Arc<DashMap<String, (u32, u32)>>,
161    wallet_address: String,
162    subaccount_number: u32,
163    tx_manager: Option<Arc<TransactionManager>>,
164    /// Transaction broadcaster with retry logic.
165    /// Wrapped in Arc for sharing with async order tasks.
166    broadcaster: Option<Arc<TxBroadcaster>>,
167    /// Order message builder for creating dYdX proto messages.
168    /// Wrapped in Arc for sharing with async order tasks.
169    order_builder: Option<Arc<OrderMessageBuilder>>,
170    ws_stream_handle: Option<JoinHandle<()>>,
171    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
172}
173
174impl DydxExecutionClient {
175    /// Creates a new [`DydxExecutionClient`].
176    ///
177    /// # Errors
178    ///
179    /// Returns an error if credentials are not found or client fails to construct.
180    pub fn new(
181        core: ExecutionClientCore,
182        config: DydxAdapterConfig,
183        wallet_address: String,
184        subaccount_number: u32,
185    ) -> anyhow::Result<Self> {
186        let trader_id = core.trader_id;
187        let account_id = core.account_id;
188        let clock = get_atomic_clock_realtime();
189        let emitter =
190            ExecutionEventEmitter::new(clock, trader_id, account_id, AccountType::Margin, None);
191
192        let retry_config = RetryConfig {
193            max_retries: config.max_retries,
194            initial_delay_ms: config.retry_delay_initial_ms,
195            max_delay_ms: config.retry_delay_max_ms,
196            ..Default::default()
197        };
198        let http_client = DydxHttpClient::new(
199            Some(config.base_url.clone()),
200            Some(config.timeout_secs),
201            None, // proxy_url - not in DydxAdapterConfig currently
202            config.is_testnet,
203            Some(retry_config),
204        )?;
205
206        // Share the HTTP client's instrument cache with WebSocket client
207        let instrument_cache = http_client.instrument_cache().clone();
208
209        // Use private WebSocket client for authenticated subaccount subscriptions
210        let credential = DydxCredential::resolve(
211            config.private_key.clone(),
212            config.is_testnet,
213            config.authenticator_ids.clone(),
214        )?
215        .ok_or_else(|| anyhow::anyhow!("Credentials required for execution client"))?;
216
217        // Create WS client with shared instrument cache
218        let ws_client = DydxWebSocketClient::new_private_with_cache(
219            config.ws_url.clone(),
220            credential,
221            core.account_id,
222            instrument_cache.clone(),
223            Some(20),
224        );
225
226        let grpc_client = Arc::new(tokio::sync::RwLock::new(None));
227
228        Ok(Self {
229            core,
230            clock,
231            config,
232            emitter,
233            http_client,
234            ws_client,
235            grpc_client,
236            instrument_cache,
237            block_time_monitor: Arc::new(BlockTimeMonitor::new()),
238            oracle_prices: Arc::new(DashMap::new()),
239            encoder: Arc::new(ClientOrderIdEncoder::new()),
240            order_contexts: Arc::new(DashMap::new()),
241            order_id_map: Arc::new(DashMap::new()),
242            wallet_address,
243            subaccount_number,
244            tx_manager: None,
245            broadcaster: None,
246            order_builder: None,
247            ws_stream_handle: None,
248            pending_tasks: Mutex::new(Vec::new()),
249        })
250    }
251
252    /// Resolves private key from config or environment.
253    ///
254    /// Priority: 1. config private_key, 2. env DYDX_PRIVATE_KEY
255    fn resolve_private_key(config: &DydxAdapterConfig) -> anyhow::Result<String> {
256        let private_key_env = if config.is_testnet {
257            "DYDX_TESTNET_PRIVATE_KEY"
258        } else {
259            "DYDX_PRIVATE_KEY"
260        };
261
262        // 1. Try private key from config
263        if let Some(ref pk) = config.private_key
264            && !pk.trim().is_empty()
265        {
266            return Ok(pk.clone());
267        }
268
269        // 2. Try private key from env var
270        if let Some(pk) = std::env::var(private_key_env)
271            .ok()
272            .filter(|s| !s.trim().is_empty())
273        {
274            return Ok(pk);
275        }
276
277        anyhow::bail!("{private_key_env} not found in config or environment")
278    }
279
280    /// Registers a full order context for WebSocket correlation and cancellation.
281    fn register_order_context(&self, client_id_u32: u32, context: OrderContext) {
282        self.order_contexts.insert(client_id_u32, context);
283    }
284
285    /// Gets the order context for a given dYdX client ID.
286    ///
287    /// Returns `None` if no context has been registered for this ID.
288    fn get_order_context(&self, client_id_u32: u32) -> Option<OrderContext> {
289        self.order_contexts
290            .get(&client_id_u32)
291            .map(|r| r.value().clone())
292    }
293
294    /// Get chain ID from config network field.
295    ///
296    /// This is the recommended way to get chain_id for all transaction submissions.
297    fn get_chain_id(&self) -> ChainId {
298        self.config.get_chain_id()
299    }
300
301    /// Spawns a stream handler to dispatch WebSocket messages to the execution engine.
302    fn spawn_ws_stream_handler(
303        &mut self,
304        stream: impl Stream<Item = NautilusWsMessage> + Send + 'static,
305    ) {
306        if self.ws_stream_handle.is_some() {
307            return;
308        }
309
310        log::debug!("Starting execution WebSocket message processing task");
311
312        // Clone data needed for account state parsing in spawned task
313        let account_id = self.core.account_id;
314        let instrument_cache = self.instrument_cache.clone();
315        let oracle_prices = self.oracle_prices.clone();
316        let encoder = self.encoder.clone();
317        let order_contexts = self.order_contexts.clone();
318        let order_id_map = self.order_id_map.clone();
319        let block_time_monitor = self.block_time_monitor.clone();
320        let emitter = self.emitter.clone();
321        let clock = self.clock;
322
323        let handle = get_runtime().spawn(async move {
324            log::debug!("Execution WebSocket message loop started");
325            pin_mut!(stream);
326            while let Some(msg) = stream.next().await {
327                match msg {
328                    NautilusWsMessage::Order(report) => {
329                        log::debug!("Received order update: {:?}", report.order_status);
330                        emitter.send_order_status_report(*report);
331                    }
332                    NautilusWsMessage::Fill(report) => {
333                        log::debug!("Received fill update");
334                        emitter.send_fill_report(*report);
335                    }
336                    NautilusWsMessage::Position(report) => {
337                        log::debug!("Received position update");
338                        emitter.send_position_report(*report);
339                    }
340                    NautilusWsMessage::AccountState(state) => {
341                        log::debug!("Received account state update");
342                        emitter.send_account_state(*state);
343                    }
344                    NautilusWsMessage::SubaccountSubscribed(msg) => {
345
346                        log::debug!("Parsing subaccount subscription with full context");
347
348                        // Build instruments map for parsing from shared cache
349                        let inst_map = instrument_cache.to_instrument_id_map();
350
351                        // Build oracle prices map (copy Decimals)
352                        let oracle_map: std::collections::HashMap<_, _> = oracle_prices
353                            .iter()
354                            .map(|entry| (*entry.key(), *entry.value()))
355                            .collect();
356
357                        let ts_init = clock.get_time_ns();
358                        let ts_event = ts_init;
359
360                        if let Some(ref subaccount) = msg.contents.subaccount {
361                        match parse_account_state(
362                            subaccount,
363                            account_id,
364                            &inst_map,
365                            &oracle_map,
366                            ts_event,
367                            ts_init,
368                        ) {
369                            Ok(account_state) => {
370                                log::debug!(
371                                    "Parsed account state: {} balance(s), {} margin(s)",
372                                    account_state.balances.len(),
373                                    account_state.margins.len()
374                                );
375                                emitter.send_account_state(account_state);
376                            }
377                            Err(e) => {
378                                log::error!("Failed to parse account state: {e}");
379                            }
380                        }
381
382                        // Parse positions from the subscription
383                        if let Some(ref positions) =
384                            subaccount.open_perpetual_positions
385                        {
386                            log::debug!(
387                                "Parsing {} position(s) from subscription",
388                                positions.len()
389                            );
390
391                            for (market, ws_position) in positions {
392                                match parse_ws_position_report(
393                                    ws_position,
394                                    &instrument_cache,
395                                    account_id,
396                                    ts_init,
397                                ) {
398                                    Ok(report) => {
399                                        log::debug!(
400                                            "Parsed position report: {} {} {} {}",
401                                            report.instrument_id,
402                                            report.position_side,
403                                            report.quantity,
404                                            market
405                                        );
406                                        emitter.send_position_report(report);
407                                    }
408                                    Err(e) => {
409                                        log::error!(
410                                            "Failed to parse WebSocket position for {market}: {e}"
411                                        );
412                                    }
413                                }
414                            }
415                        }
416                        } else {
417                            log::warn!("Subaccount subscription without initial state (new/empty subaccount)");
418
419                            // Emit zero-balance account state so account gets registered
420                            let currency = Currency::get_or_create_crypto_with_context("USDC", None);
421                            let zero = Money::zero(currency);
422                            let balance = AccountBalance::new_checked(zero, zero, zero)
423                                .expect("zero balance should always be valid");
424                            let account_state = AccountState::new(
425                                account_id,
426                                AccountType::Margin,
427                                vec![balance],
428                                vec![],
429                                true,
430                                UUID4::new(),
431                                ts_init,
432                                ts_init,
433                                None,
434                            );
435                            emitter.send_account_state(account_state);
436                        }
437                    }
438                    NautilusWsMessage::SubaccountsChannelData(data) => {
439                        log::debug!(
440                            "Processing subaccounts channel data (orders={:?}, fills={:?})",
441                            data.contents.orders.as_ref().map(|o| o.len()),
442                            data.contents.fills.as_ref().map(|f| f.len())
443                        );
444                        let ts_init = clock.get_time_ns();
445
446                        // Track terminal orders for deferred cleanup (after fills)
447                        let mut terminal_orders: Vec<(u32, u32, String)> = Vec::new();
448
449                        // Phase 1: Parse orders and build order_id_map (needed for fill correlation)
450                        // but DON'T send order reports yet — fills must be sent first
451                        // to prevent reconciliation from inferring fills at the limit price.
452                        let mut pending_order_reports = Vec::new();
453                        if let Some(ref orders) = data.contents.orders {
454                            log::info!(
455                                "Processing {} orders from SubaccountsChannelData",
456                                orders.len()
457                            );
458                            for ws_order in orders {
459                                log::info!(
460                                    "Parsing WS order: clob_pair_id={}, status={:?}, client_id={}",
461                                    ws_order.clob_pair_id,
462                                    ws_order.status,
463                                    ws_order.client_id
464                                );
465
466                                // Build order_id → (client_id, client_metadata) mapping for fill correlation
467                                // (fills don't carry client_id, only order_id)
468                                if let Ok(client_id_u32) = ws_order.client_id.parse::<u32>() {
469                                    let client_meta = ws_order.client_metadata
470                                        .as_ref()
471                                        .and_then(|s| s.parse::<u32>().ok())
472                                        .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
473                                    order_id_map.insert(ws_order.id.clone(), (client_id_u32, client_meta));
474                                }
475
476                                match parse_ws_order_report(
477                                    ws_order,
478                                    &instrument_cache,
479                                    &order_contexts,
480                                    &encoder,
481                                    account_id,
482                                    ts_init,
483                                ) {
484                                    Ok(report) => {
485                                        // Collect terminal orders for deferred cleanup
486                                        if !report.order_status.is_open()
487                                            && let Ok(cid) = ws_order.client_id.parse::<u32>()
488                                        {
489                                            let meta = ws_order.client_metadata
490                                                .as_ref()
491                                                .and_then(|s| s.parse::<u32>().ok())
492                                                .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
493                                            terminal_orders.push((cid, meta, ws_order.id.clone()));
494                                        }
495                                        log::info!(
496                                            "Parsed order report: {} {} {:?} qty={} client_order_id={:?}",
497                                            report.instrument_id,
498                                            report.order_side,
499                                            report.order_status,
500                                            report.quantity,
501                                            report.client_order_id
502                                        );
503                                        pending_order_reports.push(report);
504                                    }
505                                    Err(e) => {
506                                        log::error!("Failed to parse WebSocket order: {e}");
507                                    }
508                                }
509                            }
510                        }
511
512                        // Phase 2: Send fills FIRST so reconciliation sees them before
513                        // the terminal order status (prevents inferred fills at limit price)
514                        if let Some(ref fills) = data.contents.fills {
515                            for ws_fill in fills {
516                                match parse_ws_fill_report(
517                                    ws_fill,
518                                    &instrument_cache,
519                                    &order_id_map,
520                                    &order_contexts,
521                                    &encoder,
522                                    account_id,
523                                    ts_init,
524                                ) {
525                                    Ok(report) => {
526                                        log::info!(
527                                            "Parsed fill report: {} {} {} @ {} client_order_id={:?}",
528                                            report.instrument_id,
529                                            report.venue_order_id,
530                                            report.last_qty,
531                                            report.last_px,
532                                            report.client_order_id
533                                        );
534                                        emitter.send_fill_report(report);
535                                    }
536                                    Err(e) => {
537                                        log::error!("Failed to parse WebSocket fill: {e}");
538                                    }
539                                }
540                            }
541                        }
542
543                        // Phase 3: Now send order status reports
544                        for report in pending_order_reports {
545                            emitter.send_order_status_report(report);
546                        }
547
548                        // Deferred cleanup: remove mappings for terminal orders
549                        // after fills have been correlated
550                        for (client_id, client_metadata, order_id) in terminal_orders {
551                            order_contexts.remove(&client_id);
552                            encoder.remove(client_id, client_metadata);
553                            order_id_map.remove(&order_id);
554                        }
555                    }
556                    NautilusWsMessage::MarkPrice(mark_price) => {
557                        let price_dec = Decimal::from(mark_price.value);
558                        oracle_prices.insert(mark_price.instrument_id, price_dec);
559                        log::trace!(
560                            "Updated oracle price for {}: {price_dec}",
561                            mark_price.instrument_id
562                        );
563                    }
564                    NautilusWsMessage::IndexPrice(_) => {
565                        // Index prices not needed by execution client
566                    }
567                    NautilusWsMessage::BlockHeight { height, time } => {
568                        log::debug!("Block height update: {height} at {time}");
569                        block_time_monitor.record_block(height, time);
570                    }
571                    NautilusWsMessage::Error(err) => {
572                        log::error!("WebSocket error: {err:?}");
573                    }
574                    NautilusWsMessage::Reconnected => {
575                        log::info!("WebSocket reconnected");
576                    }
577                    NautilusWsMessage::FundingRate(_) => {
578                        // Funding rates are handled by the data client
579                    }
580                    _ => {
581                        // Data, Deltas are for market data, not execution
582                    }
583                }
584            }
585            log::debug!("WebSocket message processing task ended");
586        });
587
588        self.ws_stream_handle = Some(handle);
589        log::info!("WebSocket stream handler started");
590    }
591
592    /// Marks instruments as initialized after HTTP client has fetched them.
593    ///
594    /// The instruments are stored in the shared `InstrumentCache` which is automatically
595    /// populated by the HTTP client during `fetch_and_cache_instruments()`.
596    fn mark_instruments_initialized(&mut self) {
597        let count = self.instrument_cache.len();
598        self.core.set_instruments_initialized();
599        log::debug!("Instruments initialized: {count} instruments in shared cache");
600    }
601
602    /// Get an instrument by market ticker (e.g., "BTC-USD").
603    fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
604        self.instrument_cache.get_by_market(market)
605    }
606
607    /// Get an instrument by clob_pair_id.
608    fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
609        let instrument = self.instrument_cache.get_by_clob_id(clob_pair_id);
610
611        if instrument.is_none() {
612            self.instrument_cache.log_missing_clob_pair_id(clob_pair_id);
613        }
614
615        instrument
616    }
617
618    /// Gets the execution components, returning an error if not initialized.
619    ///
620    /// This should only be called after `connect()` has completed.
621    fn get_execution_components(
622        &self,
623    ) -> anyhow::Result<(
624        Arc<TransactionManager>,
625        Arc<TxBroadcaster>,
626        Arc<OrderMessageBuilder>,
627    )> {
628        let tx_manager = self
629            .tx_manager
630            .as_ref()
631            .ok_or_else(|| {
632                anyhow::anyhow!("TransactionManager not initialized - call connect() first")
633            })?
634            .clone();
635        let broadcaster = self
636            .broadcaster
637            .as_ref()
638            .ok_or_else(|| anyhow::anyhow!("TxBroadcaster not initialized - call connect() first"))?
639            .clone();
640        let order_builder = self
641            .order_builder
642            .as_ref()
643            .ok_or_else(|| {
644                anyhow::anyhow!("OrderMessageBuilder not initialized - call connect() first")
645            })?
646            .clone();
647        Ok((tx_manager, broadcaster, order_builder))
648    }
649
650    fn spawn_task<F>(&self, label: &'static str, fut: F)
651    where
652        F: Future<Output = anyhow::Result<()>> + Send + 'static,
653    {
654        let handle = get_runtime().spawn(async move {
655            if let Err(e) = fut.await {
656                log::error!("{label}: {e:?}");
657            }
658        });
659
660        self.pending_tasks
661            .lock()
662            .expect(MUTEX_POISONED)
663            .push(handle);
664    }
665
666    /// Spawns an order submission task with error handling and rejection generation.
667    ///
668    /// If the submission fails, generates an `OrderRejected` event with the error details.
669    fn spawn_order_task<F>(
670        &self,
671        label: &'static str,
672        strategy_id: StrategyId,
673        instrument_id: InstrumentId,
674        client_order_id: ClientOrderId,
675        fut: F,
676    ) where
677        F: Future<Output = anyhow::Result<()>> + Send + 'static,
678    {
679        let emitter = self.emitter.clone();
680        let clock = self.clock;
681
682        let handle = get_runtime().spawn(async move {
683            if let Err(e) = fut.await {
684                let error_msg = format!("{label} failed: {e:?}");
685                log::error!("{error_msg}");
686
687                let ts_event = clock.get_time_ns();
688                emitter.emit_order_rejected_event(
689                    strategy_id,
690                    instrument_id,
691                    client_order_id,
692                    &error_msg,
693                    ts_event,
694                    false,
695                );
696            }
697        });
698
699        self.pending_tasks
700            .lock()
701            .expect(MUTEX_POISONED)
702            .push(handle);
703    }
704
705    fn abort_pending_tasks(&self) {
706        let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
707        for handle in guard.drain(..) {
708            handle.abort();
709        }
710    }
711
712    /// Sends an OrderModifyRejected event.
713    fn send_modify_rejected(
714        &self,
715        strategy_id: StrategyId,
716        instrument_id: InstrumentId,
717        client_order_id: ClientOrderId,
718        venue_order_id: Option<VenueOrderId>,
719        reason: &str,
720    ) {
721        let ts_event = self.clock.get_time_ns();
722        self.emitter.emit_order_modify_rejected_event(
723            strategy_id,
724            instrument_id,
725            client_order_id,
726            venue_order_id,
727            reason,
728            ts_event,
729        );
730    }
731
732    /// Waits for the account to be registered in the cache.
733    ///
734    /// This method polls the cache until the account is registered, ensuring that
735    /// execution state reconciliation can process fills correctly (fills require
736    /// the account to be registered for portfolio updates).
737    ///
738    /// # Errors
739    ///
740    /// Returns an error if the account is not registered within the timeout period.
741    async fn await_account_registered(&self, timeout_secs: f64) -> anyhow::Result<()> {
742        let account_id = self.core.account_id;
743
744        if self.core.cache().account(&account_id).is_some() {
745            log::info!("Account {account_id} registered");
746            return Ok(());
747        }
748
749        let start = Instant::now();
750        let timeout = Duration::from_secs_f64(timeout_secs);
751        let interval = Duration::from_millis(10);
752
753        loop {
754            tokio::time::sleep(interval).await;
755
756            if self.core.cache().account(&account_id).is_some() {
757                log::info!("Account {account_id} registered");
758                return Ok(());
759            }
760
761            if start.elapsed() >= timeout {
762                anyhow::bail!(
763                    "Timeout waiting for account {account_id} to be registered after {timeout_secs}s"
764                );
765            }
766        }
767    }
768}
769
770#[async_trait(?Send)]
771impl ExecutionClient for DydxExecutionClient {
772    fn is_connected(&self) -> bool {
773        self.core.is_connected()
774    }
775
776    fn client_id(&self) -> ClientId {
777        self.core.client_id
778    }
779
780    fn account_id(&self) -> AccountId {
781        self.core.account_id
782    }
783
784    fn venue(&self) -> Venue {
785        *DYDX_VENUE
786    }
787
788    fn oms_type(&self) -> OmsType {
789        self.core.oms_type
790    }
791
792    fn get_account(&self) -> Option<AccountAny> {
793        self.core.cache().account(&self.core.account_id).cloned()
794    }
795
796    fn generate_account_state(
797        &self,
798        balances: Vec<AccountBalance>,
799        margins: Vec<MarginBalance>,
800        reported: bool,
801        ts_event: UnixNanos,
802    ) -> anyhow::Result<()> {
803        self.emitter
804            .emit_account_state(balances, margins, reported, ts_event);
805        Ok(())
806    }
807
808    fn start(&mut self) -> anyhow::Result<()> {
809        if self.core.is_started() {
810            log::warn!("dYdX execution client already started");
811            return Ok(());
812        }
813
814        let sender = get_exec_event_sender();
815        self.emitter.set_sender(sender);
816        log::info!("Starting dYdX execution client");
817        self.core.set_started();
818        Ok(())
819    }
820
821    fn stop(&mut self) -> anyhow::Result<()> {
822        if self.core.is_stopped() {
823            log::warn!("dYdX execution client not started");
824            return Ok(());
825        }
826
827        log::info!("Stopping dYdX execution client");
828        self.abort_pending_tasks();
829        self.core.set_stopped();
830        self.core.set_disconnected();
831        Ok(())
832    }
833
834    /// Submits an order to dYdX via gRPC.
835    ///
836    /// dYdX requires u32 client IDs - Nautilus ClientOrderId strings are hashed to fit.
837    ///
838    /// Supported order types:
839    /// - Market orders (short-term, IOC).
840    /// - Limit orders (short-term or long-term based on TIF).
841    /// - Stop Market orders (conditional, triggered at stop price).
842    /// - Stop Limit orders (conditional, triggered at stop price, executed at limit).
843    /// - Take Profit Market (MarketIfTouched - triggered at take profit price).
844    /// - Take Profit Limit (LimitIfTouched - triggered at take profit price, executed at limit).
845    ///
846    /// Trailing stop orders are NOT supported by dYdX v4 protocol.
847    ///
848    /// Validates synchronously, generates OrderSubmitted event, then spawns async task for
849    /// gRPC submission to avoid blocking. Unsupported order types generate OrderRejected.
850    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
851        // Check connection status first (doesn't need order)
852        if !self.is_connected() {
853            let reason = "Cannot submit order: execution client not connected";
854            log::error!("{reason}");
855            anyhow::bail!(reason);
856        }
857
858        // Check block height is available for short-term orders
859        let current_block = self.block_time_monitor.current_block_height();
860        let order = self
861            .core
862            .cache()
863            .order(&cmd.client_order_id)
864            .cloned()
865            .ok_or_else(|| {
866                anyhow::anyhow!("Order not found in cache for {}", cmd.client_order_id)
867            })?;
868
869        let client_order_id = order.client_order_id();
870        let instrument_id = order.instrument_id();
871        let strategy_id = order.strategy_id();
872
873        if current_block == 0 {
874            let reason = "Block height not initialized";
875            log::warn!("Cannot submit order {client_order_id}: {reason}");
876            let ts_event = self.clock.get_time_ns();
877            self.emitter.emit_order_rejected_event(
878                strategy_id,
879                instrument_id,
880                client_order_id,
881                reason,
882                ts_event,
883                false,
884            );
885            return Ok(());
886        }
887
888        // Check if order is already closed
889        if order.is_closed() {
890            log::warn!("Cannot submit closed order {client_order_id}");
891            return Ok(());
892        }
893
894        // Reject unsupported order types
895        match order.order_type() {
896            OrderType::Market
897            | OrderType::Limit
898            | OrderType::StopMarket
899            | OrderType::StopLimit
900            | OrderType::MarketIfTouched
901            | OrderType::LimitIfTouched => {}
902            // Trailing stops not supported by dYdX v4 protocol
903            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit => {
904                let reason = "Trailing stop orders not supported by dYdX v4 protocol";
905                log::error!("{reason}");
906                let ts_event = self.clock.get_time_ns();
907                self.emitter.emit_order_rejected_event(
908                    strategy_id,
909                    instrument_id,
910                    client_order_id,
911                    reason,
912                    ts_event,
913                    false,
914                );
915                return Ok(());
916            }
917            order_type => {
918                let reason = format!("Order type {order_type:?} not supported by dYdX");
919                log::error!("{reason}");
920                let ts_event = self.clock.get_time_ns();
921                self.emitter.emit_order_rejected_event(
922                    strategy_id,
923                    instrument_id,
924                    client_order_id,
925                    &reason,
926                    ts_event,
927                    false,
928                );
929                return Ok(());
930            }
931        }
932
933        self.emitter.emit_order_submitted(&order);
934
935        // Get execution components (must be initialized after connect())
936        let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
937            Ok(components) => components,
938            Err(e) => {
939                log::error!("Failed to get execution components: {e}");
940                let ts_event = self.clock.get_time_ns();
941                self.emitter.emit_order_rejected_event(
942                    strategy_id,
943                    instrument_id,
944                    client_order_id,
945                    &e.to_string(),
946                    ts_event,
947                    false,
948                );
949                return Ok(());
950            }
951        };
952
953        let block_height = self.block_time_monitor.current_block_height() as u32;
954
955        // Generate client_order_id as (u32, u32) pair before async block (dYdX requires u32 client IDs)
956        let encoded = match self.encoder.encode(client_order_id) {
957            Ok(enc) => enc,
958            Err(e) => {
959                log::error!("Failed to generate client order ID: {e}");
960                let ts_event = self.clock.get_time_ns();
961                self.emitter.emit_order_rejected_event(
962                    strategy_id,
963                    instrument_id,
964                    client_order_id,
965                    &e.to_string(),
966                    ts_event,
967                    false,
968                );
969                return Ok(());
970            }
971        };
972        let client_id_u32 = encoded.client_id;
973        let client_metadata = encoded.client_metadata;
974
975        log::info!(
976            "[SUBMIT_ORDER] Nautilus '{}' -> dYdX u32={} meta={:#x} | instrument={} side={:?} qty={} type={:?}",
977            client_order_id,
978            client_id_u32,
979            client_metadata,
980            instrument_id,
981            order.order_side(),
982            order.quantity(),
983            order.order_type()
984        );
985
986        // Convert expire_time from nanoseconds to seconds if present
987        let expire_time = order.expire_time().map(nanos_to_secs_i64);
988
989        // Determine order_flags based on order type for later cancellation
990        let order_flags = match order.order_type() {
991            // Conditional orders always use ORDER_FLAG_CONDITIONAL
992            OrderType::StopMarket
993            | OrderType::StopLimit
994            | OrderType::MarketIfTouched
995            | OrderType::LimitIfTouched => types::ORDER_FLAG_CONDITIONAL,
996            // Market orders are always short-term
997            OrderType::Market => types::ORDER_FLAG_SHORT_TERM,
998            // Limit orders depend on time_in_force and expire_time
999            OrderType::Limit => {
1000                let lifetime = types::OrderLifetime::from_time_in_force(
1001                    order.time_in_force(),
1002                    expire_time,
1003                    false,
1004                    order_builder.max_short_term_secs(),
1005                );
1006                lifetime.order_flags()
1007            }
1008            // Default to long-term for unknown types
1009            _ => types::ORDER_FLAG_LONG_TERM,
1010        };
1011
1012        // Register order context for WebSocket correlation and cancellation
1013        let ts_submitted = self.clock.get_time_ns();
1014        let trader_id = order.trader_id();
1015        self.register_order_context(
1016            client_id_u32,
1017            OrderContext {
1018                client_order_id,
1019                trader_id,
1020                strategy_id,
1021                instrument_id,
1022                submitted_at: ts_submitted,
1023                order_flags,
1024            },
1025        );
1026
1027        self.spawn_order_task(
1028            "submit_order",
1029            strategy_id,
1030            instrument_id,
1031            client_order_id,
1032            async move {
1033                // Build the order message based on order type
1034                let (msg, order_type_str) = match order.order_type() {
1035                    OrderType::Market => {
1036                        let msg = order_builder.build_market_order(
1037                            instrument_id,
1038                            client_id_u32,
1039                            client_metadata,
1040                            order.order_side(),
1041                            order.quantity(),
1042                            block_height,
1043                        )?;
1044                        (msg, "market")
1045                    }
1046                    OrderType::Limit => {
1047                        // Use pre-computed expire_time (with default_short_term_expiry applied)
1048                        let msg = order_builder.build_limit_order(
1049                            instrument_id,
1050                            client_id_u32,
1051                            client_metadata,
1052                            order.order_side(),
1053                            order
1054                                .price()
1055                                .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
1056                            order.quantity(),
1057                            order.time_in_force(),
1058                            order.is_post_only(),
1059                            order.is_reduce_only(),
1060                            block_height,
1061                            expire_time, // Uses default_short_term_expiry if configured
1062                        )?;
1063                        (msg, "limit")
1064                    }
1065                    // Conditional orders use their own expiration logic (not affected by default_short_term_expiry)
1066                    // They are always stored on-chain with long-term semantics
1067                    OrderType::StopMarket => {
1068                        let trigger_price = order.trigger_price().ok_or_else(|| {
1069                            anyhow::anyhow!("Stop market order missing trigger_price")
1070                        })?;
1071                        let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1072                        let msg = order_builder.build_stop_market_order(
1073                            instrument_id,
1074                            client_id_u32,
1075                            client_metadata,
1076                            order.order_side(),
1077                            trigger_price,
1078                            order.quantity(),
1079                            order.is_reduce_only(),
1080                            cond_expire,
1081                        )?;
1082                        (msg, "stop_market")
1083                    }
1084                    OrderType::StopLimit => {
1085                        let trigger_price = order.trigger_price().ok_or_else(|| {
1086                            anyhow::anyhow!("Stop limit order missing trigger_price")
1087                        })?;
1088                        let limit_price = order.price().ok_or_else(|| {
1089                            anyhow::anyhow!("Stop limit order missing limit price")
1090                        })?;
1091                        let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1092                        let msg = order_builder.build_stop_limit_order(
1093                            instrument_id,
1094                            client_id_u32,
1095                            client_metadata,
1096                            order.order_side(),
1097                            trigger_price,
1098                            limit_price,
1099                            order.quantity(),
1100                            order.time_in_force(),
1101                            order.is_post_only(),
1102                            order.is_reduce_only(),
1103                            cond_expire,
1104                        )?;
1105                        (msg, "stop_limit")
1106                    }
1107                    // dYdX TakeProfitMarket maps to Nautilus MarketIfTouched
1108                    OrderType::MarketIfTouched => {
1109                        let trigger_price = order.trigger_price().ok_or_else(|| {
1110                            anyhow::anyhow!("Take profit market order missing trigger_price")
1111                        })?;
1112                        let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1113                        let msg = order_builder.build_take_profit_market_order(
1114                            instrument_id,
1115                            client_id_u32,
1116                            client_metadata,
1117                            order.order_side(),
1118                            trigger_price,
1119                            order.quantity(),
1120                            order.is_reduce_only(),
1121                            cond_expire,
1122                        )?;
1123                        (msg, "take_profit_market")
1124                    }
1125                    // dYdX TakeProfitLimit maps to Nautilus LimitIfTouched
1126                    OrderType::LimitIfTouched => {
1127                        let trigger_price = order.trigger_price().ok_or_else(|| {
1128                            anyhow::anyhow!("Take profit limit order missing trigger_price")
1129                        })?;
1130                        let limit_price = order.price().ok_or_else(|| {
1131                            anyhow::anyhow!("Take profit limit order missing limit price")
1132                        })?;
1133                        let cond_expire = order.expire_time().map(nanos_to_secs_i64);
1134                        let msg = order_builder.build_take_profit_limit_order(
1135                            instrument_id,
1136                            client_id_u32,
1137                            client_metadata,
1138                            order.order_side(),
1139                            trigger_price,
1140                            limit_price,
1141                            order.quantity(),
1142                            order.time_in_force(),
1143                            order.is_post_only(),
1144                            order.is_reduce_only(),
1145                            cond_expire,
1146                        )?;
1147                        (msg, "take_profit_limit")
1148                    }
1149                    _ => unreachable!("Order type already validated"),
1150                };
1151
1152                // Broadcast: short-term orders use cached sequence (no increment),
1153                // stateful orders use broadcast_with_retry (proper sequence management)
1154                let operation = format!("Submit {order_type_str} order {client_order_id}");
1155                if order_flags == types::ORDER_FLAG_SHORT_TERM {
1156                    broadcaster
1157                        .broadcast_short_term(&tx_manager, vec![msg], &operation)
1158                        .await?;
1159                } else {
1160                    broadcaster
1161                        .broadcast_with_retry(&tx_manager, vec![msg], &operation)
1162                        .await?;
1163                }
1164                log::debug!("Successfully submitted {order_type_str} order: {client_order_id}");
1165
1166                Ok(())
1167            },
1168        );
1169
1170        Ok(())
1171    }
1172
1173    fn submit_order_list(&self, cmd: &SubmitOrderList) -> anyhow::Result<()> {
1174        let orders = self.core.get_orders_for_list(&cmd.order_list)?;
1175        let order_count = orders.len();
1176
1177        // Check connection status
1178        if !self.is_connected() {
1179            let reason = "Cannot submit order list: execution client not connected";
1180            log::error!("{reason}");
1181            anyhow::bail!(reason);
1182        }
1183
1184        // Check block height is available
1185        let current_block = self.block_time_monitor.current_block_height();
1186        if current_block == 0 {
1187            let reason = "Block height not initialized";
1188            log::warn!("Cannot submit order list: {reason}");
1189            // Reject all orders in the list
1190            let ts_event = self.clock.get_time_ns();
1191            for order in &orders {
1192                self.emitter.emit_order_rejected_event(
1193                    order.strategy_id(),
1194                    order.instrument_id(),
1195                    order.client_order_id(),
1196                    reason,
1197                    ts_event,
1198                    false,
1199                );
1200            }
1201            return Ok(());
1202        }
1203
1204        // Get execution components early so we can register order contexts
1205        let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1206            Ok(components) => components,
1207            Err(e) => {
1208                log::error!("Failed to get execution components for batch: {e}");
1209                // Reject all orders in the list
1210                let ts_event = self.clock.get_time_ns();
1211                for order in &orders {
1212                    self.emitter.emit_order_rejected_event(
1213                        order.strategy_id(),
1214                        order.instrument_id(),
1215                        order.client_order_id(),
1216                        &e.to_string(),
1217                        ts_event,
1218                        false,
1219                    );
1220                }
1221                return Ok(());
1222            }
1223        };
1224
1225        // Collect limit order parameters for batch submission
1226        let mut order_params: Vec<LimitOrderParams> = Vec::with_capacity(order_count);
1227        let mut order_info: Vec<(ClientOrderId, InstrumentId, StrategyId)> =
1228            Vec::with_capacity(order_count);
1229
1230        for order in &orders {
1231            // Only limit orders can be batched
1232            if order.order_type() != OrderType::Limit {
1233                log::warn!(
1234                    "Order {} has type {:?}, falling back to individual submission",
1235                    order.client_order_id(),
1236                    order.order_type()
1237                );
1238                // Fall back to individual submission for non-limit orders
1239                let submit_cmd = SubmitOrder::new(
1240                    cmd.trader_id,
1241                    cmd.client_id,
1242                    cmd.strategy_id,
1243                    order.instrument_id(),
1244                    order.client_order_id(),
1245                    order.init_event().clone(),
1246                    cmd.exec_algorithm_id,
1247                    cmd.position_id,
1248                    cmd.params.clone(),
1249                    UUID4::new(),
1250                    cmd.ts_init,
1251                );
1252                if let Err(e) = self.submit_order(&submit_cmd) {
1253                    log::error!(
1254                        "Failed to submit order {} from order list: {e}",
1255                        order.client_order_id()
1256                    );
1257                }
1258                continue;
1259            }
1260
1261            // Get price (required for limit orders)
1262            let Some(price) = order.price() else {
1263                let ts_event = self.clock.get_time_ns();
1264                self.emitter.emit_order_rejected_event(
1265                    order.strategy_id(),
1266                    order.instrument_id(),
1267                    order.client_order_id(),
1268                    "Limit order missing price",
1269                    ts_event,
1270                    false,
1271                );
1272                continue;
1273            };
1274
1275            // Generate client order ID as (u32, u32) pair
1276            let encoded = match self.encoder.encode(order.client_order_id()) {
1277                Ok(enc) => enc,
1278                Err(e) => {
1279                    log::error!("Failed to generate client order ID: {e}");
1280                    let ts_event = self.clock.get_time_ns();
1281                    self.emitter.emit_order_rejected_event(
1282                        order.strategy_id(),
1283                        order.instrument_id(),
1284                        order.client_order_id(),
1285                        &e.to_string(),
1286                        ts_event,
1287                        false,
1288                    );
1289                    continue;
1290                }
1291            };
1292            let client_id_u32 = encoded.client_id;
1293            let client_metadata = encoded.client_metadata;
1294
1295            // Send OrderSubmitted event
1296            self.emitter.emit_order_submitted(order);
1297
1298            // Determine order_flags for limit orders
1299            let expire_time_secs = order.expire_time().map(nanos_to_secs_i64);
1300            let lifetime = types::OrderLifetime::from_time_in_force(
1301                order.time_in_force(),
1302                expire_time_secs,
1303                false,
1304                order_builder.max_short_term_secs(),
1305            );
1306
1307            // Register order context for WebSocket correlation and cancellation
1308            let ts_submitted = self.clock.get_time_ns();
1309            self.register_order_context(
1310                client_id_u32,
1311                OrderContext {
1312                    client_order_id: order.client_order_id(),
1313                    trader_id: order.trader_id(),
1314                    strategy_id: order.strategy_id(),
1315                    instrument_id: order.instrument_id(),
1316                    submitted_at: ts_submitted,
1317                    order_flags: lifetime.order_flags(),
1318                },
1319            );
1320
1321            // Collect order parameters (builder will apply default_short_term_expiry if needed)
1322            order_params.push(LimitOrderParams {
1323                instrument_id: order.instrument_id(),
1324                client_order_id: client_id_u32,
1325                client_metadata,
1326                side: order.order_side(),
1327                price,
1328                quantity: order.quantity(),
1329                time_in_force: order.time_in_force(),
1330                post_only: order.is_post_only(),
1331                reduce_only: order.is_reduce_only(),
1332                expire_time_ns: order.expire_time(),
1333            });
1334            order_info.push((
1335                order.client_order_id(),
1336                order.instrument_id(),
1337                order.strategy_id(),
1338            ));
1339        }
1340
1341        // If no limit orders to batch, we're done
1342        if order_params.is_empty() {
1343            return Ok(());
1344        }
1345
1346        // Check if any orders are short-term
1347        // dYdX protocol restriction: short-term orders CANNOT be batched
1348        // Each short-term order must be in its own transaction
1349        let has_short_term = order_params
1350            .iter()
1351            .any(|params| order_builder.is_short_term_order(params));
1352
1353        let block_height = current_block as u32;
1354        let emitter = self.emitter.clone();
1355        let clock = self.clock;
1356
1357        if has_short_term {
1358            // Submit each order individually (short-term orders cannot be batched).
1359            log::debug!(
1360                "Submitting {} short-term limit orders concurrently (sequence not consumed)",
1361                order_params.len()
1362            );
1363
1364            let order_count = order_params.len();
1365            let handle = get_runtime().spawn(async move {
1366                // Build and broadcast all orders concurrently — no sequence coordination needed.
1367                // Short-term orders use cached sequence (not incremented) via broadcast_short_term.
1368                let mut handles = Vec::with_capacity(order_count);
1369
1370                for (params, (client_order_id, instrument_id, strategy_id)) in
1371                    order_params.into_iter().zip(order_info.into_iter())
1372                {
1373                    let tx_manager = tx_manager.clone();
1374                    let broadcaster = broadcaster.clone();
1375                    let order_builder = order_builder.clone();
1376                    let emitter = emitter.clone();
1377
1378                    let handle = get_runtime().spawn(async move {
1379                        // Build order message
1380                        let msg = match order_builder
1381                            .build_limit_order_from_params(&params, block_height)
1382                        {
1383                            Ok(m) => m,
1384                            Err(e) => {
1385                                let error_msg = format!("Failed to build order message: {e:?}");
1386                                log::error!("{error_msg}");
1387                                let ts_event = clock.get_time_ns();
1388                                emitter.emit_order_rejected_event(
1389                                    strategy_id,
1390                                    instrument_id,
1391                                    client_order_id,
1392                                    &error_msg,
1393                                    ts_event,
1394                                    false,
1395                                );
1396                                return;
1397                            }
1398                        };
1399
1400                        // Broadcast with cached sequence (short-term orders don't consume sequences)
1401                        let operation = format!("Submit short-term order {client_order_id}");
1402                        if let Err(e) = broadcaster
1403                            .broadcast_short_term(&tx_manager, vec![msg], &operation)
1404                            .await
1405                        {
1406                            let error_msg = format!("Order submission failed: {e:?}");
1407                            log::error!("{error_msg}");
1408                            let ts_event = clock.get_time_ns();
1409                            emitter.emit_order_rejected_event(
1410                                strategy_id,
1411                                instrument_id,
1412                                client_order_id,
1413                                &error_msg,
1414                                ts_event,
1415                                false,
1416                            );
1417                        }
1418                    });
1419
1420                    handles.push(handle);
1421                }
1422
1423                // Wait for all orders to be submitted
1424                for handle in handles {
1425                    let _ = handle.await;
1426                }
1427            });
1428
1429            // Track the task
1430            self.pending_tasks
1431                .lock()
1432                .expect(MUTEX_POISONED)
1433                .push(handle);
1434        } else {
1435            // All orders are long-term - can batch in single transaction
1436            log::info!(
1437                "Batch submitting {} long-term limit orders in single transaction",
1438                order_params.len()
1439            );
1440
1441            let handle = get_runtime().spawn(async move {
1442                // Build all order messages
1443                let msgs: Result<Vec<_>, _> = order_params
1444                    .iter()
1445                    .map(|params| order_builder.build_limit_order_from_params(params, block_height))
1446                    .collect();
1447
1448                let msgs = match msgs {
1449                    Ok(m) => m,
1450                    Err(e) => {
1451                        let error_msg = format!("Failed to build batch order messages: {e:?}");
1452                        log::error!("{error_msg}");
1453                        // Send OrderRejected for all orders
1454                        let ts_event = clock.get_time_ns();
1455                        for (client_order_id, instrument_id, strategy_id) in order_info {
1456                            emitter.emit_order_rejected_event(
1457                                strategy_id,
1458                                instrument_id,
1459                                client_order_id,
1460                                &error_msg,
1461                                ts_event,
1462                                false,
1463                            );
1464                        }
1465                        return;
1466                    }
1467                };
1468
1469                // Broadcast batch with retry
1470                let operation = format!("Submit batch of {} limit orders", msgs.len());
1471                if let Err(e) = broadcaster
1472                    .broadcast_with_retry(&tx_manager, msgs, &operation)
1473                    .await
1474                {
1475                    let error_msg = format!("Batch order submission failed: {e:?}");
1476                    log::error!("{error_msg}");
1477
1478                    // Send OrderRejected for all orders in the batch
1479                    let ts_event = clock.get_time_ns();
1480                    for (client_order_id, instrument_id, strategy_id) in order_info {
1481                        emitter.emit_order_rejected_event(
1482                            strategy_id,
1483                            instrument_id,
1484                            client_order_id,
1485                            &error_msg,
1486                            ts_event,
1487                            false,
1488                        );
1489                    }
1490                }
1491            });
1492
1493            // Track the task
1494            self.pending_tasks
1495                .lock()
1496                .expect(MUTEX_POISONED)
1497                .push(handle);
1498        }
1499
1500        Ok(())
1501    }
1502
1503    /// dYdX does not support native order modification.
1504    ///
1505    /// Strategies should handle `OrderModifyRejected` by canceling and resubmitting.
1506    fn modify_order(&self, cmd: &ModifyOrder) -> anyhow::Result<()> {
1507        let reason = "dYdX does not support order modification. Use cancel and resubmit instead.";
1508        log::error!("{reason}");
1509
1510        self.send_modify_rejected(
1511            cmd.strategy_id,
1512            cmd.instrument_id,
1513            cmd.client_order_id,
1514            cmd.venue_order_id,
1515            reason,
1516        );
1517        Ok(())
1518    }
1519
1520    /// Cancels an order on dYdX exchange.
1521    ///
1522    /// Validates the order state and retrieves instrument details before
1523    /// spawning an async task to cancel via gRPC.
1524    ///
1525    /// # Validation
1526    ///
1527    /// - Checks order exists in cache.
1528    /// - Validates order is not already closed.
1529    /// - Retrieves instrument from cache for order builder.
1530    ///
1531    /// The `cmd` contains client/venue order IDs. Returns `Ok(())` if cancel request is
1532    /// spawned successfully or validation fails gracefully. Returns `Err` if not connected.
1533    ///
1534    /// # Events
1535    ///
1536    /// - `OrderCanceled` - Generated when WebSocket confirms cancellation.
1537    /// - `OrderCancelRejected` - Generated if exchange rejects cancellation.
1538    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
1539        if !self.is_connected() {
1540            anyhow::bail!("Cannot cancel order: not connected");
1541        }
1542
1543        let client_order_id = cmd.client_order_id;
1544        let instrument_id = cmd.instrument_id;
1545        let strategy_id = cmd.strategy_id;
1546        let venue_order_id = cmd.venue_order_id;
1547
1548        let (order_time_in_force, order_expire_time) = {
1549            let cache = self.core.cache();
1550
1551            let order = match cache.order(&client_order_id) {
1552                Some(order) => order,
1553                None => {
1554                    log::error!("Cannot cancel order {client_order_id}: not found in cache");
1555                    return Ok(()); // Not an error - order may have been filled/canceled already
1556                }
1557            };
1558
1559            // Validate order is not already closed
1560            if order.is_closed() {
1561                log::warn!(
1562                    "CancelOrder command for {} when order already {} (will not send to exchange)",
1563                    client_order_id,
1564                    order.status()
1565                );
1566                return Ok(());
1567            }
1568
1569            // Verify instrument exists (no need to hold reference)
1570            if cache.instrument(&instrument_id).is_none() {
1571                log::error!(
1572                    "Cannot cancel order {client_order_id}: instrument {instrument_id} not found in cache"
1573                );
1574                return Ok(()); // Not an error - missing instrument is a cache issue
1575            }
1576
1577            // Extract data needed for order_flags fallback
1578            (
1579                order.time_in_force(),
1580                order.expire_time().map(nanos_to_secs_i64),
1581            )
1582        }; // Cache borrow released here
1583
1584        log::debug!("Cancelling order {client_order_id} for instrument {instrument_id}");
1585
1586        // Get execution components (no cache borrow held)
1587        let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1588            Ok(components) => components,
1589            Err(e) => {
1590                log::error!("Failed to get execution components for cancel: {e}");
1591                return Ok(());
1592            }
1593        };
1594
1595        let block_height = self.block_time_monitor.current_block_height() as u32;
1596
1597        // Convert client_order_id to (u32, u32) pair before async block
1598        let encoded = match self.encoder.get(&client_order_id) {
1599            Some(enc) => enc,
1600            None => {
1601                log::error!("Client order ID {client_order_id} not found in cache");
1602                anyhow::bail!("Client order ID not found in cache")
1603            }
1604        };
1605        let client_id_u32 = encoded.client_id;
1606
1607        log::info!(
1608            "[CANCEL_ORDER] Nautilus '{client_order_id}' -> dYdX u32={client_id_u32} | instrument={instrument_id}"
1609        );
1610
1611        // Get stored order_flags from order context (set at submission time)
1612        // This ensures we use the correct flags even if the order has expired
1613        let order_flags = self.get_order_context(client_id_u32).map_or_else(
1614            || {
1615                // Fallback: derive from order parameters if context not found
1616                log::warn!(
1617                    "Order context not found for {client_order_id}, deriving flags from order"
1618                );
1619                types::OrderLifetime::from_time_in_force(
1620                    order_time_in_force, // Using extracted value
1621                    order_expire_time,   // Using extracted value
1622                    false,
1623                    order_builder.max_short_term_secs(),
1624                )
1625                .order_flags()
1626            },
1627            |ctx| ctx.order_flags,
1628        );
1629
1630        let clock = self.clock;
1631        let emitter = self.emitter.clone();
1632
1633        self.spawn_task("cancel_order", async move {
1634            // Build cancel message using stored order_flags
1635            let cancel_msg = match order_builder.build_cancel_order_with_flags(
1636                instrument_id,
1637                client_id_u32,
1638                order_flags,
1639                block_height,
1640            ) {
1641                Ok(msg) => msg,
1642                Err(e) => {
1643                    log::error!("Failed to build cancel message for {client_order_id}: {e:?}");
1644                    let ts_event = clock.get_time_ns();
1645                    emitter.emit_order_cancel_rejected_event(
1646                        strategy_id,
1647                        instrument_id,
1648                        client_order_id,
1649                        venue_order_id,
1650                        &format!("Cancel build failed: {e:?}"),
1651                        ts_event,
1652                    );
1653                    return Ok(());
1654                }
1655            };
1656
1657            // Broadcast cancel: short-term uses cached sequence, stateful uses retry
1658            let cancel_op = format!("Cancel order {client_order_id}");
1659            let result = if order_flags == types::ORDER_FLAG_SHORT_TERM {
1660                broadcaster
1661                    .broadcast_short_term(&tx_manager, vec![cancel_msg], &cancel_op)
1662                    .await
1663            } else {
1664                broadcaster
1665                    .broadcast_with_retry(&tx_manager, vec![cancel_msg], &cancel_op)
1666                    .await
1667            };
1668            match result {
1669                Ok(_) => {
1670                    log::debug!("Successfully cancelled order: {client_order_id}");
1671                }
1672                Err(e) => {
1673                    log::error!("Failed to cancel order {client_order_id}: {e:?}");
1674
1675                    let ts_event = clock.get_time_ns();
1676                    emitter.emit_order_cancel_rejected_event(
1677                        strategy_id,
1678                        instrument_id,
1679                        client_order_id,
1680                        venue_order_id,
1681                        &format!("Cancel order failed: {e:?}"),
1682                        ts_event,
1683                    );
1684                }
1685            }
1686
1687            Ok(())
1688        });
1689
1690        Ok(())
1691    }
1692
1693    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
1694        if !self.is_connected() {
1695            anyhow::bail!("Cannot cancel orders: not connected");
1696        }
1697
1698        let instrument_id = cmd.instrument_id;
1699        let order_side_filter = cmd.order_side;
1700
1701        // Extract order data from cache with short-lived borrow
1702        // Collect (client_order_id, time_in_force, expire_time) for each matching order
1703        let order_data: Vec<(ClientOrderId, TimeInForce, Option<UnixNanos>)> = {
1704            let cache = self.core.cache();
1705            cache
1706                .orders_open(None, None, None, None, None)
1707                .into_iter()
1708                .filter(|order| order.instrument_id() == instrument_id)
1709                .filter(|order| {
1710                    order_side_filter == OrderSide::NoOrderSide
1711                        || order.order_side() == order_side_filter
1712                })
1713                .map(|order| {
1714                    (
1715                        order.client_order_id(),
1716                        order.time_in_force(),
1717                        order.expire_time(),
1718                    )
1719                })
1720                .collect()
1721        }; // Cache borrow released here
1722
1723        // Count short-term vs long-term for logging
1724        let short_term_count = order_data
1725            .iter()
1726            .filter(|(_, tif, _)| matches!(tif, TimeInForce::Ioc | TimeInForce::Fok))
1727            .count();
1728        let long_term_count = order_data.len() - short_term_count;
1729
1730        log::debug!(
1731            "Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={instrument_id}, order_side={order_side_filter:?}",
1732            order_data.len(),
1733            short_term_count,
1734            long_term_count
1735        );
1736
1737        // Get execution components (no cache borrow held)
1738        let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1739            Ok(components) => components,
1740            Err(e) => {
1741                log::error!("Failed to get execution components for cancel_all: {e}");
1742                return Ok(());
1743            }
1744        };
1745
1746        let block_height = self.block_time_monitor.current_block_height() as u32;
1747
1748        // Collect (instrument_id, client_id, order_flags) tuples for cancel
1749        // Use stored order_flags from order context to ensure correct cancellation
1750        let mut orders_to_cancel = Vec::new();
1751        for (client_order_id, time_in_force, expire_time) in &order_data {
1752            if let Some(encoded) = self.encoder.get(client_order_id) {
1753                let client_id_u32 = encoded.client_id;
1754                // Get stored order_flags from order context
1755                let order_flags = self.get_order_context(client_id_u32).map_or_else(
1756                    || {
1757                        // Fallback: derive from order parameters if context not found
1758                        log::warn!(
1759                            "Order context not found for {client_order_id}, deriving flags from order"
1760                        );
1761                        let expire_secs = expire_time.map(nanos_to_secs_i64);
1762                        types::OrderLifetime::from_time_in_force(
1763                            *time_in_force,
1764                            expire_secs,
1765                            false,
1766                            order_builder.max_short_term_secs(),
1767                        )
1768                        .order_flags()
1769                    },
1770                    |ctx| ctx.order_flags,
1771                );
1772                orders_to_cancel.push((instrument_id, client_id_u32, order_flags));
1773            } else {
1774                log::warn!(
1775                    "Cannot cancel order {client_order_id}: client_order_id not found in cache"
1776                );
1777            }
1778        }
1779
1780        if orders_to_cancel.is_empty() {
1781            return Ok(());
1782        }
1783
1784        // Check if any orders are short-term (order_flags == 0)
1785        // dYdX protocol restriction: short-term MsgCancelOrder cannot be batched
1786        let has_short_term = orders_to_cancel
1787            .iter()
1788            .any(|(_, _, flags)| *flags == types::ORDER_FLAG_SHORT_TERM);
1789
1790        if has_short_term {
1791            // Cancel each order individually (short-term cancels cannot be batched)
1792            log::info!(
1793                "Cancelling {} orders individually (short-term cancels cannot be batched)",
1794                orders_to_cancel.len()
1795            );
1796
1797            self.spawn_task("cancel_all_orders", async move {
1798                let mut handles = Vec::with_capacity(orders_to_cancel.len());
1799
1800                for (inst_id, client_id, order_flags) in orders_to_cancel {
1801                    let tx_manager = tx_manager.clone();
1802                    let broadcaster = broadcaster.clone();
1803                    let order_builder = order_builder.clone();
1804
1805                    let handle = get_runtime().spawn(async move {
1806                        // Build cancel message using stored order_flags
1807                        let msg = match order_builder.build_cancel_order_with_flags(
1808                            inst_id,
1809                            client_id,
1810                            order_flags,
1811                            block_height,
1812                        ) {
1813                            Ok(m) => m,
1814                            Err(e) => {
1815                                log::error!(
1816                                    "Failed to build cancel message for client_id={client_id}: {e:?}"
1817                                );
1818                                return;
1819                            }
1820                        };
1821
1822                        // Short-term: cached sequence, no retry. Stateful: proper sequence management.
1823                        let cancel_op = format!("Cancel order {client_id}");
1824                        let result = if order_flags == types::ORDER_FLAG_SHORT_TERM {
1825                            broadcaster
1826                                .broadcast_short_term(&tx_manager, vec![msg], &cancel_op)
1827                                .await
1828                        } else {
1829                            broadcaster
1830                                .broadcast_with_retry(&tx_manager, vec![msg], &cancel_op)
1831                                .await
1832                        };
1833                        if let Err(e) = result {
1834                            log::error!("Failed to cancel order client_id={client_id}: {e:?}");
1835                        }
1836                    });
1837
1838                    handles.push(handle);
1839                }
1840
1841                // Wait for all cancels to complete
1842                for handle in handles {
1843                    let _ = handle.await;
1844                }
1845
1846                Ok(())
1847            });
1848        } else {
1849            // All orders are long-term - can batch cancels
1850            log::info!(
1851                "Batch cancelling {} long-term orders in single transaction",
1852                orders_to_cancel.len()
1853            );
1854
1855            self.spawn_task("cancel_all_orders", async move {
1856                // Build all cancel messages using stored order_flags
1857                let msgs: Result<Vec<_>, _> = orders_to_cancel
1858                    .iter()
1859                    .map(|(inst_id, client_id, order_flags)| {
1860                        order_builder.build_cancel_order_with_flags(
1861                            *inst_id,
1862                            *client_id,
1863                            *order_flags,
1864                            block_height,
1865                        )
1866                    })
1867                    .collect();
1868
1869                let msgs = match msgs {
1870                    Ok(m) => m,
1871                    Err(e) => {
1872                        log::error!("Failed to build cancel messages: {e:?}");
1873                        return Ok(());
1874                    }
1875                };
1876
1877                if msgs.is_empty() {
1878                    return Ok(());
1879                }
1880
1881                // Broadcast batch cancel
1882                match broadcaster
1883                    .broadcast_with_retry(
1884                        &tx_manager,
1885                        msgs,
1886                        &format!("Cancel {} orders", orders_to_cancel.len()),
1887                    )
1888                    .await
1889                {
1890                    Ok(_) => {
1891                        log::debug!("Successfully cancelled {} orders", orders_to_cancel.len());
1892                    }
1893                    Err(e) => {
1894                        log::error!("Batch cancel failed: {e:?}");
1895                    }
1896                }
1897
1898                Ok(())
1899            });
1900        }
1901
1902        Ok(())
1903    }
1904
1905    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
1906        if cmd.cancels.is_empty() {
1907            return Ok(());
1908        }
1909
1910        if !self.is_connected() {
1911            anyhow::bail!("Cannot cancel orders: not connected");
1912        }
1913
1914        // Get execution components early for order_flags derivation
1915        let (tx_manager, broadcaster, order_builder) = match self.get_execution_components() {
1916            Ok(components) => components,
1917            Err(e) => {
1918                log::error!("Failed to get execution components for batch cancel: {e}");
1919                return Ok(());
1920            }
1921        };
1922
1923        // Convert ClientOrderIds to u32 and get order_flags
1924        let cache = self.core.cache();
1925
1926        let mut orders_to_cancel = Vec::with_capacity(cmd.cancels.len());
1927        for cancel in &cmd.cancels {
1928            let client_order_id = cancel.client_order_id;
1929            let encoded = match self.encoder.get(&client_order_id) {
1930                Some(enc) => enc,
1931                None => {
1932                    log::warn!(
1933                        "No u32 mapping found for client_order_id={client_order_id}, skipping cancel"
1934                    );
1935                    continue;
1936                }
1937            };
1938            let client_id_u32 = encoded.client_id;
1939
1940            // Get stored order_flags from order context
1941            let order_flags = self.get_order_context(client_id_u32).map_or_else(
1942                || {
1943                    // Fallback: derive from order parameters if context not found
1944                    log::warn!(
1945                        "Order context not found for {client_order_id}, deriving flags from order"
1946                    );
1947                    match cache.order(&client_order_id) {
1948                        Some(order) => {
1949                            let expire_time = order.expire_time().map(nanos_to_secs_i64);
1950                            types::OrderLifetime::from_time_in_force(
1951                                order.time_in_force(),
1952                                expire_time,
1953                                false,
1954                                order_builder.max_short_term_secs(),
1955                            )
1956                            .order_flags()
1957                        }
1958                        None => types::ORDER_FLAG_LONG_TERM, // Default to long-term if not found
1959                    }
1960                },
1961                |ctx| ctx.order_flags,
1962            );
1963
1964            orders_to_cancel.push((cancel.instrument_id, client_id_u32, order_flags));
1965        }
1966        drop(cache);
1967
1968        if orders_to_cancel.is_empty() {
1969            log::warn!("No valid orders to cancel in batch");
1970            return Ok(());
1971        }
1972
1973        let block_height = self.block_time_monitor.current_block_height() as u32;
1974
1975        // Check if any orders are short-term (order_flags == 0)
1976        // dYdX protocol restriction: short-term MsgCancelOrder cannot be batched
1977        let has_short_term = orders_to_cancel
1978            .iter()
1979            .any(|(_, _, flags)| *flags == types::ORDER_FLAG_SHORT_TERM);
1980
1981        if has_short_term {
1982            // Cancel each order individually (short-term cancels cannot be batched)
1983            log::info!(
1984                "Cancelling {} orders individually (short-term cancels cannot be batched)",
1985                orders_to_cancel.len()
1986            );
1987
1988            self.spawn_task("batch_cancel_orders", async move {
1989                let mut handles = Vec::with_capacity(orders_to_cancel.len());
1990
1991                for (inst_id, client_id, order_flags) in orders_to_cancel {
1992                    let tx_manager = tx_manager.clone();
1993                    let broadcaster = broadcaster.clone();
1994                    let order_builder = order_builder.clone();
1995
1996                    let handle = get_runtime().spawn(async move {
1997                        // Build cancel message using stored order_flags
1998                        let msg = match order_builder.build_cancel_order_with_flags(
1999                            inst_id,
2000                            client_id,
2001                            order_flags,
2002                            block_height,
2003                        ) {
2004                            Ok(m) => m,
2005                            Err(e) => {
2006                                log::error!(
2007                                    "Failed to build cancel message for client_id={client_id}: {e:?}"
2008                                );
2009                                return;
2010                            }
2011                        };
2012
2013                        // Short-term: cached sequence, no retry. Stateful: proper sequence management.
2014                        let cancel_op = format!("Cancel order {client_id}");
2015                        let result = if order_flags == types::ORDER_FLAG_SHORT_TERM {
2016                            broadcaster
2017                                .broadcast_short_term(&tx_manager, vec![msg], &cancel_op)
2018                                .await
2019                        } else {
2020                            broadcaster
2021                                .broadcast_with_retry(&tx_manager, vec![msg], &cancel_op)
2022                                .await
2023                        };
2024                        if let Err(e) = result {
2025                            log::error!("Failed to cancel order client_id={client_id}: {e:?}");
2026                        }
2027                    });
2028
2029                    handles.push(handle);
2030                }
2031
2032                // Wait for all cancels to complete
2033                for handle in handles {
2034                    let _ = handle.await;
2035                }
2036
2037                Ok(())
2038            });
2039        } else {
2040            // All orders are long-term - can batch cancels
2041            log::debug!(
2042                "Batch cancelling {} long-term orders: {:?}",
2043                orders_to_cancel.len(),
2044                orders_to_cancel
2045            );
2046
2047            self.spawn_task("batch_cancel_orders", async move {
2048                // Build cancel messages using stored order_flags
2049                let cancel_msgs = match order_builder
2050                    .build_cancel_orders_batch_with_flags(&orders_to_cancel, block_height)
2051                {
2052                    Ok(msgs) => msgs,
2053                    Err(e) => {
2054                        log::error!("Failed to build batch cancel messages: {e:?}");
2055                        return Ok(());
2056                    }
2057                };
2058
2059                // Broadcast with retry
2060                match broadcaster
2061                    .broadcast_with_retry(&tx_manager, cancel_msgs, "BatchCancelOrders")
2062                    .await
2063                {
2064                    Ok(tx_hash) => {
2065                        log::debug!(
2066                            "Successfully batch cancelled {} orders, tx_hash: {}",
2067                            orders_to_cancel.len(),
2068                            tx_hash
2069                        );
2070                    }
2071                    Err(e) => {
2072                        log::error!("Batch cancel failed: {e:?}");
2073                    }
2074                }
2075
2076                Ok(())
2077            });
2078        }
2079
2080        Ok(())
2081    }
2082
2083    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
2084        Ok(())
2085    }
2086
2087    fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
2088        Ok(())
2089    }
2090
2091    async fn connect(&mut self) -> anyhow::Result<()> {
2092        if self.core.is_connected() {
2093            log::warn!("dYdX execution client already connected");
2094            return Ok(());
2095        }
2096
2097        log::info!("Connecting to dYdX");
2098
2099        log::debug!("Loading instruments from HTTP API");
2100        self.http_client.fetch_and_cache_instruments().await?;
2101        log::debug!(
2102            "Loaded {} instruments from HTTP into shared cache",
2103            self.http_client.cached_instruments_count()
2104        );
2105        self.mark_instruments_initialized();
2106
2107        // Initialize gRPC client (deferred from constructor to avoid blocking)
2108        let grpc_urls = self.config.get_grpc_urls();
2109        let mut grpc_client = DydxGrpcClient::new_with_fallback(&grpc_urls)
2110            .await
2111            .context("failed to construct dYdX gRPC client")?;
2112        log::debug!("gRPC client initialized");
2113
2114        // Fetch initial block height synchronously so orders can be submitted immediately after connect()
2115        let initial_height = grpc_client
2116            .latest_block_height()
2117            .await
2118            .context("failed to fetch initial block height")?;
2119        // Use current time as approximation; actual timestamps will come from WebSocket updates
2120        self.block_time_monitor
2121            .record_block(initial_height.0 as u64, chrono::Utc::now());
2122        log::info!("Initial block height: {}", initial_height.0);
2123
2124        *self.grpc_client.write().await = Some(grpc_client.clone());
2125
2126        // Resolve private key and create TransactionManager (owns wallet and sequence management)
2127        let private_key =
2128            Self::resolve_private_key(&self.config).context("failed to resolve private key")?;
2129        let tx_manager = Arc::new(
2130            TransactionManager::new(
2131                grpc_client.clone(),
2132                &private_key,
2133                self.wallet_address.clone(),
2134                self.get_chain_id(),
2135            )
2136            .context("failed to create TransactionManager")?,
2137        );
2138
2139        tx_manager
2140            .resolve_authenticators()
2141            .await
2142            .context("failed to resolve authenticators")?;
2143
2144        // Proactively initialize sequence from chain so orders can be submitted
2145        // immediately after connect() without first-transaction latency penalty.
2146        tx_manager
2147            .initialize_sequence()
2148            .await
2149            .context("failed to initialize sequence")?;
2150
2151        self.tx_manager = Some(tx_manager);
2152        self.broadcaster = Some(Arc::new(TxBroadcaster::new(grpc_client)));
2153        self.order_builder = Some(Arc::new(OrderMessageBuilder::new(
2154            self.http_client.clone(),
2155            self.wallet_address.clone(),
2156            self.subaccount_number,
2157            self.block_time_monitor.clone(),
2158        )));
2159        log::debug!(
2160            "OrderMessageBuilder initialized (block_time_monitor ready: {}, max_short_term: {:.1}s)",
2161            self.block_time_monitor.is_ready(),
2162            SHORT_TERM_ORDER_MAXIMUM_LIFETIME as f64
2163                * self.block_time_monitor.seconds_per_block_or_default()
2164        );
2165
2166        // Connect WebSocket
2167        self.ws_client.connect().await?;
2168        log::debug!("WebSocket connected");
2169
2170        // Subscribe to block height updates
2171        self.ws_client.subscribe_block_height().await?;
2172        log::debug!("Subscribed to block height updates");
2173
2174        // Subscribe to markets for instrument data
2175        self.ws_client.subscribe_markets().await?;
2176        log::debug!("Subscribed to markets");
2177
2178        // Subscribe to subaccount updates (wallet is always initialized for execution client)
2179        log::info!(
2180            "Using wallet address for queries: {} (subaccount {})",
2181            self.wallet_address,
2182            self.subaccount_number
2183        );
2184        self.ws_client
2185            .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
2186            .await?;
2187        log::debug!(
2188            "Subscribed to subaccount updates: {}/{}",
2189            self.wallet_address,
2190            self.subaccount_number
2191        );
2192
2193        let stream = self.ws_client.stream();
2194        self.spawn_ws_stream_handler(stream);
2195
2196        // Wait for account to be registered in cache before continuing.
2197        // This ensures execution state reconciliation can process fills correctly
2198        // (fills require the account to be registered for portfolio updates).
2199        self.await_account_registered(30.0).await?;
2200
2201        self.core.set_connected();
2202        log::info!("Connected: client_id={}", self.core.client_id);
2203        Ok(())
2204    }
2205
2206    async fn disconnect(&mut self) -> anyhow::Result<()> {
2207        if self.core.is_disconnected() {
2208            log::warn!("dYdX execution client not connected");
2209            return Ok(());
2210        }
2211
2212        log::info!("Disconnecting from dYdX");
2213
2214        // Unsubscribe from subaccount (execution client always has credentials)
2215        let _ = self
2216            .ws_client
2217            .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
2218            .await
2219            .map_err(|e| log::warn!("Failed to unsubscribe from subaccount: {e}"));
2220
2221        // Unsubscribe from markets
2222        let _ = self
2223            .ws_client
2224            .unsubscribe_markets()
2225            .await
2226            .map_err(|e| log::warn!("Failed to unsubscribe from markets: {e}"));
2227
2228        // Unsubscribe from block height
2229        let _ = self
2230            .ws_client
2231            .unsubscribe_block_height()
2232            .await
2233            .map_err(|e| log::warn!("Failed to unsubscribe from block height: {e}"));
2234
2235        // Disconnect WebSocket
2236        self.ws_client.disconnect().await?;
2237
2238        // Abort WebSocket message processing task
2239        if let Some(handle) = self.ws_stream_handle.take() {
2240            handle.abort();
2241            log::debug!("Aborted WebSocket message processing task");
2242        }
2243
2244        // Abort any pending tasks
2245        self.abort_pending_tasks();
2246
2247        self.core.set_disconnected();
2248        log::info!("Disconnected: client_id={}", self.core.client_id);
2249        Ok(())
2250    }
2251
2252    async fn generate_order_status_report(
2253        &self,
2254        cmd: &GenerateOrderStatusReport,
2255    ) -> anyhow::Result<Option<OrderStatusReport>> {
2256        // Query single order from dYdX API
2257        let response = self
2258            .http_client
2259            .inner
2260            .get_orders(
2261                &self.wallet_address,
2262                self.subaccount_number,
2263                None,    // market filter
2264                Some(1), // limit to 1 result
2265            )
2266            .await
2267            .context("failed to fetch order from dYdX API")?;
2268
2269        if response.is_empty() {
2270            return Ok(None);
2271        }
2272
2273        let order = &response[0];
2274        let ts_init = UnixNanos::default();
2275
2276        let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
2277            Some(inst) => inst,
2278            None => return Ok(None),
2279        };
2280
2281        let report = parse_order_status_report(order, &instrument, self.core.account_id, ts_init)
2282            .context("failed to parse order status report")?;
2283
2284        if let Some(client_order_id) = cmd.client_order_id
2285            && report.client_order_id != Some(client_order_id)
2286        {
2287            return Ok(None);
2288        }
2289
2290        if let Some(venue_order_id) = cmd.venue_order_id
2291            && report.venue_order_id.as_str() != venue_order_id.as_str()
2292        {
2293            return Ok(None);
2294        }
2295
2296        if let Some(instrument_id) = cmd.instrument_id
2297            && report.instrument_id != instrument_id
2298        {
2299            return Ok(None);
2300        }
2301
2302        Ok(Some(report))
2303    }
2304
2305    async fn generate_order_status_reports(
2306        &self,
2307        cmd: &GenerateOrderStatusReports,
2308    ) -> anyhow::Result<Vec<OrderStatusReport>> {
2309        // Query orders from dYdX API
2310        let response = self
2311            .http_client
2312            .inner
2313            .get_orders(
2314                &self.wallet_address,
2315                self.subaccount_number,
2316                None, // market filter
2317                None, // limit
2318            )
2319            .await
2320            .context("failed to fetch orders from dYdX API")?;
2321
2322        let mut reports = Vec::new();
2323        let ts_init = UnixNanos::default();
2324
2325        for order in response {
2326            let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
2327                Some(inst) => inst,
2328                None => continue,
2329            };
2330
2331            if let Some(filter_id) = cmd.instrument_id
2332                && instrument.id() != filter_id
2333            {
2334                continue;
2335            }
2336
2337            let report =
2338                match parse_order_status_report(&order, &instrument, self.core.account_id, ts_init)
2339                {
2340                    Ok(r) => r,
2341                    Err(e) => {
2342                        log::warn!("Failed to parse order status report: {e}");
2343                        continue;
2344                    }
2345                };
2346
2347            reports.push(report);
2348        }
2349
2350        // Filter by open_only if specified
2351        if cmd.open_only {
2352            reports.retain(|r| r.order_status.is_open());
2353        }
2354
2355        // Filter by time range if specified
2356        if let Some(start) = cmd.start {
2357            reports.retain(|r| r.ts_last >= start);
2358        }
2359        if let Some(end) = cmd.end {
2360            reports.retain(|r| r.ts_last <= end);
2361        }
2362
2363        Ok(reports)
2364    }
2365
2366    async fn generate_fill_reports(
2367        &self,
2368        cmd: GenerateFillReports,
2369    ) -> anyhow::Result<Vec<FillReport>> {
2370        let response = self
2371            .http_client
2372            .inner
2373            .get_fills(
2374                &self.wallet_address,
2375                self.subaccount_number,
2376                None, // market filter
2377                None, // limit
2378            )
2379            .await
2380            .context("failed to fetch fills from dYdX API")?;
2381
2382        let mut reports = Vec::new();
2383        let ts_init = UnixNanos::default();
2384
2385        for fill in response.fills {
2386            let instrument = match self.get_instrument_by_market(&fill.market) {
2387                Some(inst) => inst,
2388                None => {
2389                    log::warn!("Unknown market in fill: {}", fill.market);
2390                    continue;
2391                }
2392            };
2393
2394            if let Some(filter_id) = cmd.instrument_id
2395                && instrument.id() != filter_id
2396            {
2397                continue;
2398            }
2399
2400            let report = match parse_fill_report(&fill, &instrument, self.core.account_id, ts_init)
2401            {
2402                Ok(r) => r,
2403                Err(e) => {
2404                    log::warn!("Failed to parse fill report: {e}");
2405                    continue;
2406                }
2407            };
2408
2409            reports.push(report);
2410        }
2411
2412        if let Some(venue_order_id) = cmd.venue_order_id {
2413            reports.retain(|r| r.venue_order_id.as_str() == venue_order_id.as_str());
2414        }
2415
2416        Ok(reports)
2417    }
2418
2419    async fn generate_position_status_reports(
2420        &self,
2421        cmd: &GeneratePositionStatusReports,
2422    ) -> anyhow::Result<Vec<PositionStatusReport>> {
2423        // Query subaccount positions from dYdX API
2424        let response = self
2425            .http_client
2426            .inner
2427            .get_subaccount(&self.wallet_address, self.subaccount_number)
2428            .await
2429            .context("failed to fetch subaccount from dYdX API")?;
2430
2431        let mut reports = Vec::new();
2432        let ts_init = UnixNanos::default();
2433
2434        for (market_ticker, perp_position) in &response.subaccount.open_perpetual_positions {
2435            let instrument = match self.get_instrument_by_market(market_ticker) {
2436                Some(inst) => inst,
2437                None => {
2438                    log::warn!("Unknown market in position: {market_ticker}");
2439                    continue;
2440                }
2441            };
2442
2443            if let Some(filter_id) = cmd.instrument_id
2444                && instrument.id() != filter_id
2445            {
2446                continue;
2447            }
2448
2449            let report = match parse_position_status_report(
2450                perp_position,
2451                &instrument,
2452                self.core.account_id,
2453                ts_init,
2454            ) {
2455                Ok(r) => r,
2456                Err(e) => {
2457                    log::warn!("Failed to parse position status report: {e}");
2458                    continue;
2459                }
2460            };
2461
2462            reports.push(report);
2463        }
2464
2465        Ok(reports)
2466    }
2467
2468    async fn generate_mass_status(
2469        &self,
2470        lookback_mins: Option<u64>,
2471    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
2472        let ts_init = UnixNanos::default();
2473
2474        // Query orders
2475        let orders_response = self
2476            .http_client
2477            .inner
2478            .get_orders(&self.wallet_address, self.subaccount_number, None, None)
2479            .await
2480            .context("failed to fetch orders for mass status")?;
2481
2482        // Query subaccount for positions
2483        let subaccount_response = self
2484            .http_client
2485            .inner
2486            .get_subaccount(&self.wallet_address, self.subaccount_number)
2487            .await
2488            .context("failed to fetch subaccount for mass status")?;
2489
2490        // Query fills
2491        let fills_response = self
2492            .http_client
2493            .inner
2494            .get_fills(&self.wallet_address, self.subaccount_number, None, None)
2495            .await
2496            .context("failed to fetch fills for mass status")?;
2497
2498        // Parse order reports
2499        let mut order_reports = Vec::new();
2500        let mut orders_filtered = 0usize;
2501        for order in orders_response {
2502            let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
2503                Some(inst) => inst,
2504                None => {
2505                    orders_filtered += 1;
2506                    continue;
2507                }
2508            };
2509
2510            match parse_order_status_report(&order, &instrument, self.core.account_id, ts_init) {
2511                Ok(mut r) => {
2512                    // Decode dYdX u32 client_id back to Nautilus format using bidirectional encoder
2513                    if !order.client_id.is_empty()
2514                        && let Ok(client_id_u32) = order.client_id.parse::<u32>()
2515                        && let Some(decoded) =
2516                            self.encoder.decode(client_id_u32, order.client_metadata)
2517                    {
2518                        log::debug!(
2519                            "Decoded reconciliation order: dYdX client_id={} meta={:#x} -> '{}'",
2520                            client_id_u32,
2521                            order.client_metadata,
2522                            decoded,
2523                        );
2524                        r.client_order_id = Some(decoded);
2525                    }
2526                    order_reports.push(r);
2527                }
2528                Err(e) => {
2529                    log::warn!("Failed to parse order status report: {e}");
2530                    orders_filtered += 1;
2531                }
2532            }
2533        }
2534
2535        // Parse position reports
2536        let mut position_reports = Vec::new();
2537        for (market_ticker, perp_position) in
2538            &subaccount_response.subaccount.open_perpetual_positions
2539        {
2540            let instrument = match self.get_instrument_by_market(market_ticker) {
2541                Some(inst) => inst,
2542                None => continue,
2543            };
2544
2545            match parse_position_status_report(
2546                perp_position,
2547                &instrument,
2548                self.core.account_id,
2549                ts_init,
2550            ) {
2551                Ok(r) => position_reports.push(r),
2552                Err(e) => {
2553                    log::warn!("Failed to parse position status report: {e}");
2554                }
2555            }
2556        }
2557
2558        // Parse fill reports
2559        let mut fill_reports = Vec::new();
2560        let mut fills_filtered = 0usize;
2561        for fill in fills_response.fills {
2562            let instrument = match self.get_instrument_by_market(&fill.market) {
2563                Some(inst) => inst,
2564                None => {
2565                    fills_filtered += 1;
2566                    continue;
2567                }
2568            };
2569
2570            match parse_fill_report(&fill, &instrument, self.core.account_id, ts_init) {
2571                Ok(r) => fill_reports.push(r),
2572                Err(e) => {
2573                    log::warn!("Failed to parse fill report: {e}");
2574                    fills_filtered += 1;
2575                }
2576            }
2577        }
2578
2579        // Apply lookback filter to orders and fills (positions are always current state)
2580        if let Some(mins) = lookback_mins {
2581            let now_ns = get_atomic_clock_realtime().get_time_ns();
2582            let cutoff_ns = now_ns.as_u64().saturating_sub(mins * 60 * 1_000_000_000);
2583            let cutoff = UnixNanos::from(cutoff_ns);
2584
2585            let orders_before = order_reports.len();
2586            order_reports.retain(|r| r.ts_last >= cutoff);
2587            let orders_removed = orders_before - order_reports.len();
2588
2589            let fills_before = fill_reports.len();
2590            fill_reports.retain(|r| r.ts_event >= cutoff);
2591            let fills_removed = fills_before - fill_reports.len();
2592
2593            log::info!(
2594                "Lookback filter ({}min): orders {}->{} (removed {}), fills {}->{} (removed {}), positions {} (unfiltered)",
2595                mins,
2596                orders_before,
2597                order_reports.len(),
2598                orders_removed,
2599                fills_before,
2600                fill_reports.len(),
2601                fills_removed,
2602                position_reports.len(),
2603            );
2604        } else {
2605            log::debug!(
2606                "Generated mass status: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
2607                order_reports.len(),
2608                orders_filtered,
2609                position_reports.len(),
2610                fill_reports.len(),
2611                fills_filtered,
2612            );
2613        }
2614
2615        // Create mass status and add reports
2616        let mut mass_status = ExecutionMassStatus::new(
2617            self.core.client_id,
2618            self.core.account_id,
2619            self.core.venue,
2620            ts_init,
2621            None, // report_id will be auto-generated
2622        );
2623
2624        mass_status.add_order_reports(order_reports);
2625        mass_status.add_position_reports(position_reports);
2626        mass_status.add_fill_reports(fill_reports);
2627
2628        Ok(Some(mass_status))
2629    }
2630}