nautilus_dydx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2025 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the dYdX adapter.
17//!
18//! This module provides the execution client for submitting orders, cancellations,
19//! and managing positions on dYdX v4.
20//!
21//! # Order Types
22//!
23//! dYdX supports the following order types:
24//!
25//! - **Market**: Execute immediately at best available price
26//! - **Limit**: Execute at specified price or better
27//! - **Stop Market**: Triggered when price crosses stop price, then executes as market order
28//! - **Stop Limit**: Triggered when price crosses stop price, then places limit order
29//! - **Take Profit Market**: Close position at profit target, executes as market order
30//! - **Take Profit Limit**: Close position at profit target, places limit order
31//!
32//! See <https://docs.dydx.xyz/concepts/trading/orders#types> for details.
33//!
34//! # Order Lifetimes
35//!
36//! Orders can be short-term (expire by block height) or long-term/stateful (expire by timestamp).
37//! Conditional orders (Stop/TakeProfit) are always stateful.
38//!
39//! See <https://docs.dydx.xyz/concepts/trading/orders#short-term-vs-long-term> for details.
40
41use std::sync::{
42    Arc, Mutex,
43    atomic::{AtomicU32, AtomicU64},
44};
45
46use anyhow::Context;
47use async_trait::async_trait;
48use chrono::{Duration, Utc};
49use dashmap::DashMap;
50use nautilus_common::{
51    live::{runner::get_exec_event_sender, runtime::get_runtime},
52    messages::{
53        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
54        execution::{
55            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
56            GenerateOrderStatusReport, GeneratePositionReports, ModifyOrder, QueryAccount,
57            QueryOrder, SubmitOrder, SubmitOrderList,
58        },
59    },
60    msgbus,
61};
62use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
63use nautilus_execution::client::{ExecutionClient, base::ExecutionClientCore};
64use nautilus_live::execution::client::LiveExecutionClient;
65use nautilus_model::{
66    accounts::AccountAny,
67    enums::{OmsType, OrderSide, OrderType, TimeInForce},
68    events::{AccountState, OrderCancelRejected, OrderEventAny, OrderRejected},
69    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue},
70    instruments::{Instrument, InstrumentAny},
71    orders::Order,
72    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
73    types::{AccountBalance, MarginBalance},
74};
75use rust_decimal::Decimal;
76use tokio::task::JoinHandle;
77
78use crate::{
79    common::{consts::DYDX_VENUE, credential::DydxCredential, parse::nanos_to_secs_i64},
80    config::DydxAdapterConfig,
81    execution::submitter::OrderSubmitter,
82    grpc::{DydxGrpcClient, Wallet, types::ChainId},
83    http::client::DydxHttpClient,
84    websocket::{client::DydxWebSocketClient, enums::NautilusWsMessage},
85};
86
87pub mod submitter;
88
/// Maximum client order ID value for dYdX (informational - not enforced by adapter).
///
/// dYdX protocol accepts u32 client IDs. The current implementation uses sequential
/// allocation starting from 1, which will wrap at u32::MAX. If dYdX has a stricter
/// limit, this constant should be updated and enforced in `generate_client_order_id_int`.
///
/// Numeric `ClientOrderId` strings bypass the counter and are parsed directly to `u32`,
/// so they are likewise bounded only by the `u32` range.
pub const MAX_CLIENT_ID: u32 = u32::MAX;
95
/// Execution report types dispatched from WebSocket message handler.
///
/// This enum groups order and fill reports for unified dispatch handling,
/// following the pattern used by reference adapters (Hyperliquid, OKX).
///
/// Both payloads are boxed, so each variant stores only a pointer.
enum ExecutionReport {
    /// An order status report.
    Order(Box<OrderStatusReport>),
    /// A fill report.
    Fill(Box<FillReport>),
}
104
/// Live execution client for the dYdX v4 exchange adapter.
///
/// Supports Market, Limit, Stop Market, Stop Limit, Take Profit Market (MarketIfTouched),
/// and Take Profit Limit (LimitIfTouched) orders via gRPC. Trailing stops are NOT supported
/// by the dYdX v4 protocol. dYdX requires u32 client IDs - numeric `ClientOrderId` strings
/// are parsed directly, all other strings are assigned sequential integers from an
/// in-memory counter (no hashing is involved).
///
/// # Architecture
///
/// The client follows a two-layer execution model:
/// 1. **Synchronous validation** - Immediate checks and event generation
/// 2. **Async submission** - Non-blocking gRPC calls via `OrderSubmitter`
///
/// This matches the pattern used in OKX and other exchange adapters, ensuring
/// consistent behavior across the Nautilus ecosystem.
#[derive(Debug)]
pub struct DydxExecutionClient {
    /// Shared execution-client core: supplies trader/account/client IDs and the OMS type,
    /// and generates order and account events.
    core: ExecutionClientCore,
    /// Adapter configuration (mnemonic, WebSocket URL, gRPC URLs, authenticator IDs, chain ID).
    config: DydxAdapterConfig,
    /// HTTP client; source of the instrument cache and the CLOB-pair-ID mapping.
    http_client: DydxHttpClient,
    /// WebSocket client; private when a mnemonic is configured, public otherwise.
    ws_client: DydxWebSocketClient,
    /// gRPC client shared with spawned submission tasks behind an async read/write lock.
    grpc_client: Arc<tokio::sync::RwLock<DydxGrpcClient>>,
    /// Signing wallet; `None` at construction. Order submission fails with
    /// "Wallet not initialized" while it is unset.
    wallet: Arc<tokio::sync::RwLock<Option<Wallet>>>,
    /// Instrument cache keyed by instrument ID.
    instruments: DashMap<InstrumentId, InstrumentAny>,
    /// Market ticker (e.g. "BTC-USD") → instrument ID.
    market_to_instrument: DashMap<String, InstrumentId>,
    /// CLOB pair ID → instrument ID.
    clob_pair_id_to_instrument: DashMap<u32, InstrumentId>,
    /// Block height passed to order submissions; initialized to 0.
    /// NOTE(review): presumably updated from the WebSocket stream - the writer is not
    /// in the portion of the file shown here.
    block_height: AtomicU64,
    /// Per-instrument decimal prices.
    /// NOTE(review): presumably the latest oracle price - populated outside the code shown here.
    oracle_prices: DashMap<InstrumentId, Decimal>,
    /// Forward mapping: `ClientOrderId` string → dYdX u32 client ID.
    client_id_to_int: DashMap<String, u32>,
    /// Reverse mapping: dYdX u32 client ID → `ClientOrderId` string.
    int_to_client_id: DashMap<u32, String>,
    /// Next sequential u32 client ID for non-numeric `ClientOrderId`s; starts at 1.
    next_client_id: AtomicU32,
    /// Wallet address handed to the order submitter.
    wallet_address: String,
    /// Subaccount number used for credential derivation and order submission.
    subaccount_number: u32,
    /// Set by `start`, cleared by `stop`.
    started: bool,
    /// Connection flag reported by `is_connected`; cleared by `stop`.
    connected: bool,
    /// Set once instruments have been copied from the HTTP client cache.
    instruments_initialized: bool,
    /// Handle of the WebSocket stream task; `None` at construction.
    ws_stream_handle: Option<JoinHandle<()>>,
    /// Handles of spawned background tasks, aborted when the client stops.
    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
}
143
144impl DydxExecutionClient {
145    /// Creates a new [`DydxExecutionClient`].
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if the WebSocket client fails to construct.
150    pub fn new(
151        core: ExecutionClientCore,
152        config: DydxAdapterConfig,
153        wallet_address: String,
154        subaccount_number: u32,
155    ) -> anyhow::Result<Self> {
156        let http_client = DydxHttpClient::default();
157
158        // Use private WebSocket client for authenticated subaccount subscriptions
159        let ws_client = if let Some(ref mnemonic) = config.mnemonic {
160            let credential = DydxCredential::from_mnemonic(
161                mnemonic,
162                subaccount_number,
163                config.authenticator_ids.clone(),
164            )?;
165            DydxWebSocketClient::new_private(
166                config.ws_url.clone(),
167                credential,
168                core.account_id,
169                Some(20),
170            )
171        } else {
172            DydxWebSocketClient::new_public(config.ws_url.clone(), Some(20))
173        };
174
175        let grpc_urls = config.get_grpc_urls();
176        let grpc_client = Arc::new(tokio::sync::RwLock::new(
177            get_runtime()
178                .block_on(async { DydxGrpcClient::new_with_fallback(&grpc_urls).await })
179                .context("failed to construct dYdX gRPC client")?,
180        ));
181
182        Ok(Self {
183            core,
184            config,
185            http_client,
186            ws_client,
187            grpc_client,
188            wallet: Arc::new(tokio::sync::RwLock::new(None)),
189            instruments: DashMap::new(),
190            market_to_instrument: DashMap::new(),
191            clob_pair_id_to_instrument: DashMap::new(),
192            block_height: AtomicU64::new(0),
193            oracle_prices: DashMap::new(),
194            client_id_to_int: DashMap::new(),
195            int_to_client_id: DashMap::new(),
196            next_client_id: AtomicU32::new(1),
197            wallet_address,
198            subaccount_number,
199            started: false,
200            connected: false,
201            instruments_initialized: false,
202            ws_stream_handle: None,
203            pending_tasks: Mutex::new(Vec::new()),
204        })
205    }
206
207    /// Generate a unique client order ID integer and store the mapping.
208    ///
209    /// # Invariants
210    ///
211    /// - Same `client_order_id` string → same `u32` for the lifetime of this process
212    /// - Different `client_order_id` strings → different `u32` values (except on u32 wrap)
213    /// - Thread-safe for concurrent calls
214    ///
215    /// # Behavior
216    ///
217    /// - Parses numeric `client_order_id` directly to `u32` for stability across restarts
218    /// - For non-numeric IDs, allocates a new sequential value from an atomic counter
219    /// - Mapping is kept in-memory only; non-numeric IDs will not be recoverable after restart
220    /// - Counter starts at 1 and increments without bound checking (will wrap at u32::MAX)
221    ///
222    /// # Notes
223    ///
224    /// - Atomic counter uses `Relaxed` ordering — uniqueness is required, not cross-thread sequencing
225    /// - If dYdX enforces a maximum client ID below u32::MAX, additional range validation is needed
226    fn generate_client_order_id_int(&self, client_order_id: &str) -> u32 {
227        // Fast path: already mapped
228        if let Some(existing) = self.client_id_to_int.get(client_order_id) {
229            return *existing.value();
230        }
231
232        // Try parsing as direct integer
233        if let Ok(id) = client_order_id.parse::<u32>() {
234            self.client_id_to_int
235                .insert(client_order_id.to_string(), id);
236            self.int_to_client_id
237                .insert(id, client_order_id.to_string());
238            return id;
239        }
240
241        // Allocate new ID from atomic counter
242        use dashmap::mapref::entry::Entry;
243
244        match self.client_id_to_int.entry(client_order_id.to_string()) {
245            Entry::Occupied(entry) => *entry.get(),
246            Entry::Vacant(vacant) => {
247                let id = self
248                    .next_client_id
249                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
250                vacant.insert(id);
251                self.int_to_client_id
252                    .insert(id, client_order_id.to_string());
253                id
254            }
255        }
256    }
257
258    /// Retrieve the client order ID integer from the cache.
259    ///
260    /// Returns `None` if the mapping doesn't exist.
261    fn get_client_order_id_int(&self, client_order_id: &str) -> Option<u32> {
262        // Try parsing first
263        if let Ok(id) = client_order_id.parse::<u32>() {
264            return Some(id);
265        }
266
267        // Look up in cache
268        self.client_id_to_int
269            .get(client_order_id)
270            .map(|entry| *entry.value())
271    }
272
273    /// Get chain ID from config network field.
274    ///
275    /// This is the recommended way to get chain_id for all transaction submissions.
276    fn get_chain_id(&self) -> ChainId {
277        self.config.get_chain_id()
278    }
279
280    /// Cache instruments from HTTP client into execution client's lookup maps.
281    ///
282    /// This populates three data structures for efficient lookups:
283    /// - `instruments`: InstrumentId → InstrumentAny
284    /// - `market_to_instrument`: Market ticker (e.g., "BTC-USD") → InstrumentId
285    /// - `clob_pair_id_to_instrument`: CLOB pair ID → InstrumentId
286    fn cache_instruments_from_http(&mut self) {
287        use nautilus_model::instruments::InstrumentAny;
288
289        // Get all instruments from HTTP client cache
290        let instruments: Vec<InstrumentAny> = self
291            .http_client
292            .instruments_cache
293            .iter()
294            .map(|entry| entry.value().clone())
295            .collect();
296
297        tracing::debug!(
298            "Caching {} instruments in execution client",
299            instruments.len()
300        );
301
302        for instrument in instruments {
303            let instrument_id = instrument.id();
304            let symbol = instrument_id.symbol.as_str();
305
306            // Cache instrument by InstrumentId
307            self.instruments.insert(instrument_id, instrument.clone());
308
309            // Cache market ticker → InstrumentId mapping
310            self.market_to_instrument
311                .insert(symbol.to_string(), instrument_id);
312        }
313
314        // Copy clob_pair_id → InstrumentId mapping from HTTP client
315        // The HTTP client populates this from PerpetualMarket.clob_pair_id (authoritative source)
316        let http_mapping = self.http_client.clob_pair_id_mapping();
317        for entry in http_mapping.iter() {
318            self.clob_pair_id_to_instrument
319                .insert(*entry.key(), *entry.value());
320        }
321
322        self.instruments_initialized = true;
323        tracing::info!(
324            "Cached {} instruments ({} CLOB pair IDs) with market mappings",
325            self.instruments.len(),
326            self.clob_pair_id_to_instrument.len()
327        );
328    }
329
330    /// Get an instrument by market ticker (e.g., "BTC-USD").
331    fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
332        self.market_to_instrument
333            .get(market)
334            .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()))
335    }
336
337    /// Get an instrument by clob_pair_id.
338    fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
339        let instrument = self
340            .clob_pair_id_to_instrument
341            .get(&clob_pair_id)
342            .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()));
343
344        if instrument.is_none() {
345            self.log_missing_instrument_for_clob_pair_id(clob_pair_id);
346        }
347
348        instrument
349    }
350
351    fn log_missing_instrument_for_clob_pair_id(&self, clob_pair_id: u32) {
352        let known: Vec<(u32, String)> = self
353            .clob_pair_id_to_instrument
354            .iter()
355            .filter_map(|entry| {
356                let instrument_id = entry.value();
357                self.instruments.get(instrument_id).map(|inst_entry| {
358                    (
359                        *entry.key(),
360                        inst_entry.value().id().symbol.as_str().to_string(),
361                    )
362                })
363            })
364            .collect();
365
366        tracing::warn!(
367            "Instrument for clob_pair_id {} not found in cache. Known CLOB pair IDs and symbols: {:?}",
368            clob_pair_id,
369            known
370        );
371    }
372
373    fn spawn_task<F>(&self, label: &'static str, fut: F)
374    where
375        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
376    {
377        let handle = tokio::spawn(async move {
378            if let Err(e) = fut.await {
379                tracing::error!("{label}: {e:?}");
380            }
381        });
382
383        self.pending_tasks
384            .lock()
385            .expect(MUTEX_POISONED)
386            .push(handle);
387    }
388
389    /// Spawns an order submission task with error handling and rejection generation.
390    ///
391    /// If the submission fails, generates an `OrderRejected` event with the error details.
392    fn spawn_order_task<F>(
393        &self,
394        label: &'static str,
395        strategy_id: StrategyId,
396        instrument_id: InstrumentId,
397        client_order_id: ClientOrderId,
398        fut: F,
399    ) where
400        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
401    {
402        // Capture necessary data for rejection event
403        let trader_id = self.core.trader_id;
404        let account_id = self.core.account_id;
405        let sender = get_exec_event_sender();
406
407        let handle = tokio::spawn(async move {
408            if let Err(e) = fut.await {
409                let error_msg = format!("{label} failed: {e:?}");
410                tracing::error!("{}", error_msg);
411
412                // Generate OrderRejected event on submission failure
413                let ts_now = UnixNanos::default(); // Use current time
414                let event = OrderRejected::new(
415                    trader_id,
416                    strategy_id,
417                    instrument_id,
418                    client_order_id,
419                    account_id,
420                    error_msg.into(),
421                    UUID4::new(),
422                    ts_now,
423                    ts_now,
424                    false,
425                    false,
426                );
427
428                if let Err(send_err) =
429                    sender.send(ExecutionEvent::Order(OrderEventAny::Rejected(event)))
430                {
431                    tracing::error!("Failed to send OrderRejected event: {send_err}");
432                }
433            }
434        });
435
436        self.pending_tasks
437            .lock()
438            .expect(MUTEX_POISONED)
439            .push(handle);
440    }
441
442    fn abort_pending_tasks(&self) {
443        let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
444        for handle in guard.drain(..) {
445            handle.abort();
446        }
447    }
448}
449
450#[async_trait(?Send)]
451impl ExecutionClient for DydxExecutionClient {
452    fn is_connected(&self) -> bool {
453        self.connected
454    }
455
456    fn client_id(&self) -> ClientId {
457        self.core.client_id
458    }
459
460    fn account_id(&self) -> AccountId {
461        self.core.account_id
462    }
463
464    fn venue(&self) -> Venue {
465        *DYDX_VENUE
466    }
467
468    fn oms_type(&self) -> OmsType {
469        self.core.oms_type
470    }
471
472    fn get_account(&self) -> Option<AccountAny> {
473        self.core.get_account()
474    }
475
476    fn generate_account_state(
477        &self,
478        balances: Vec<AccountBalance>,
479        margins: Vec<MarginBalance>,
480        reported: bool,
481        ts_event: UnixNanos,
482    ) -> anyhow::Result<()> {
483        self.core
484            .generate_account_state(balances, margins, reported, ts_event)
485    }
486
487    fn start(&mut self) -> anyhow::Result<()> {
488        if self.started {
489            tracing::warn!("dYdX execution client already started");
490            return Ok(());
491        }
492
493        tracing::info!("Starting dYdX execution client");
494        self.started = true;
495        Ok(())
496    }
497
498    fn stop(&mut self) -> anyhow::Result<()> {
499        if !self.started {
500            tracing::warn!("dYdX execution client not started");
501            return Ok(());
502        }
503
504        tracing::info!("Stopping dYdX execution client");
505        self.abort_pending_tasks();
506        self.started = false;
507        self.connected = false;
508        Ok(())
509    }
510
511    /// Submits an order to dYdX via gRPC.
512    ///
513    /// dYdX requires u32 client IDs - Nautilus ClientOrderId strings are hashed to fit.
514    ///
515    /// Supported order types:
516    /// - Market orders (short-term, IOC)
517    /// - Limit orders (short-term or long-term based on TIF)
518    /// - Stop Market orders (conditional, triggered at stop price)
519    /// - Stop Limit orders (conditional, triggered at stop price, executed at limit)
520    /// - Take Profit Market (MarketIfTouched - triggered at take profit price)
521    /// - Take Profit Limit (LimitIfTouched - triggered at take profit price, executed at limit)
522    ///
523    /// Trailing stop orders are NOT supported by dYdX v4 protocol.
524    ///
525    /// Validates synchronously, generates OrderSubmitted event, then spawns async task for
526    /// gRPC submission to avoid blocking. Unsupported order types generate OrderRejected.
527    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
528        let order = cmd.order.clone();
529
530        // Check connection status
531        if !self.is_connected() {
532            let reason = "Cannot submit order: execution client not connected";
533            tracing::error!("{}", reason);
534            anyhow::bail!(reason);
535        }
536
537        // Check if order is already closed
538        if order.is_closed() {
539            tracing::warn!("Cannot submit closed order {}", order.client_order_id());
540            return Ok(());
541        }
542
543        // Validate order type
544        match order.order_type() {
545            OrderType::Market | OrderType::Limit => {
546                tracing::debug!(
547                    "Submitting {} order: {}",
548                    if matches!(order.order_type(), OrderType::Market) {
549                        "MARKET"
550                    } else {
551                        "LIMIT"
552                    },
553                    order.client_order_id()
554                );
555            }
556            // Conditional orders (stop/take-profit) - supported by dYdX
557            OrderType::StopMarket | OrderType::StopLimit => {
558                tracing::debug!(
559                    "Submitting {} order: {}",
560                    if matches!(order.order_type(), OrderType::StopMarket) {
561                        "STOP_MARKET"
562                    } else {
563                        "STOP_LIMIT"
564                    },
565                    order.client_order_id()
566                );
567            }
568            // dYdX TakeProfit/TakeProfitLimit map to MarketIfTouched/LimitIfTouched
569            OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
570                tracing::debug!(
571                    "Submitting {} order: {}",
572                    if matches!(order.order_type(), OrderType::MarketIfTouched) {
573                        "TAKE_PROFIT_MARKET"
574                    } else {
575                        "TAKE_PROFIT_LIMIT"
576                    },
577                    order.client_order_id()
578                );
579            }
580            // Trailing stops not supported by dYdX v4 protocol
581            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit => {
582                let reason = "Trailing stop orders not supported by dYdX v4 protocol";
583                tracing::error!("{}", reason);
584                self.core.generate_order_rejected(
585                    order.strategy_id(),
586                    order.instrument_id(),
587                    order.client_order_id(),
588                    reason,
589                    cmd.ts_init,
590                    false,
591                );
592                return Ok(());
593            }
594            order_type => {
595                let reason = format!("Order type {order_type:?} not supported by dYdX");
596                tracing::error!("{}", reason);
597                self.core.generate_order_rejected(
598                    order.strategy_id(),
599                    order.instrument_id(),
600                    order.client_order_id(),
601                    &reason,
602                    cmd.ts_init,
603                    false,
604                );
605                return Ok(());
606            }
607        }
608
609        // Generate OrderSubmitted event immediately
610        self.core.generate_order_submitted(
611            order.strategy_id(),
612            order.instrument_id(),
613            order.client_order_id(),
614            cmd.ts_init,
615        );
616
617        let grpc_client = self.grpc_client.clone();
618        let wallet = self.wallet.clone();
619        let http_client = self.http_client.clone();
620        let wallet_address = self.wallet_address.clone();
621        let subaccount_number = self.subaccount_number;
622        let client_order_id = order.client_order_id();
623        let instrument_id = order.instrument_id();
624        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
625        let chain_id = self.get_chain_id();
626        let authenticator_ids = self.config.authenticator_ids.clone();
627        #[allow(clippy::redundant_clone)]
628        let order_clone = order.clone();
629
630        // Generate client_order_id as u32 before async block (dYdX requires u32 client IDs)
631        let client_id_u32 = self.generate_client_order_id_int(client_order_id.as_str());
632
633        self.spawn_order_task(
634            "submit_order",
635            order.strategy_id(),
636            order.instrument_id(),
637            order.client_order_id(),
638            async move {
639                let wallet_guard = wallet.read().await;
640                let wallet_ref = wallet_guard
641                    .as_ref()
642                    .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
643
644                let grpc_guard = grpc_client.read().await;
645                let submitter = OrderSubmitter::new(
646                    (*grpc_guard).clone(),
647                    http_client.clone(),
648                    wallet_address,
649                    subaccount_number,
650                    chain_id,
651                    authenticator_ids,
652                );
653
654                // Submit order based on type
655                match order_clone.order_type() {
656                    OrderType::Market => {
657                        submitter
658                            .submit_market_order(
659                                wallet_ref,
660                                instrument_id,
661                                client_id_u32,
662                                order_clone.order_side(),
663                                order_clone.quantity(),
664                                block_height,
665                            )
666                            .await?;
667                        tracing::info!("Successfully submitted market order: {}", client_order_id);
668                    }
669                    OrderType::Limit => {
670                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
671                        submitter
672                            .submit_limit_order(
673                                wallet_ref,
674                                instrument_id,
675                                client_id_u32,
676                                order_clone.order_side(),
677                                order_clone
678                                    .price()
679                                    .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
680                                order_clone.quantity(),
681                                order_clone.time_in_force(),
682                                order_clone.is_post_only(),
683                                order_clone.is_reduce_only(),
684                                block_height,
685                                expire_time,
686                            )
687                            .await?;
688                        tracing::info!("Successfully submitted limit order: {}", client_order_id);
689                    }
690                    OrderType::StopMarket => {
691                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
692                            anyhow::anyhow!("Stop market order missing trigger_price")
693                        })?;
694                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
695                        submitter
696                            .submit_stop_market_order(
697                                wallet_ref,
698                                instrument_id,
699                                client_id_u32,
700                                order_clone.order_side(),
701                                trigger_price,
702                                order_clone.quantity(),
703                                order_clone.is_reduce_only(),
704                                expire_time,
705                            )
706                            .await?;
707                        tracing::info!(
708                            "Successfully submitted stop market order: {}",
709                            client_order_id
710                        );
711                    }
712                    OrderType::StopLimit => {
713                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
714                            anyhow::anyhow!("Stop limit order missing trigger_price")
715                        })?;
716                        let limit_price = order_clone.price().ok_or_else(|| {
717                            anyhow::anyhow!("Stop limit order missing limit price")
718                        })?;
719                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
720                        submitter
721                            .submit_stop_limit_order(
722                                wallet_ref,
723                                instrument_id,
724                                client_id_u32,
725                                order_clone.order_side(),
726                                trigger_price,
727                                limit_price,
728                                order_clone.quantity(),
729                                order_clone.time_in_force(),
730                                order_clone.is_post_only(),
731                                order_clone.is_reduce_only(),
732                                expire_time,
733                            )
734                            .await?;
735                        tracing::info!(
736                            "Successfully submitted stop limit order: {}",
737                            client_order_id
738                        );
739                    }
740                    // dYdX TakeProfitMarket maps to Nautilus MarketIfTouched
741                    OrderType::MarketIfTouched => {
742                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
743                            anyhow::anyhow!("Take profit market order missing trigger_price")
744                        })?;
745                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
746                        submitter
747                            .submit_take_profit_market_order(
748                                wallet_ref,
749                                instrument_id,
750                                client_id_u32,
751                                order_clone.order_side(),
752                                trigger_price,
753                                order_clone.quantity(),
754                                order_clone.is_reduce_only(),
755                                expire_time,
756                            )
757                            .await?;
758                        tracing::info!(
759                            "Successfully submitted take profit market order: {}",
760                            client_order_id
761                        );
762                    }
763                    // dYdX TakeProfitLimit maps to Nautilus LimitIfTouched
764                    OrderType::LimitIfTouched => {
765                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
766                            anyhow::anyhow!("Take profit limit order missing trigger_price")
767                        })?;
768                        let limit_price = order_clone.price().ok_or_else(|| {
769                            anyhow::anyhow!("Take profit limit order missing limit price")
770                        })?;
771                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
772                        submitter
773                            .submit_take_profit_limit_order(
774                                wallet_ref,
775                                instrument_id,
776                                client_id_u32,
777                                order_clone.order_side(),
778                                trigger_price,
779                                limit_price,
780                                order_clone.quantity(),
781                                order_clone.time_in_force(),
782                                order_clone.is_post_only(),
783                                order_clone.is_reduce_only(),
784                                expire_time,
785                            )
786                            .await?;
787                        tracing::info!(
788                            "Successfully submitted take profit limit order: {}",
789                            client_order_id
790                        );
791                    }
792                    _ => unreachable!("Order type already validated"),
793                }
794
795                Ok(())
796            },
797        );
798
799        Ok(())
800    }
801
802    fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
803        anyhow::bail!("Order lists not supported by dYdX")
804    }
805
806    fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
807        anyhow::bail!("Order modification not supported by dYdX")
808    }
809
810    /// Cancels an order on dYdX exchange.
811    ///
812    /// Validates the order state and retrieves instrument details before
813    /// spawning an async task to cancel via gRPC.
814    ///
815    /// # Validation
816    /// - Checks order exists in cache
817    /// - Validates order is not already closed
818    /// - Retrieves instrument from cache for order builder
819    ///
820    /// The `cmd` contains client/venue order IDs. Returns `Ok(())` if cancel request is
821    /// spawned successfully or validation fails gracefully. Returns `Err` if not connected.
822    ///
823    /// # Events
824    /// - `OrderCanceled` - Generated when WebSocket confirms cancellation
825    /// - `OrderCancelRejected` - Generated if exchange rejects cancellation
826    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
827        if !self.is_connected() {
828            anyhow::bail!("Cannot cancel order: not connected");
829        }
830
831        let client_order_id = cmd.client_order_id;
832
833        // Validate order exists in cache and is not closed
834        let cache = self.core.cache();
835        let cache_borrow = cache.borrow();
836
837        let order = match cache_borrow.order(&client_order_id) {
838            Some(order) => order,
839            None => {
840                tracing::error!(
841                    "Cannot cancel order {}: not found in cache",
842                    client_order_id
843                );
844                return Ok(()); // Not an error - order may have been filled/canceled already
845            }
846        };
847
848        // Validate order is not already closed
849        if order.is_closed() {
850            tracing::warn!(
851                "CancelOrder command for {} when order already {} (will not send to exchange)",
852                client_order_id,
853                order.status()
854            );
855            return Ok(());
856        }
857
858        // Retrieve instrument from cache
859        let instrument_id = cmd.instrument_id;
860        let instrument = match cache_borrow.instrument(&instrument_id) {
861            Some(instrument) => instrument,
862            None => {
863                tracing::error!(
864                    "Cannot cancel order {}: instrument {} not found in cache",
865                    client_order_id,
866                    instrument_id
867                );
868                return Ok(()); // Not an error - missing instrument is a cache issue
869            }
870        };
871
872        tracing::debug!(
873            "Cancelling order {} for instrument {}",
874            client_order_id,
875            instrument.id()
876        );
877
878        let grpc_client = self.grpc_client.clone();
879        let wallet = self.wallet.clone();
880        let http_client = self.http_client.clone();
881        let wallet_address = self.wallet_address.clone();
882        let subaccount_number = self.subaccount_number;
883        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
884        let chain_id = self.get_chain_id();
885        let authenticator_ids = self.config.authenticator_ids.clone();
886        let trader_id = cmd.trader_id;
887        let strategy_id = cmd.strategy_id;
888        let venue_order_id = cmd.venue_order_id;
889
890        // Convert client_order_id to u32 before async block
891        let client_id_u32 = match self.get_client_order_id_int(client_order_id.as_str()) {
892            Some(id) => id,
893            None => {
894                tracing::error!("Client order ID {} not found in cache", client_order_id);
895                anyhow::bail!("Client order ID not found in cache")
896            }
897        };
898
899        self.spawn_task("cancel_order", async move {
900            let wallet_guard = wallet.read().await;
901            let wallet_ref = wallet_guard
902                .as_ref()
903                .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
904
905            let grpc_guard = grpc_client.read().await;
906            let submitter = OrderSubmitter::new(
907                (*grpc_guard).clone(),
908                http_client.clone(),
909                wallet_address,
910                subaccount_number,
911                chain_id,
912                authenticator_ids,
913            );
914
915            // Attempt cancellation via submitter
916            match submitter
917                .cancel_order(wallet_ref, instrument_id, client_id_u32, block_height)
918                .await
919            {
920                Ok(_) => {
921                    tracing::info!("Successfully cancelled order: {}", client_order_id);
922                }
923                Err(e) => {
924                    tracing::error!("Failed to cancel order {}: {:?}", client_order_id, e);
925
926                    // Generate OrderCancelRejected event
927                    let sender = get_exec_event_sender();
928                    let ts_now = UnixNanos::default();
929                    let event = OrderCancelRejected::new(
930                        trader_id,
931                        strategy_id,
932                        instrument_id,
933                        client_order_id,
934                        format!("Cancel order failed: {e:?}").into(),
935                        UUID4::new(),
936                        ts_now,
937                        ts_now,
938                        false,
939                        Some(venue_order_id),
940                        None, // account_id not available in async context
941                    );
942                    sender
943                        .send(ExecutionEvent::Order(OrderEventAny::CancelRejected(event)))
944                        .unwrap();
945                }
946            }
947
948            Ok(())
949        });
950
951        Ok(())
952    }
953
954    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
955        if !self.is_connected() {
956            anyhow::bail!("Cannot cancel orders: not connected");
957        }
958
959        // Query all open orders from cache
960        let cache = self.core.cache().borrow();
961        let mut open_orders: Vec<_> = cache
962            .orders_open(None, None, None, None)
963            .into_iter()
964            .collect();
965
966        // Filter by instrument_id (always specified in command)
967        let instrument_id = cmd.instrument_id;
968        open_orders.retain(|order| order.instrument_id() == instrument_id);
969
970        // Filter by order_side if specified (NoOrderSide means all sides)
971        if cmd.order_side != OrderSide::NoOrderSide {
972            let order_side = cmd.order_side;
973            open_orders.retain(|order| order.order_side() == order_side);
974        }
975
976        // Split orders into short-term and long-term based on TimeInForce
977        // Short-term: IOC, FOK (expire by block height)
978        // Long-term: GTC, GTD, DAY, POST_ONLY (expire by timestamp)
979        let mut short_term_orders = Vec::new();
980        let mut long_term_orders = Vec::new();
981
982        for order in &open_orders {
983            match order.time_in_force() {
984                TimeInForce::Ioc | TimeInForce::Fok => short_term_orders.push(order),
985                TimeInForce::Gtc
986                | TimeInForce::Gtd
987                | TimeInForce::Day
988                | TimeInForce::AtTheOpen
989                | TimeInForce::AtTheClose => long_term_orders.push(order),
990            }
991        }
992
993        tracing::info!(
994            "Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={}, order_side={:?}",
995            open_orders.len(),
996            short_term_orders.len(),
997            long_term_orders.len(),
998            instrument_id,
999            cmd.order_side
1000        );
1001
1002        // Cancel each order individually (dYdX requires separate transactions)
1003        let grpc_client = self.grpc_client.clone();
1004        let wallet = self.wallet.clone();
1005        let http_client = self.http_client.clone();
1006        let wallet_address = self.wallet_address.clone();
1007        let subaccount_number = self.subaccount_number;
1008        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1009        let chain_id = self.get_chain_id();
1010        let authenticator_ids = self.config.authenticator_ids.clone();
1011
1012        // Collect (instrument_id, client_id) tuples for batch cancel
1013        let mut orders_to_cancel = Vec::new();
1014        for order in &open_orders {
1015            let client_order_id = order.client_order_id();
1016            if let Some(client_id_u32) = self.get_client_order_id_int(client_order_id.as_str()) {
1017                orders_to_cancel.push((instrument_id, client_id_u32));
1018            } else {
1019                tracing::warn!(
1020                    "Cannot cancel order {}: client_order_id not found in cache",
1021                    client_order_id
1022                );
1023            }
1024        }
1025
1026        self.spawn_task("cancel_all_orders", async move {
1027            let wallet_guard = wallet.read().await;
1028            let wallet_ref = wallet_guard
1029                .as_ref()
1030                .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1031
1032            let grpc_guard = grpc_client.read().await;
1033            let submitter = OrderSubmitter::new(
1034                (*grpc_guard).clone(),
1035                http_client.clone(),
1036                wallet_address,
1037                subaccount_number,
1038                chain_id,
1039                authenticator_ids,
1040            );
1041
1042            // Cancel orders using batch method (executes sequentially to avoid nonce conflicts)
1043            match submitter
1044                .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1045                .await
1046            {
1047                Ok(_) => {
1048                    tracing::info!("Successfully cancelled {} orders", orders_to_cancel.len());
1049                }
1050                Err(e) => {
1051                    tracing::error!("Batch cancel failed: {:?}", e);
1052                }
1053            }
1054
1055            Ok(())
1056        });
1057
1058        Ok(())
1059    }
1060
1061    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
1062        if cmd.cancels.is_empty() {
1063            return Ok(());
1064        }
1065
1066        if !self.is_connected() {
1067            anyhow::bail!("Cannot cancel orders: not connected");
1068        }
1069
1070        // Convert ClientOrderIds to u32 before async block
1071        let mut orders_to_cancel = Vec::with_capacity(cmd.cancels.len());
1072        for cancel in &cmd.cancels {
1073            let client_id_str = cancel.client_order_id.as_str();
1074            let client_id_u32 = match self.get_client_order_id_int(client_id_str) {
1075                Some(id) => id,
1076                None => {
1077                    tracing::warn!(
1078                        "No u32 mapping found for client_order_id={}, skipping cancel",
1079                        client_id_str
1080                    );
1081                    continue;
1082                }
1083            };
1084            orders_to_cancel.push((cancel.instrument_id, client_id_u32));
1085        }
1086
1087        if orders_to_cancel.is_empty() {
1088            tracing::warn!("No valid orders to cancel in batch");
1089            return Ok(());
1090        }
1091
1092        let grpc_client = self.grpc_client.clone();
1093        let wallet = self.wallet.clone();
1094        let http_client = self.http_client.clone();
1095        let wallet_address = self.wallet_address.clone();
1096        let subaccount_number = self.subaccount_number;
1097        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1098        let chain_id = self.get_chain_id();
1099        let authenticator_ids = self.config.authenticator_ids.clone();
1100
1101        tracing::info!(
1102            "Batch cancelling {} orders: {:?}",
1103            orders_to_cancel.len(),
1104            orders_to_cancel
1105        );
1106
1107        self.spawn_task("batch_cancel_orders", async move {
1108            let wallet_guard = wallet.read().await;
1109            let wallet_ref = wallet_guard
1110                .as_ref()
1111                .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1112
1113            let grpc_guard = grpc_client.read().await;
1114            let submitter = OrderSubmitter::new(
1115                (*grpc_guard).clone(),
1116                http_client.clone(),
1117                wallet_address,
1118                subaccount_number,
1119                chain_id,
1120                authenticator_ids,
1121            );
1122
1123            match submitter
1124                .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1125                .await
1126            {
1127                Ok(()) => {
1128                    tracing::info!(
1129                        "Successfully batch cancelled {} orders",
1130                        orders_to_cancel.len()
1131                    );
1132                }
1133                Err(e) => {
1134                    tracing::error!("Batch cancel failed: {:?}", e);
1135                }
1136            }
1137
1138            Ok(())
1139        });
1140
1141        Ok(())
1142    }
1143
    /// Handles an account query command.
    ///
    /// Currently a no-op: the command is ignored and `Ok(())` is returned
    /// without performing any request.
    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
        Ok(())
    }
1147
    /// Handles an order query command.
    ///
    /// Currently a no-op: the command is ignored and `Ok(())` is returned
    /// without performing any request.
    fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
        Ok(())
    }
1151
1152    async fn connect(&mut self) -> anyhow::Result<()> {
1153        if self.connected {
1154            tracing::warn!("dYdX execution client already connected");
1155            return Ok(());
1156        }
1157
1158        tracing::info!("Connecting to dYdX");
1159
1160        // Load instruments BEFORE WebSocket connection
1161        // Per Python implementation: "instruments are used in the first account channel message"
1162        tracing::debug!("Loading instruments from HTTP API");
1163        self.http_client.fetch_and_cache_instruments().await?;
1164        tracing::info!(
1165            "Loaded {} instruments from HTTP",
1166            self.http_client.instruments_cache.len()
1167        );
1168
1169        // Populate execution client's instrument lookup maps
1170        self.cache_instruments_from_http();
1171
1172        // Initialize wallet from config if mnemonic is provided
1173        if let Some(mnemonic) = &self.config.mnemonic {
1174            let wallet = Wallet::from_mnemonic(mnemonic)?;
1175            *self.wallet.write().await = Some(wallet);
1176            tracing::debug!("Wallet initialized");
1177        }
1178
1179        // Connect WebSocket
1180        self.ws_client.connect().await?;
1181        tracing::debug!("WebSocket connected");
1182
1183        // Subscribe to block height updates
1184        self.ws_client.subscribe_block_height().await?;
1185        tracing::debug!("Subscribed to block height updates");
1186
1187        // Subscribe to markets for instrument data
1188        self.ws_client.subscribe_markets().await?;
1189        tracing::debug!("Subscribed to markets");
1190
1191        // Subscribe to subaccount updates if authenticated
1192        if self.config.mnemonic.is_some() {
1193            self.ws_client
1194                .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
1195                .await?;
1196            tracing::debug!(
1197                "Subscribed to subaccount updates: {}/{}",
1198                self.wallet_address,
1199                self.subaccount_number
1200            );
1201
1202            // Spawn WebSocket message processing task following standard adapter pattern
1203            // Per docs/developer_guide/adapters.md: Parse -> Dispatch -> Engine handles events
1204            if let Some(mut rx) = self.ws_client.take_receiver() {
1205                // Clone data needed for account state parsing in spawned task
1206                let account_id = self.core.account_id;
1207                let instruments = self.instruments.clone();
1208                let oracle_prices = self.oracle_prices.clone();
1209                let clob_pair_id_to_instrument = self.clob_pair_id_to_instrument.clone();
1210
1211                let handle = tokio::spawn(async move {
1212                    while let Some(msg) = rx.recv().await {
1213                        match msg {
1214                            NautilusWsMessage::Order(report) => {
1215                                tracing::debug!("Received order update: {:?}", report.order_status);
1216                                dispatch_execution_report(ExecutionReport::Order(report));
1217                            }
1218                            NautilusWsMessage::Fill(report) => {
1219                                tracing::debug!("Received fill update");
1220                                dispatch_execution_report(ExecutionReport::Fill(report));
1221                            }
1222                            NautilusWsMessage::Position(report) => {
1223                                tracing::debug!("Received position update");
1224                                // Dispatch position status reports via execution event system
1225                                let sender = get_exec_event_sender();
1226                                let exec_report =
1227                                    NautilusExecutionReport::Position(Box::new(*report));
1228                                if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1229                                    tracing::warn!("Failed to send position status report: {e}");
1230                                }
1231                            }
1232                            NautilusWsMessage::AccountState(state) => {
1233                                tracing::debug!("Received account state update");
1234                                dispatch_account_state(*state);
1235                            }
1236                            NautilusWsMessage::SubaccountSubscribed(msg) => {
1237                                tracing::debug!(
1238                                    "Parsing subaccount subscription with full context"
1239                                );
1240
1241                                // Build instruments map for parsing (clone to avoid lifetime issues)
1242                                let inst_map: std::collections::HashMap<_, _> = instruments
1243                                    .iter()
1244                                    .map(|entry| (*entry.key(), entry.value().clone()))
1245                                    .collect();
1246
1247                                // Build oracle prices map (copy Decimals)
1248                                let oracle_map: std::collections::HashMap<_, _> = oracle_prices
1249                                    .iter()
1250                                    .map(|entry| (*entry.key(), *entry.value()))
1251                                    .collect();
1252
1253                                let ts_init =
1254                                    nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1255                                let ts_event = ts_init;
1256
1257                                match crate::http::parse::parse_account_state(
1258                                    &msg.contents.subaccount,
1259                                    account_id,
1260                                    &inst_map,
1261                                    &oracle_map,
1262                                    ts_event,
1263                                    ts_init,
1264                                ) {
1265                                    Ok(account_state) => {
1266                                        tracing::info!(
1267                                            "Parsed account state: {} balance(s), {} margin(s)",
1268                                            account_state.balances.len(),
1269                                            account_state.margins.len()
1270                                        );
1271                                        dispatch_account_state(account_state);
1272                                    }
1273                                    Err(e) => {
1274                                        tracing::error!("Failed to parse account state: {e}");
1275                                    }
1276                                }
1277
1278                                // Parse positions from the subscription
1279                                if let Some(ref positions) =
1280                                    msg.contents.subaccount.open_perpetual_positions
1281                                {
1282                                    tracing::debug!(
1283                                        "Parsing {} position(s) from subscription",
1284                                        positions.len()
1285                                    );
1286
1287                                    for (market, ws_position) in positions {
1288                                        match crate::websocket::parse::parse_ws_position_report(
1289                                            ws_position,
1290                                            &instruments,
1291                                            account_id,
1292                                            ts_init,
1293                                        ) {
1294                                            Ok(report) => {
1295                                                tracing::debug!(
1296                                                    "Parsed position report: {} {} {} {}",
1297                                                    report.instrument_id,
1298                                                    report.position_side,
1299                                                    report.quantity,
1300                                                    market
1301                                                );
1302                                                let sender = get_exec_event_sender();
1303                                                let exec_report = NautilusExecutionReport::Position(
1304                                                    Box::new(report),
1305                                                );
1306                                                if let Err(e) =
1307                                                    sender.send(ExecutionEvent::Report(exec_report))
1308                                                {
1309                                                    tracing::warn!(
1310                                                        "Failed to send position status report: {e}"
1311                                                    );
1312                                                }
1313                                            }
1314                                            Err(e) => {
1315                                                tracing::error!(
1316                                                    "Failed to parse WebSocket position for {}: {e}",
1317                                                    market
1318                                                );
1319                                            }
1320                                        }
1321                                    }
1322                                }
1323                            }
1324                            NautilusWsMessage::SubaccountsChannelData(data) => {
1325                                tracing::debug!("Processing subaccounts channel data");
1326                                let ts_init =
1327                                    nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1328
1329                                // Process orders
1330                                if let Some(ref orders) = data.contents.orders {
1331                                    for ws_order in orders {
1332                                        match crate::websocket::parse::parse_ws_order_report(
1333                                            ws_order,
1334                                            &clob_pair_id_to_instrument,
1335                                            &instruments,
1336                                            account_id,
1337                                            ts_init,
1338                                        ) {
1339                                            Ok(report) => {
1340                                                tracing::debug!(
1341                                                    "Parsed order report: {} {} {} @ {}",
1342                                                    report.instrument_id,
1343                                                    report.order_side,
1344                                                    report.order_status,
1345                                                    report.quantity
1346                                                );
1347                                                let sender = get_exec_event_sender();
1348                                                let exec_report =
1349                                                    NautilusExecutionReport::OrderStatus(Box::new(
1350                                                        report,
1351                                                    ));
1352                                                if let Err(e) =
1353                                                    sender.send(ExecutionEvent::Report(exec_report))
1354                                                {
1355                                                    tracing::warn!(
1356                                                        "Failed to send order status report: {e}"
1357                                                    );
1358                                                }
1359                                            }
1360                                            Err(e) => {
1361                                                tracing::error!(
1362                                                    "Failed to parse WebSocket order: {e}"
1363                                                );
1364                                            }
1365                                        }
1366                                    }
1367                                }
1368
1369                                // Process fills
1370                                if let Some(ref fills) = data.contents.fills {
1371                                    for ws_fill in fills {
1372                                        match crate::websocket::parse::parse_ws_fill_report(
1373                                            ws_fill,
1374                                            &instruments,
1375                                            account_id,
1376                                            ts_init,
1377                                        ) {
1378                                            Ok(report) => {
1379                                                tracing::debug!(
1380                                                    "Parsed fill report: {} {} {} @ {}",
1381                                                    report.instrument_id,
1382                                                    report.venue_order_id,
1383                                                    report.last_qty,
1384                                                    report.last_px
1385                                                );
1386                                                let sender = get_exec_event_sender();
1387                                                let exec_report =
1388                                                    NautilusExecutionReport::Fill(Box::new(report));
1389                                                if let Err(e) =
1390                                                    sender.send(ExecutionEvent::Report(exec_report))
1391                                                {
1392                                                    tracing::warn!(
1393                                                        "Failed to send fill report: {e}"
1394                                                    );
1395                                                }
1396                                            }
1397                                            Err(e) => {
1398                                                tracing::error!(
1399                                                    "Failed to parse WebSocket fill: {e}"
1400                                                );
1401                                            }
1402                                        }
1403                                    }
1404                                }
1405                            }
1406                            NautilusWsMessage::OraclePrices(oracle_prices_map) => {
1407                                tracing::debug!(
1408                                    "Processing oracle price updates for {} markets",
1409                                    oracle_prices_map.len()
1410                                );
1411
1412                                // Update oracle_prices map with new prices
1413                                for (market_symbol, oracle_data) in &oracle_prices_map {
1414                                    // Parse oracle price
1415                                    match oracle_data.oracle_price.parse::<rust_decimal::Decimal>()
1416                                    {
1417                                        Ok(price) => {
1418                                            // Find instrument by symbol (oracle uses raw symbol like "BTC-USD")
1419                                            // Append "-PERP" to match instrument IDs
1420                                            let symbol_with_perp = format!("{market_symbol}-PERP");
1421
1422                                            // Find matching instrument
1423                                            if let Some(entry) = instruments.iter().find(|entry| {
1424                                                entry.value().id().symbol.as_str()
1425                                                    == symbol_with_perp
1426                                            }) {
1427                                                let instrument_id = *entry.key();
1428                                                oracle_prices.insert(instrument_id, price);
1429                                                tracing::trace!(
1430                                                    "Updated oracle price for {}: {}",
1431                                                    instrument_id,
1432                                                    price
1433                                                );
1434                                            } else {
1435                                                tracing::debug!(
1436                                                    "No instrument found for market symbol '{}' (tried '{}')",
1437                                                    market_symbol,
1438                                                    symbol_with_perp
1439                                                );
1440                                            }
1441                                        }
1442                                        Err(e) => {
1443                                            tracing::warn!(
1444                                                "Failed to parse oracle price for {}: {}",
1445                                                market_symbol,
1446                                                e
1447                                            );
1448                                        }
1449                                    }
1450                                }
1451                            }
1452                            NautilusWsMessage::Error(err) => {
1453                                tracing::error!("WebSocket error: {:?}", err);
1454                            }
1455                            NautilusWsMessage::Reconnected => {
1456                                tracing::info!("WebSocket reconnected");
1457                            }
1458                            _ => {
1459                                // Data, Deltas are for market data, not execution
1460                            }
1461                        }
1462                    }
1463                    tracing::info!("WebSocket message processing task ended");
1464                });
1465
1466                self.ws_stream_handle = Some(handle);
1467                tracing::debug!("Spawned WebSocket message processing task");
1468            }
1469        }
1470        self.connected = true;
1471        tracing::info!(client_id = %self.core.client_id, "Connected");
1472        Ok(())
1473    }
1474
1475    async fn disconnect(&mut self) -> anyhow::Result<()> {
1476        if !self.connected {
1477            tracing::warn!("dYdX execution client not connected");
1478            return Ok(());
1479        }
1480
1481        tracing::info!("Disconnecting from dYdX");
1482
1483        // Unsubscribe from subaccount updates if authenticated
1484        if self.config.mnemonic.is_some() {
1485            let _ = self
1486                .ws_client
1487                .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
1488                .await
1489                .map_err(|e| tracing::warn!("Failed to unsubscribe from subaccount: {e}"));
1490        }
1491
1492        // Unsubscribe from markets
1493        let _ = self
1494            .ws_client
1495            .unsubscribe_markets()
1496            .await
1497            .map_err(|e| tracing::warn!("Failed to unsubscribe from markets: {e}"));
1498
1499        // Unsubscribe from block height
1500        let _ = self
1501            .ws_client
1502            .unsubscribe_block_height()
1503            .await
1504            .map_err(|e| tracing::warn!("Failed to unsubscribe from block height: {e}"));
1505
1506        // Disconnect WebSocket
1507        self.ws_client.disconnect().await?;
1508
1509        // Abort WebSocket message processing task
1510        if let Some(handle) = self.ws_stream_handle.take() {
1511            handle.abort();
1512            tracing::debug!("Aborted WebSocket message processing task");
1513        }
1514
1515        // Abort any pending tasks
1516        self.abort_pending_tasks();
1517
1518        self.connected = false;
1519        tracing::info!(client_id = %self.core.client_id, "Disconnected");
1520        Ok(())
1521    }
1522}
1523
1524/// Dispatches account state events to the portfolio.
1525///
1526/// AccountState events are routed to the Portfolio (not ExecEngine) via msgbus.
1527/// This follows the pattern used by BitMEX, OKX, and other reference adapters.
1528fn dispatch_account_state(state: AccountState) {
1529    use std::any::Any;
1530    msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
1531}
1532
1533/// Dispatches execution reports to the execution engine.
1534///
1535/// This follows the standard adapter pattern where WebSocket handlers parse messages
1536/// into reports, and a dispatch function sends them via the execution event system.
1537/// The execution engine then handles cache lookups and event generation.
1538///
1539/// # Architecture
1540///
1541/// Per `docs/developer_guide/adapters.md`, adapters should:
1542/// 1. Parse WebSocket messages into ExecutionReports in the handler
1543/// 2. Dispatch reports via `get_exec_event_sender()`
1544/// 3. Let the execution engine handle event generation (has cache access)
1545///
1546/// This pattern is used by Hyperliquid, OKX, BitMEX, and other reference adapters.
1547fn dispatch_execution_report(report: ExecutionReport) {
1548    let sender = get_exec_event_sender();
1549    match report {
1550        ExecutionReport::Order(order_report) => {
1551            tracing::debug!(
1552                "Dispatching order report: status={:?}, venue_order_id={:?}, client_order_id={:?}",
1553                order_report.order_status,
1554                order_report.venue_order_id,
1555                order_report.client_order_id
1556            );
1557            let exec_report = NautilusExecutionReport::OrderStatus(order_report);
1558            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1559                tracing::warn!("Failed to send order status report: {e}");
1560            }
1561        }
1562        ExecutionReport::Fill(fill_report) => {
1563            tracing::debug!(
1564                "Dispatching fill report: venue_order_id={}, trade_id={}",
1565                fill_report.venue_order_id,
1566                fill_report.trade_id
1567            );
1568            let exec_report = NautilusExecutionReport::Fill(fill_report);
1569            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1570                tracing::warn!("Failed to send fill report: {e}");
1571            }
1572        }
1573    }
1574}
1575
1576#[async_trait(?Send)]
1577impl LiveExecutionClient for DydxExecutionClient {
1578    async fn generate_order_status_report(
1579        &self,
1580        cmd: &GenerateOrderStatusReport,
1581    ) -> anyhow::Result<Option<OrderStatusReport>> {
1582        use anyhow::Context;
1583
1584        // Query single order from dYdX API
1585        let response = self
1586            .http_client
1587            .inner
1588            .get_orders(
1589                &self.wallet_address,
1590                self.subaccount_number,
1591                None,    // market filter
1592                Some(1), // limit to 1 result
1593            )
1594            .await
1595            .context("failed to fetch order from dYdX API")?;
1596
1597        if response.is_empty() {
1598            return Ok(None);
1599        }
1600
1601        let order = &response[0];
1602        let ts_init = UnixNanos::default();
1603
1604        // Get instrument by clob_pair_id
1605        let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1606            Some(inst) => inst,
1607            None => return Ok(None),
1608        };
1609
1610        // Parse to OrderStatusReport
1611        let report = crate::http::parse::parse_order_status_report(
1612            order,
1613            &instrument,
1614            self.core.account_id,
1615            ts_init,
1616        )
1617        .context("failed to parse order status report")?;
1618
1619        // Filter by client_order_id if specified
1620        if let Some(client_order_id) = cmd.client_order_id
1621            && report.client_order_id != Some(client_order_id)
1622        {
1623            return Ok(None);
1624        }
1625
1626        // Filter by venue_order_id if specified
1627        if let Some(venue_order_id) = cmd.venue_order_id
1628            && report.venue_order_id.as_str() != venue_order_id.as_str()
1629        {
1630            return Ok(None);
1631        }
1632
1633        // Filter by instrument_id if specified
1634        if let Some(instrument_id) = cmd.instrument_id
1635            && report.instrument_id != instrument_id
1636        {
1637            return Ok(None);
1638        }
1639
1640        Ok(Some(report))
1641    }
1642
1643    async fn generate_order_status_reports(
1644        &self,
1645        cmd: &GenerateOrderStatusReport,
1646    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1647        use anyhow::Context;
1648
1649        // Query orders from dYdX API
1650        let response = self
1651            .http_client
1652            .inner
1653            .get_orders(
1654                &self.wallet_address,
1655                self.subaccount_number,
1656                None, // market filter
1657                None, // limit
1658            )
1659            .await
1660            .context("failed to fetch orders from dYdX API")?;
1661
1662        let mut reports = Vec::new();
1663        let ts_init = UnixNanos::default();
1664
1665        for order in response {
1666            // Get instrument by clob_pair_id using efficient lookup
1667            let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1668                Some(inst) => inst,
1669                None => continue,
1670            };
1671
1672            // Filter by instrument_id if specified
1673            if let Some(filter_id) = cmd.instrument_id
1674                && instrument.id() != filter_id
1675            {
1676                continue;
1677            }
1678
1679            // Parse to OrderStatusReport
1680            match crate::http::parse::parse_order_status_report(
1681                &order,
1682                &instrument,
1683                self.core.account_id,
1684                ts_init,
1685            ) {
1686                Ok(report) => {
1687                    // Filter by client_order_id if specified
1688                    if let Some(client_order_id) = cmd.client_order_id
1689                        && report.client_order_id != Some(client_order_id)
1690                    {
1691                        continue;
1692                    }
1693
1694                    // Filter by venue_order_id if specified
1695                    if let Some(venue_order_id) = cmd.venue_order_id
1696                        && report.venue_order_id.as_str() != venue_order_id.as_str()
1697                    {
1698                        continue;
1699                    }
1700
1701                    reports.push(report);
1702                }
1703                Err(e) => tracing::error!("Failed to parse order status report: {e}"),
1704            }
1705        }
1706
1707        tracing::info!("Generated {} order status reports", reports.len());
1708        Ok(reports)
1709    }
1710
1711    async fn generate_fill_reports(
1712        &self,
1713        cmd: GenerateFillReports,
1714    ) -> anyhow::Result<Vec<FillReport>> {
1715        use anyhow::Context;
1716
1717        // Query fills from dYdX API
1718        let response = self
1719            .http_client
1720            .inner
1721            .get_fills(
1722                &self.wallet_address,
1723                self.subaccount_number,
1724                None, // market filter
1725                None, // limit
1726            )
1727            .await
1728            .context("failed to fetch fills from dYdX API")?;
1729
1730        let mut reports = Vec::new();
1731        let ts_init = UnixNanos::default();
1732
1733        for fill in response.fills {
1734            // Get instrument by market ticker using efficient lookup
1735            let instrument = match self.get_instrument_by_market(&fill.market) {
1736                Some(inst) => inst,
1737                None => {
1738                    tracing::warn!(
1739                        "Instrument for market {} not found in cache, skipping fill {}",
1740                        fill.market,
1741                        fill.id
1742                    );
1743                    continue;
1744                }
1745            };
1746
1747            // Filter by instrument_id if specified
1748            if let Some(filter_id) = cmd.instrument_id
1749                && instrument.id() != filter_id
1750            {
1751                continue;
1752            }
1753
1754            // Parse to FillReport
1755            match crate::http::parse::parse_fill_report(
1756                &fill,
1757                &instrument,
1758                self.core.account_id,
1759                ts_init,
1760            ) {
1761                Ok(report) => {
1762                    // Filter by venue_order_id if specified
1763                    if let Some(venue_order_id) = cmd.venue_order_id
1764                        && report.venue_order_id.as_str() != venue_order_id.as_str()
1765                    {
1766                        continue;
1767                    }
1768
1769                    // Filter by time range if specified
1770                    if let (Some(start), Some(end)) = (cmd.start, cmd.end) {
1771                        if report.ts_event >= start && report.ts_event <= end {
1772                            reports.push(report);
1773                        }
1774                    } else if let Some(start) = cmd.start {
1775                        if report.ts_event >= start {
1776                            reports.push(report);
1777                        }
1778                    } else if let Some(end) = cmd.end {
1779                        if report.ts_event <= end {
1780                            reports.push(report);
1781                        }
1782                    } else {
1783                        reports.push(report);
1784                    }
1785                }
1786                Err(e) => tracing::error!("Failed to parse fill report: {e}"),
1787            }
1788        }
1789
1790        tracing::info!("Generated {} fill reports", reports.len());
1791        Ok(reports)
1792    }
1793
1794    async fn generate_position_status_reports(
1795        &self,
1796        cmd: &GeneratePositionReports,
1797    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1798        use anyhow::Context;
1799
1800        // Query subaccount data from dYdX API to get positions
1801        let response = self
1802            .http_client
1803            .inner
1804            .get_subaccount(&self.wallet_address, self.subaccount_number)
1805            .await
1806            .context("failed to fetch subaccount from dYdX API")?;
1807
1808        let mut reports = Vec::new();
1809        let ts_init = UnixNanos::default();
1810
1811        // Iterate through open perpetual positions
1812        for (market_ticker, position) in &response.subaccount.open_perpetual_positions {
1813            // Get instrument by market ticker using efficient lookup
1814            let instrument = match self.get_instrument_by_market(market_ticker) {
1815                Some(inst) => inst,
1816                None => {
1817                    tracing::warn!(
1818                        "Instrument for market {} not found in cache, skipping position",
1819                        market_ticker
1820                    );
1821                    continue;
1822                }
1823            };
1824
1825            // Filter by instrument_id if specified
1826            if let Some(filter_id) = cmd.instrument_id
1827                && instrument.id() != filter_id
1828            {
1829                continue;
1830            }
1831
1832            // Parse to PositionStatusReport
1833            match crate::http::parse::parse_position_status_report(
1834                position,
1835                &instrument,
1836                self.core.account_id,
1837                ts_init,
1838            ) {
1839                Ok(report) => reports.push(report),
1840                Err(e) => {
1841                    tracing::error!("Failed to parse position status report: {e}");
1842                }
1843            }
1844        }
1845
1846        tracing::info!("Generated {} position status reports", reports.len());
1847        Ok(reports)
1848    }
1849
1850    async fn generate_mass_status(
1851        &self,
1852        lookback_mins: Option<u64>,
1853    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1854        use anyhow::Context;
1855
1856        tracing::info!(
1857            "Generating mass execution status{}",
1858            lookback_mins.map_or_else(
1859                || " (unbounded)".to_string(),
1860                |mins| format!(" (lookback: {mins} minutes)")
1861            )
1862        );
1863
1864        // Calculate cutoff time if lookback is specified
1865        let cutoff_time = lookback_mins.map(|mins| Utc::now() - Duration::minutes(mins as i64));
1866
1867        // Query all orders
1868        let orders_response = self
1869            .http_client
1870            .inner
1871            .get_orders(&self.wallet_address, self.subaccount_number, None, None)
1872            .await
1873            .context("failed to fetch orders for mass status")?;
1874
1875        // Query subaccount for positions
1876        let subaccount_response = self
1877            .http_client
1878            .inner
1879            .get_subaccount(&self.wallet_address, self.subaccount_number)
1880            .await
1881            .context("failed to fetch subaccount for mass status")?;
1882
1883        // Query fills
1884        let fills_response = self
1885            .http_client
1886            .inner
1887            .get_fills(&self.wallet_address, self.subaccount_number, None, None)
1888            .await
1889            .context("failed to fetch fills for mass status")?;
1890
1891        let ts_init = UnixNanos::default();
1892        let mut order_reports = Vec::new();
1893        let mut position_reports = Vec::new();
1894        let mut fill_reports = Vec::new();
1895
1896        // Counters for logging (only used when filtering is active)
1897        let mut orders_filtered = 0;
1898        let mut fills_filtered = 0;
1899
1900        // Parse orders (with optional time filtering)
1901        for order in orders_response {
1902            // Filter by time window if specified (use updated_at for orders)
1903            if let Some(cutoff) = cutoff_time
1904                && order.updated_at.is_some_and(|dt| dt < cutoff)
1905            {
1906                orders_filtered += 1;
1907                continue;
1908            }
1909
1910            if let Some(instrument) = self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1911                match crate::http::parse::parse_order_status_report(
1912                    &order,
1913                    &instrument,
1914                    self.core.account_id,
1915                    ts_init,
1916                ) {
1917                    Ok(report) => order_reports.push(report),
1918                    Err(e) => tracing::error!("Failed to parse order in mass status: {e}"),
1919                }
1920            }
1921        }
1922
1923        // Parse positions (no time filtering - positions are current state)
1924        for (market_ticker, position) in &subaccount_response.subaccount.open_perpetual_positions {
1925            if let Some(instrument) = self.get_instrument_by_market(market_ticker) {
1926                match crate::http::parse::parse_position_status_report(
1927                    position,
1928                    &instrument,
1929                    self.core.account_id,
1930                    ts_init,
1931                ) {
1932                    Ok(report) => position_reports.push(report),
1933                    Err(e) => tracing::error!("Failed to parse position in mass status: {e}"),
1934                }
1935            }
1936        }
1937
1938        // Parse fills (with optional time filtering)
1939        for fill in fills_response.fills {
1940            // Filter by time window if specified
1941            if let Some(cutoff) = cutoff_time
1942                && fill.created_at < cutoff
1943            {
1944                fills_filtered += 1;
1945                continue;
1946            }
1947
1948            if let Some(instrument) = self.get_instrument_by_market(&fill.market) {
1949                match crate::http::parse::parse_fill_report(
1950                    &fill,
1951                    &instrument,
1952                    self.core.account_id,
1953                    ts_init,
1954                ) {
1955                    Ok(report) => fill_reports.push(report),
1956                    Err(e) => tracing::error!("Failed to parse fill in mass status: {e}"),
1957                }
1958            }
1959        }
1960
1961        if cutoff_time.is_some() {
1962            tracing::info!(
1963                "Generated mass status: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
1964                order_reports.len(),
1965                orders_filtered,
1966                position_reports.len(),
1967                fill_reports.len(),
1968                fills_filtered
1969            );
1970        } else {
1971            tracing::info!(
1972                "Generated mass status: {} orders, {} positions, {} fills",
1973                order_reports.len(),
1974                position_reports.len(),
1975                fill_reports.len()
1976            );
1977        }
1978
1979        // Create mass status and add reports
1980        let mut mass_status = ExecutionMassStatus::new(
1981            self.core.client_id,
1982            self.core.account_id,
1983            self.core.venue,
1984            ts_init,
1985            None, // report_id will be auto-generated
1986        );
1987
1988        mass_status.add_order_reports(order_reports);
1989        mass_status.add_position_reports(position_reports);
1990        mass_status.add_fill_reports(fill_reports);
1991
1992        Ok(Some(mass_status))
1993    }
1994}