nautilus_dydx/execution/
mod.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Live execution client implementation for the dYdX adapter.
17//!
18//! This module provides the execution client for submitting orders, cancellations,
19//! and managing positions on dYdX v4.
20//!
21//! # Order Types
22//!
23//! dYdX supports the following order types:
24//!
25//! - **Market**: Execute immediately at best available price
26//! - **Limit**: Execute at specified price or better
27//! - **Stop Market**: Triggered when price crosses stop price, then executes as market order
28//! - **Stop Limit**: Triggered when price crosses stop price, then places limit order
29//! - **Take Profit Market**: Close position at profit target, executes as market order
30//! - **Take Profit Limit**: Close position at profit target, places limit order
31//!
32//! See <https://docs.dydx.xyz/concepts/trading/orders#types> for details.
33//!
34//! # Order Lifetimes
35//!
36//! Orders can be short-term (expire by block height) or long-term/stateful (expire by timestamp).
37//! Conditional orders (Stop/TakeProfit) are always stateful.
38//!
39//! See <https://docs.dydx.xyz/concepts/trading/orders#short-term-vs-long-term> for details.
40
41use std::sync::{
42    Arc, Mutex,
43    atomic::{AtomicU32, AtomicU64, Ordering},
44};
45
46use anyhow::Context;
47use async_trait::async_trait;
48use dashmap::DashMap;
49use nautilus_common::{
50    clients::ExecutionClient,
51    live::{runner::get_exec_event_sender, runtime::get_runtime},
52    messages::{
53        ExecutionEvent, ExecutionReport as NautilusExecutionReport,
54        execution::{
55            BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
56            GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
57            ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
58        },
59    },
60    msgbus,
61};
62use nautilus_core::{MUTEX_POISONED, UUID4, UnixNanos};
63use nautilus_live::ExecutionClientCore;
64use nautilus_model::{
65    accounts::AccountAny,
66    enums::{OmsType, OrderSide, OrderType, TimeInForce},
67    events::{AccountState, OrderCancelRejected, OrderEventAny, OrderRejected},
68    identifiers::{AccountId, ClientId, ClientOrderId, InstrumentId, StrategyId, Venue},
69    instruments::{Instrument, InstrumentAny},
70    orders::Order,
71    reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
72    types::{AccountBalance, MarginBalance},
73};
74use nautilus_network::retry::RetryConfig;
75use rust_decimal::Decimal;
76use tokio::task::JoinHandle;
77
78use crate::{
79    common::{consts::DYDX_VENUE, credential::DydxCredential, parse::nanos_to_secs_i64},
80    config::DydxAdapterConfig,
81    execution::submitter::OrderSubmitter,
82    grpc::{DydxGrpcClient, Wallet, types::ChainId},
83    http::client::DydxHttpClient,
84    websocket::{client::DydxWebSocketClient, enums::NautilusWsMessage},
85};
86
87pub mod submitter;
88
89/// Maximum client order ID value for dYdX (informational - not enforced by adapter).
90///
91/// dYdX protocol accepts u32 client IDs. The current implementation uses sequential
92/// allocation starting from 1, which will wrap at u32::MAX. If dYdX has a stricter
93/// limit, this constant should be updated and enforced in `generate_client_order_id_int`.
94pub const MAX_CLIENT_ID: u32 = u32::MAX;
95
96/// Execution report types dispatched from WebSocket message handler.
97///
98/// This enum groups order and fill reports for unified dispatch handling,
99/// following the pattern used by reference adapters (Hyperliquid, OKX).
100enum ExecutionReport {
101    Order(Box<OrderStatusReport>),
102    Fill(Box<FillReport>),
103}
104
105/// Live execution client for the dYdX v4 exchange adapter.
106///
107/// Supports Market, Limit, Stop Market, Stop Limit, Take Profit Market (MarketIfTouched),
108/// and Take Profit Limit (LimitIfTouched) orders via gRPC. Trailing stops are NOT supported
109/// by the dYdX v4 protocol. dYdX requires u32 client IDs - strings are hashed to fit.
110///
111/// # Architecture
112///
113/// The client follows a two-layer execution model:
114/// 1. **Synchronous validation** - Immediate checks and event generation
115/// 2. **Async submission** - Non-blocking gRPC calls via `OrderSubmitter`
116///
117/// This matches the pattern used in OKX and other exchange adapters, ensuring
118/// consistent behavior across the Nautilus ecosystem.
119#[derive(Debug)]
120pub struct DydxExecutionClient {
121    core: ExecutionClientCore,
122    config: DydxAdapterConfig,
123    http_client: DydxHttpClient,
124    ws_client: DydxWebSocketClient,
125    grpc_client: Arc<tokio::sync::RwLock<DydxGrpcClient>>,
126    wallet: Arc<tokio::sync::RwLock<Option<Wallet>>>,
127    instruments: DashMap<InstrumentId, InstrumentAny>,
128    market_to_instrument: DashMap<String, InstrumentId>,
129    clob_pair_id_to_instrument: DashMap<u32, InstrumentId>,
130    block_height: Arc<AtomicU64>,
131    oracle_prices: Arc<DashMap<InstrumentId, Decimal>>,
132    client_id_to_int: DashMap<String, u32>,
133    int_to_client_id: DashMap<u32, String>,
134    next_client_id: AtomicU32,
135    wallet_address: String,
136    subaccount_number: u32,
137    started: bool,
138    connected: bool,
139    instruments_initialized: bool,
140    ws_stream_handle: Option<JoinHandle<()>>,
141    pending_tasks: Mutex<Vec<JoinHandle<()>>>,
142}
143
144impl DydxExecutionClient {
145    /// Creates a new [`DydxExecutionClient`].
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if the WebSocket client fails to construct.
150    pub fn new(
151        core: ExecutionClientCore,
152        config: DydxAdapterConfig,
153        wallet_address: String,
154        subaccount_number: u32,
155    ) -> anyhow::Result<Self> {
156        // Build HTTP client from config (respects testnet URLs, timeouts, retries)
157        let retry_config = RetryConfig {
158            max_retries: config.max_retries,
159            initial_delay_ms: config.retry_delay_initial_ms,
160            max_delay_ms: config.retry_delay_max_ms,
161            ..Default::default()
162        };
163        let http_client = DydxHttpClient::new(
164            Some(config.base_url.clone()),
165            Some(config.timeout_secs),
166            None, // proxy_url - not in DydxAdapterConfig currently
167            config.is_testnet,
168            Some(retry_config),
169        )?;
170
171        // Use private WebSocket client for authenticated subaccount subscriptions
172        let ws_client = if let Some(ref mnemonic) = config.mnemonic {
173            let credential = DydxCredential::from_mnemonic(
174                mnemonic,
175                subaccount_number,
176                config.authenticator_ids.clone(),
177            )?;
178            DydxWebSocketClient::new_private(
179                config.ws_url.clone(),
180                credential,
181                core.account_id,
182                Some(20),
183            )
184        } else {
185            DydxWebSocketClient::new_public(config.ws_url.clone(), Some(20))
186        };
187
188        let grpc_urls = config.get_grpc_urls();
189        let grpc_client = Arc::new(tokio::sync::RwLock::new(
190            get_runtime()
191                .block_on(async { DydxGrpcClient::new_with_fallback(&grpc_urls).await })
192                .context("failed to construct dYdX gRPC client")?,
193        ));
194
195        Ok(Self {
196            core,
197            config,
198            http_client,
199            ws_client,
200            grpc_client,
201            wallet: Arc::new(tokio::sync::RwLock::new(None)),
202            instruments: DashMap::new(),
203            market_to_instrument: DashMap::new(),
204            clob_pair_id_to_instrument: DashMap::new(),
205            block_height: Arc::new(AtomicU64::new(0)),
206            oracle_prices: Arc::new(DashMap::new()),
207            client_id_to_int: DashMap::new(),
208            int_to_client_id: DashMap::new(),
209            next_client_id: AtomicU32::new(1),
210            wallet_address,
211            subaccount_number,
212            started: false,
213            connected: false,
214            instruments_initialized: false,
215            ws_stream_handle: None,
216            pending_tasks: Mutex::new(Vec::new()),
217        })
218    }
219
220    /// Generate a unique client order ID integer and store the mapping.
221    ///
222    /// # Invariants
223    ///
224    /// - Same `client_order_id` string → same `u32` for the lifetime of this process
225    /// - Different `client_order_id` strings → different `u32` values (except on u32 wrap)
226    /// - Thread-safe for concurrent calls
227    ///
228    /// # Behavior
229    ///
230    /// - Parses numeric `client_order_id` directly to `u32` for stability across restarts
231    /// - For non-numeric IDs, allocates a new sequential value from an atomic counter
232    /// - Mapping is kept in-memory only; non-numeric IDs will not be recoverable after restart
233    /// - Counter starts at 1 and increments without bound checking (will wrap at u32::MAX)
234    ///
235    /// # Notes
236    ///
237    /// - Atomic counter uses `Relaxed` ordering — uniqueness is required, not cross-thread sequencing
238    /// - If dYdX enforces a maximum client ID below u32::MAX, additional range validation is needed
239    fn generate_client_order_id_int(&self, client_order_id: &str) -> u32 {
240        use dashmap::mapref::entry::Entry;
241
242        // Fast path: already mapped
243        if let Some(existing) = self.client_id_to_int.get(client_order_id) {
244            return *existing.value();
245        }
246
247        // Try parsing as direct integer
248        if let Ok(id) = client_order_id.parse::<u32>() {
249            self.client_id_to_int
250                .insert(client_order_id.to_string(), id);
251            self.int_to_client_id
252                .insert(id, client_order_id.to_string());
253            return id;
254        }
255
256        // Allocate new ID from atomic counter
257        match self.client_id_to_int.entry(client_order_id.to_string()) {
258            Entry::Occupied(entry) => *entry.get(),
259            Entry::Vacant(vacant) => {
260                let id = self
261                    .next_client_id
262                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
263                vacant.insert(id);
264                self.int_to_client_id
265                    .insert(id, client_order_id.to_string());
266                id
267            }
268        }
269    }
270
271    /// Retrieve the client order ID integer from the cache.
272    ///
273    /// Returns `None` if the mapping doesn't exist.
274    fn get_client_order_id_int(&self, client_order_id: &str) -> Option<u32> {
275        // Try parsing first
276        if let Ok(id) = client_order_id.parse::<u32>() {
277            return Some(id);
278        }
279
280        // Look up in cache
281        self.client_id_to_int
282            .get(client_order_id)
283            .map(|entry| *entry.value())
284    }
285
286    /// Get chain ID from config network field.
287    ///
288    /// This is the recommended way to get chain_id for all transaction submissions.
289    fn get_chain_id(&self) -> ChainId {
290        self.config.get_chain_id()
291    }
292
293    /// Cache instruments from HTTP client into execution client's lookup maps.
294    ///
295    /// This populates three data structures for efficient lookups:
296    /// - `instruments`: InstrumentId → InstrumentAny
297    /// - `market_to_instrument`: Market ticker (e.g., "BTC-USD") → InstrumentId
298    /// - `clob_pair_id_to_instrument`: CLOB pair ID → InstrumentId
299    fn cache_instruments_from_http(&mut self) {
300        // Get all instruments from HTTP client cache
301        let instruments: Vec<InstrumentAny> = self
302            .http_client
303            .instruments_cache
304            .iter()
305            .map(|entry| entry.value().clone())
306            .collect();
307
308        log::debug!(
309            "Caching {} instruments in execution client",
310            instruments.len()
311        );
312
313        for instrument in instruments {
314            let instrument_id = instrument.id();
315            let symbol = instrument_id.symbol.as_str();
316
317            // Cache instrument by InstrumentId
318            self.instruments.insert(instrument_id, instrument.clone());
319
320            // Cache market ticker → InstrumentId mapping
321            self.market_to_instrument
322                .insert(symbol.to_string(), instrument_id);
323        }
324
325        // Copy clob_pair_id → InstrumentId mapping from HTTP client
326        // The HTTP client populates this from PerpetualMarket.clob_pair_id (authoritative source)
327        let http_mapping = self.http_client.clob_pair_id_mapping();
328        for entry in http_mapping.iter() {
329            self.clob_pair_id_to_instrument
330                .insert(*entry.key(), *entry.value());
331        }
332
333        self.instruments_initialized = true;
334        log::info!(
335            "Cached {} instruments ({} CLOB pair IDs) with market mappings",
336            self.instruments.len(),
337            self.clob_pair_id_to_instrument.len()
338        );
339    }
340
341    /// Get an instrument by market ticker (e.g., "BTC-USD").
342    fn get_instrument_by_market(&self, market: &str) -> Option<InstrumentAny> {
343        self.market_to_instrument
344            .get(market)
345            .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()))
346    }
347
348    /// Get an instrument by clob_pair_id.
349    fn get_instrument_by_clob_pair_id(&self, clob_pair_id: u32) -> Option<InstrumentAny> {
350        let instrument = self
351            .clob_pair_id_to_instrument
352            .get(&clob_pair_id)
353            .and_then(|id| self.instruments.get(&id).map(|entry| entry.value().clone()));
354
355        if instrument.is_none() {
356            self.log_missing_instrument_for_clob_pair_id(clob_pair_id);
357        }
358
359        instrument
360    }
361
362    fn log_missing_instrument_for_clob_pair_id(&self, clob_pair_id: u32) {
363        let known: Vec<(u32, String)> = self
364            .clob_pair_id_to_instrument
365            .iter()
366            .filter_map(|entry| {
367                let instrument_id = entry.value();
368                self.instruments.get(instrument_id).map(|inst_entry| {
369                    (
370                        *entry.key(),
371                        inst_entry.value().id().symbol.as_str().to_string(),
372                    )
373                })
374            })
375            .collect();
376
377        log::warn!(
378            "Instrument for clob_pair_id {clob_pair_id} not found in cache. Known CLOB pair IDs and symbols: {known:?}"
379        );
380    }
381
382    fn spawn_task<F>(&self, label: &'static str, fut: F)
383    where
384        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
385    {
386        let handle = get_runtime().spawn(async move {
387            if let Err(e) = fut.await {
388                log::error!("{label}: {e:?}");
389            }
390        });
391
392        self.pending_tasks
393            .lock()
394            .expect(MUTEX_POISONED)
395            .push(handle);
396    }
397
398    /// Spawns an order submission task with error handling and rejection generation.
399    ///
400    /// If the submission fails, generates an `OrderRejected` event with the error details.
401    fn spawn_order_task<F>(
402        &self,
403        label: &'static str,
404        strategy_id: StrategyId,
405        instrument_id: InstrumentId,
406        client_order_id: ClientOrderId,
407        fut: F,
408    ) where
409        F: std::future::Future<Output = anyhow::Result<()>> + Send + 'static,
410    {
411        // Capture necessary data for rejection event
412        let trader_id = self.core.trader_id;
413        let account_id = self.core.account_id;
414        let sender = get_exec_event_sender();
415
416        let handle = get_runtime().spawn(async move {
417            if let Err(e) = fut.await {
418                let error_msg = format!("{label} failed: {e:?}");
419                log::error!("{error_msg}");
420
421                let ts_now = UnixNanos::default(); // Use current time
422                let event = OrderRejected::new(
423                    trader_id,
424                    strategy_id,
425                    instrument_id,
426                    client_order_id,
427                    account_id,
428                    error_msg.into(),
429                    UUID4::new(),
430                    ts_now,
431                    ts_now,
432                    false,
433                    false,
434                );
435
436                if let Err(send_err) =
437                    sender.send(ExecutionEvent::Order(OrderEventAny::Rejected(event)))
438                {
439                    log::error!("Failed to send OrderRejected event: {send_err}");
440                }
441            }
442        });
443
444        self.pending_tasks
445            .lock()
446            .expect(MUTEX_POISONED)
447            .push(handle);
448    }
449
450    fn abort_pending_tasks(&self) {
451        let mut guard = self.pending_tasks.lock().expect(MUTEX_POISONED);
452        for handle in guard.drain(..) {
453            handle.abort();
454        }
455    }
456}
457
458#[async_trait(?Send)]
459impl ExecutionClient for DydxExecutionClient {
460    fn is_connected(&self) -> bool {
461        self.connected
462    }
463
464    fn client_id(&self) -> ClientId {
465        self.core.client_id
466    }
467
468    fn account_id(&self) -> AccountId {
469        self.core.account_id
470    }
471
472    fn venue(&self) -> Venue {
473        *DYDX_VENUE
474    }
475
476    fn oms_type(&self) -> OmsType {
477        self.core.oms_type
478    }
479
480    fn get_account(&self) -> Option<AccountAny> {
481        self.core.get_account()
482    }
483
484    fn generate_account_state(
485        &self,
486        balances: Vec<AccountBalance>,
487        margins: Vec<MarginBalance>,
488        reported: bool,
489        ts_event: UnixNanos,
490    ) -> anyhow::Result<()> {
491        self.core
492            .generate_account_state(balances, margins, reported, ts_event)
493    }
494
495    fn start(&mut self) -> anyhow::Result<()> {
496        if self.started {
497            log::warn!("dYdX execution client already started");
498            return Ok(());
499        }
500
501        log::info!("Starting dYdX execution client");
502        self.started = true;
503        Ok(())
504    }
505
506    fn stop(&mut self) -> anyhow::Result<()> {
507        if !self.started {
508            log::warn!("dYdX execution client not started");
509            return Ok(());
510        }
511
512        log::info!("Stopping dYdX execution client");
513        self.abort_pending_tasks();
514        self.started = false;
515        self.connected = false;
516        Ok(())
517    }
518
519    /// Submits an order to dYdX via gRPC.
520    ///
521    /// dYdX requires u32 client IDs - Nautilus ClientOrderId strings are hashed to fit.
522    ///
523    /// Supported order types:
524    /// - Market orders (short-term, IOC)
525    /// - Limit orders (short-term or long-term based on TIF)
526    /// - Stop Market orders (conditional, triggered at stop price)
527    /// - Stop Limit orders (conditional, triggered at stop price, executed at limit)
528    /// - Take Profit Market (MarketIfTouched - triggered at take profit price)
529    /// - Take Profit Limit (LimitIfTouched - triggered at take profit price, executed at limit)
530    ///
531    /// Trailing stop orders are NOT supported by dYdX v4 protocol.
532    ///
533    /// Validates synchronously, generates OrderSubmitted event, then spawns async task for
534    /// gRPC submission to avoid blocking. Unsupported order types generate OrderRejected.
535    fn submit_order(&self, cmd: &SubmitOrder) -> anyhow::Result<()> {
536        let order = cmd.order.clone();
537
538        // Check connection status
539        if !self.is_connected() {
540            let reason = "Cannot submit order: execution client not connected";
541            log::error!("{reason}");
542            anyhow::bail!(reason);
543        }
544
545        // Check block height is available for short-term orders
546        let current_block = self.block_height.load(Ordering::Relaxed);
547        if current_block == 0 {
548            let reason = "Block height not initialized";
549            log::warn!(
550                "Cannot submit order {}: {}",
551                order.client_order_id(),
552                reason
553            );
554            self.core.generate_order_rejected(
555                order.strategy_id(),
556                order.instrument_id(),
557                order.client_order_id(),
558                reason,
559                cmd.ts_init,
560                false,
561            );
562            return Ok(());
563        }
564
565        // Check if order is already closed
566        if order.is_closed() {
567            log::warn!("Cannot submit closed order {}", order.client_order_id());
568            return Ok(());
569        }
570
571        // Validate order type
572        match order.order_type() {
573            OrderType::Market | OrderType::Limit => {
574                log::debug!(
575                    "Submitting {} order: {}",
576                    if matches!(order.order_type(), OrderType::Market) {
577                        "MARKET"
578                    } else {
579                        "LIMIT"
580                    },
581                    order.client_order_id()
582                );
583            }
584            // Conditional orders (stop/take-profit) - supported by dYdX
585            OrderType::StopMarket | OrderType::StopLimit => {
586                log::debug!(
587                    "Submitting {} order: {}",
588                    if matches!(order.order_type(), OrderType::StopMarket) {
589                        "STOP_MARKET"
590                    } else {
591                        "STOP_LIMIT"
592                    },
593                    order.client_order_id()
594                );
595            }
596            // dYdX TakeProfit/TakeProfitLimit map to MarketIfTouched/LimitIfTouched
597            OrderType::MarketIfTouched | OrderType::LimitIfTouched => {
598                log::debug!(
599                    "Submitting {} order: {}",
600                    if matches!(order.order_type(), OrderType::MarketIfTouched) {
601                        "TAKE_PROFIT_MARKET"
602                    } else {
603                        "TAKE_PROFIT_LIMIT"
604                    },
605                    order.client_order_id()
606                );
607            }
608            // Trailing stops not supported by dYdX v4 protocol
609            OrderType::TrailingStopMarket | OrderType::TrailingStopLimit => {
610                let reason = "Trailing stop orders not supported by dYdX v4 protocol";
611                log::error!("{reason}");
612                self.core.generate_order_rejected(
613                    order.strategy_id(),
614                    order.instrument_id(),
615                    order.client_order_id(),
616                    reason,
617                    cmd.ts_init,
618                    false,
619                );
620                return Ok(());
621            }
622            order_type => {
623                let reason = format!("Order type {order_type:?} not supported by dYdX");
624                log::error!("{reason}");
625                self.core.generate_order_rejected(
626                    order.strategy_id(),
627                    order.instrument_id(),
628                    order.client_order_id(),
629                    &reason,
630                    cmd.ts_init,
631                    false,
632                );
633                return Ok(());
634            }
635        }
636
637        // Generate OrderSubmitted event immediately
638        self.core.generate_order_submitted(
639            order.strategy_id(),
640            order.instrument_id(),
641            order.client_order_id(),
642            cmd.ts_init,
643        );
644
645        let grpc_client = self.grpc_client.clone();
646        let wallet = self.wallet.clone();
647        let http_client = self.http_client.clone();
648        let wallet_address = self.wallet_address.clone();
649        let subaccount_number = self.subaccount_number;
650        let client_order_id = order.client_order_id();
651        let instrument_id = order.instrument_id();
652        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
653        let chain_id = self.get_chain_id();
654        let authenticator_ids = self.config.authenticator_ids.clone();
655        #[allow(clippy::redundant_clone)]
656        let order_clone = order.clone();
657
658        // Generate client_order_id as u32 before async block (dYdX requires u32 client IDs)
659        let client_id_u32 = self.generate_client_order_id_int(client_order_id.as_str());
660
661        self.spawn_order_task(
662            "submit_order",
663            order.strategy_id(),
664            order.instrument_id(),
665            order.client_order_id(),
666            async move {
667                let wallet_guard = wallet.read().await;
668                let wallet_ref = wallet_guard
669                    .as_ref()
670                    .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
671
672                let grpc_guard = grpc_client.read().await;
673                let submitter = OrderSubmitter::new(
674                    (*grpc_guard).clone(),
675                    http_client.clone(),
676                    wallet_address,
677                    subaccount_number,
678                    chain_id,
679                    authenticator_ids,
680                );
681
682                // Submit order based on type
683                match order_clone.order_type() {
684                    OrderType::Market => {
685                        submitter
686                            .submit_market_order(
687                                wallet_ref,
688                                instrument_id,
689                                client_id_u32,
690                                order_clone.order_side(),
691                                order_clone.quantity(),
692                                block_height,
693                            )
694                            .await?;
695                        log::info!("Successfully submitted market order: {client_order_id}");
696                    }
697                    OrderType::Limit => {
698                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
699                        submitter
700                            .submit_limit_order(
701                                wallet_ref,
702                                instrument_id,
703                                client_id_u32,
704                                order_clone.order_side(),
705                                order_clone
706                                    .price()
707                                    .ok_or_else(|| anyhow::anyhow!("Limit order missing price"))?,
708                                order_clone.quantity(),
709                                order_clone.time_in_force(),
710                                order_clone.is_post_only(),
711                                order_clone.is_reduce_only(),
712                                block_height,
713                                expire_time,
714                            )
715                            .await?;
716                        log::info!("Successfully submitted limit order: {client_order_id}");
717                    }
718                    OrderType::StopMarket => {
719                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
720                            anyhow::anyhow!("Stop market order missing trigger_price")
721                        })?;
722                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
723                        submitter
724                            .submit_stop_market_order(
725                                wallet_ref,
726                                instrument_id,
727                                client_id_u32,
728                                order_clone.order_side(),
729                                trigger_price,
730                                order_clone.quantity(),
731                                order_clone.is_reduce_only(),
732                                expire_time,
733                            )
734                            .await?;
735                        log::info!("Successfully submitted stop market order: {client_order_id}");
736                    }
737                    OrderType::StopLimit => {
738                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
739                            anyhow::anyhow!("Stop limit order missing trigger_price")
740                        })?;
741                        let limit_price = order_clone.price().ok_or_else(|| {
742                            anyhow::anyhow!("Stop limit order missing limit price")
743                        })?;
744                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
745                        submitter
746                            .submit_stop_limit_order(
747                                wallet_ref,
748                                instrument_id,
749                                client_id_u32,
750                                order_clone.order_side(),
751                                trigger_price,
752                                limit_price,
753                                order_clone.quantity(),
754                                order_clone.time_in_force(),
755                                order_clone.is_post_only(),
756                                order_clone.is_reduce_only(),
757                                expire_time,
758                            )
759                            .await?;
760                        log::info!("Successfully submitted stop limit order: {client_order_id}");
761                    }
762                    // dYdX TakeProfitMarket maps to Nautilus MarketIfTouched
763                    OrderType::MarketIfTouched => {
764                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
765                            anyhow::anyhow!("Take profit market order missing trigger_price")
766                        })?;
767                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
768                        submitter
769                            .submit_take_profit_market_order(
770                                wallet_ref,
771                                instrument_id,
772                                client_id_u32,
773                                order_clone.order_side(),
774                                trigger_price,
775                                order_clone.quantity(),
776                                order_clone.is_reduce_only(),
777                                expire_time,
778                            )
779                            .await?;
780                        log::info!(
781                            "Successfully submitted take profit market order: {client_order_id}"
782                        );
783                    }
784                    // dYdX TakeProfitLimit maps to Nautilus LimitIfTouched
785                    OrderType::LimitIfTouched => {
786                        let trigger_price = order_clone.trigger_price().ok_or_else(|| {
787                            anyhow::anyhow!("Take profit limit order missing trigger_price")
788                        })?;
789                        let limit_price = order_clone.price().ok_or_else(|| {
790                            anyhow::anyhow!("Take profit limit order missing limit price")
791                        })?;
792                        let expire_time = order_clone.expire_time().map(nanos_to_secs_i64);
793                        submitter
794                            .submit_take_profit_limit_order(
795                                wallet_ref,
796                                instrument_id,
797                                client_id_u32,
798                                order_clone.order_side(),
799                                trigger_price,
800                                limit_price,
801                                order_clone.quantity(),
802                                order_clone.time_in_force(),
803                                order_clone.is_post_only(),
804                                order_clone.is_reduce_only(),
805                                expire_time,
806                            )
807                            .await?;
808                        log::info!(
809                            "Successfully submitted take profit limit order: {client_order_id}"
810                        );
811                    }
812                    _ => unreachable!("Order type already validated"),
813                }
814
815                Ok(())
816            },
817        );
818
819        Ok(())
820    }
821
822    fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
823        anyhow::bail!("Order lists not supported by dYdX")
824    }
825
826    fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
827        anyhow::bail!("Order modification not supported by dYdX")
828    }
829
830    /// Cancels an order on dYdX exchange.
831    ///
832    /// Validates the order state and retrieves instrument details before
833    /// spawning an async task to cancel via gRPC.
834    ///
835    /// # Validation
836    /// - Checks order exists in cache
837    /// - Validates order is not already closed
838    /// - Retrieves instrument from cache for order builder
839    ///
840    /// The `cmd` contains client/venue order IDs. Returns `Ok(())` if cancel request is
841    /// spawned successfully or validation fails gracefully. Returns `Err` if not connected.
842    ///
843    /// # Events
844    /// - `OrderCanceled` - Generated when WebSocket confirms cancellation
845    /// - `OrderCancelRejected` - Generated if exchange rejects cancellation
846    fn cancel_order(&self, cmd: &CancelOrder) -> anyhow::Result<()> {
847        if !self.is_connected() {
848            anyhow::bail!("Cannot cancel order: not connected");
849        }
850
851        let client_order_id = cmd.client_order_id;
852
853        // Validate order exists in cache and is not closed
854        let cache = self.core.cache();
855        let cache_borrow = cache.borrow();
856
857        let order = match cache_borrow.order(&client_order_id) {
858            Some(order) => order,
859            None => {
860                log::error!("Cannot cancel order {client_order_id}: not found in cache");
861                return Ok(()); // Not an error - order may have been filled/canceled already
862            }
863        };
864
865        // Validate order is not already closed
866        if order.is_closed() {
867            log::warn!(
868                "CancelOrder command for {} when order already {} (will not send to exchange)",
869                client_order_id,
870                order.status()
871            );
872            return Ok(());
873        }
874
875        // Retrieve instrument from cache
876        let instrument_id = cmd.instrument_id;
877        let instrument = match cache_borrow.instrument(&instrument_id) {
878            Some(instrument) => instrument,
879            None => {
880                log::error!(
881                    "Cannot cancel order {client_order_id}: instrument {instrument_id} not found in cache"
882                );
883                return Ok(()); // Not an error - missing instrument is a cache issue
884            }
885        };
886
887        log::debug!(
888            "Cancelling order {} for instrument {}",
889            client_order_id,
890            instrument.id()
891        );
892
893        let grpc_client = self.grpc_client.clone();
894        let wallet = self.wallet.clone();
895        let http_client = self.http_client.clone();
896        let wallet_address = self.wallet_address.clone();
897        let subaccount_number = self.subaccount_number;
898        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
899        let chain_id = self.get_chain_id();
900        let authenticator_ids = self.config.authenticator_ids.clone();
901        let trader_id = cmd.trader_id;
902        let strategy_id = cmd.strategy_id;
903        let venue_order_id = cmd.venue_order_id;
904
905        // Convert client_order_id to u32 before async block
906        let client_id_u32 = match self.get_client_order_id_int(client_order_id.as_str()) {
907            Some(id) => id,
908            None => {
909                log::error!("Client order ID {client_order_id} not found in cache");
910                anyhow::bail!("Client order ID not found in cache")
911            }
912        };
913
914        self.spawn_task("cancel_order", async move {
915            let wallet_guard = wallet.read().await;
916            let wallet_ref = wallet_guard
917                .as_ref()
918                .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
919
920            let grpc_guard = grpc_client.read().await;
921            let submitter = OrderSubmitter::new(
922                (*grpc_guard).clone(),
923                http_client.clone(),
924                wallet_address,
925                subaccount_number,
926                chain_id,
927                authenticator_ids,
928            );
929
930            // Attempt cancellation via submitter
931            match submitter
932                .cancel_order(wallet_ref, instrument_id, client_id_u32, block_height)
933                .await
934            {
935                Ok(()) => {
936                    log::info!("Successfully cancelled order: {client_order_id}");
937                }
938                Err(e) => {
939                    log::error!("Failed to cancel order {client_order_id}: {e:?}");
940
941                    let sender = get_exec_event_sender();
942                    let ts_now = UnixNanos::default();
943                    let event = OrderCancelRejected::new(
944                        trader_id,
945                        strategy_id,
946                        instrument_id,
947                        client_order_id,
948                        format!("Cancel order failed: {e:?}").into(),
949                        UUID4::new(),
950                        ts_now,
951                        ts_now,
952                        false,
953                        venue_order_id,
954                        None, // account_id not available in async context
955                    );
956                    sender
957                        .send(ExecutionEvent::Order(OrderEventAny::CancelRejected(event)))
958                        .unwrap();
959                }
960            }
961
962            Ok(())
963        });
964
965        Ok(())
966    }
967
968    fn cancel_all_orders(&self, cmd: &CancelAllOrders) -> anyhow::Result<()> {
969        if !self.is_connected() {
970            anyhow::bail!("Cannot cancel orders: not connected");
971        }
972
973        // Query all open orders from cache
974        let cache = self.core.cache().borrow();
975        let mut open_orders: Vec<_> = cache
976            .orders_open(None, None, None, None)
977            .into_iter()
978            .collect();
979
980        let instrument_id = cmd.instrument_id;
981        open_orders.retain(|order| order.instrument_id() == instrument_id);
982
983        // Filter by order_side if specified (NoOrderSide means all sides)
984        if cmd.order_side != OrderSide::NoOrderSide {
985            let order_side = cmd.order_side;
986            open_orders.retain(|order| order.order_side() == order_side);
987        }
988
989        // Split orders into short-term and long-term based on TimeInForce
990        // Short-term: IOC, FOK (expire by block height)
991        // Long-term: GTC, GTD, DAY, POST_ONLY (expire by timestamp)
992        let mut short_term_orders = Vec::new();
993        let mut long_term_orders = Vec::new();
994
995        for order in &open_orders {
996            match order.time_in_force() {
997                TimeInForce::Ioc | TimeInForce::Fok => short_term_orders.push(order),
998                TimeInForce::Gtc
999                | TimeInForce::Gtd
1000                | TimeInForce::Day
1001                | TimeInForce::AtTheOpen
1002                | TimeInForce::AtTheClose => long_term_orders.push(order),
1003            }
1004        }
1005
1006        log::info!(
1007            "Cancel all orders: total={}, short_term={}, long_term={}, instrument_id={}, order_side={:?}",
1008            open_orders.len(),
1009            short_term_orders.len(),
1010            long_term_orders.len(),
1011            instrument_id,
1012            cmd.order_side
1013        );
1014
1015        // Cancel each order individually (dYdX requires separate transactions)
1016        let grpc_client = self.grpc_client.clone();
1017        let wallet = self.wallet.clone();
1018        let http_client = self.http_client.clone();
1019        let wallet_address = self.wallet_address.clone();
1020        let subaccount_number = self.subaccount_number;
1021        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1022        let chain_id = self.get_chain_id();
1023        let authenticator_ids = self.config.authenticator_ids.clone();
1024
1025        // Collect (instrument_id, client_id) tuples for batch cancel
1026        let mut orders_to_cancel = Vec::new();
1027        for order in &open_orders {
1028            let client_order_id = order.client_order_id();
1029            if let Some(client_id_u32) = self.get_client_order_id_int(client_order_id.as_str()) {
1030                orders_to_cancel.push((instrument_id, client_id_u32));
1031            } else {
1032                log::warn!(
1033                    "Cannot cancel order {client_order_id}: client_order_id not found in cache"
1034                );
1035            }
1036        }
1037
1038        self.spawn_task("cancel_all_orders", async move {
1039            let wallet_guard = wallet.read().await;
1040            let wallet_ref = wallet_guard
1041                .as_ref()
1042                .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1043
1044            let grpc_guard = grpc_client.read().await;
1045            let submitter = OrderSubmitter::new(
1046                (*grpc_guard).clone(),
1047                http_client.clone(),
1048                wallet_address,
1049                subaccount_number,
1050                chain_id,
1051                authenticator_ids,
1052            );
1053
1054            // Cancel orders using batch method (executes sequentially to avoid nonce conflicts)
1055            match submitter
1056                .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1057                .await
1058            {
1059                Ok(()) => {
1060                    log::info!("Successfully cancelled {} orders", orders_to_cancel.len());
1061                }
1062                Err(e) => {
1063                    log::error!("Batch cancel failed: {e:?}");
1064                }
1065            }
1066
1067            Ok(())
1068        });
1069
1070        Ok(())
1071    }
1072
1073    fn batch_cancel_orders(&self, cmd: &BatchCancelOrders) -> anyhow::Result<()> {
1074        if cmd.cancels.is_empty() {
1075            return Ok(());
1076        }
1077
1078        if !self.is_connected() {
1079            anyhow::bail!("Cannot cancel orders: not connected");
1080        }
1081
1082        // Convert ClientOrderIds to u32 before async block
1083        let mut orders_to_cancel = Vec::with_capacity(cmd.cancels.len());
1084        for cancel in &cmd.cancels {
1085            let client_id_str = cancel.client_order_id.as_str();
1086            let client_id_u32 = match self.get_client_order_id_int(client_id_str) {
1087                Some(id) => id,
1088                None => {
1089                    log::warn!(
1090                        "No u32 mapping found for client_order_id={client_id_str}, skipping cancel"
1091                    );
1092                    continue;
1093                }
1094            };
1095            orders_to_cancel.push((cancel.instrument_id, client_id_u32));
1096        }
1097
1098        if orders_to_cancel.is_empty() {
1099            log::warn!("No valid orders to cancel in batch");
1100            return Ok(());
1101        }
1102
1103        let grpc_client = self.grpc_client.clone();
1104        let wallet = self.wallet.clone();
1105        let http_client = self.http_client.clone();
1106        let wallet_address = self.wallet_address.clone();
1107        let subaccount_number = self.subaccount_number;
1108        let block_height = self.block_height.load(std::sync::atomic::Ordering::Relaxed) as u32;
1109        let chain_id = self.get_chain_id();
1110        let authenticator_ids = self.config.authenticator_ids.clone();
1111
1112        log::info!(
1113            "Batch cancelling {} orders: {:?}",
1114            orders_to_cancel.len(),
1115            orders_to_cancel
1116        );
1117
1118        self.spawn_task("batch_cancel_orders", async move {
1119            let wallet_guard = wallet.read().await;
1120            let wallet_ref = wallet_guard
1121                .as_ref()
1122                .ok_or_else(|| anyhow::anyhow!("Wallet not initialized"))?;
1123
1124            let grpc_guard = grpc_client.read().await;
1125            let submitter = OrderSubmitter::new(
1126                (*grpc_guard).clone(),
1127                http_client.clone(),
1128                wallet_address,
1129                subaccount_number,
1130                chain_id,
1131                authenticator_ids,
1132            );
1133
1134            match submitter
1135                .cancel_orders_batch(wallet_ref, &orders_to_cancel, block_height)
1136                .await
1137            {
1138                Ok(()) => {
1139                    log::info!(
1140                        "Successfully batch cancelled {} orders",
1141                        orders_to_cancel.len()
1142                    );
1143                }
1144                Err(e) => {
1145                    log::error!("Batch cancel failed: {e:?}");
1146                }
1147            }
1148
1149            Ok(())
1150        });
1151
1152        Ok(())
1153    }
1154
1155    fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
1156        Ok(())
1157    }
1158
1159    fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
1160        Ok(())
1161    }
1162
1163    async fn connect(&mut self) -> anyhow::Result<()> {
1164        if self.connected {
1165            log::warn!("dYdX execution client already connected");
1166            return Ok(());
1167        }
1168
1169        log::info!("Connecting to dYdX");
1170
1171        // Load instruments BEFORE WebSocket connection
1172        // Per Python implementation: "instruments are used in the first account channel message"
1173        log::debug!("Loading instruments from HTTP API");
1174        self.http_client.fetch_and_cache_instruments().await?;
1175        log::info!(
1176            "Loaded {} instruments from HTTP",
1177            self.http_client.instruments_cache.len()
1178        );
1179
1180        // Populate execution client's instrument lookup maps
1181        self.cache_instruments_from_http();
1182
1183        // Initialize wallet from config if mnemonic is provided
1184        if let Some(mnemonic) = &self.config.mnemonic {
1185            let wallet = Wallet::from_mnemonic(mnemonic)?;
1186            *self.wallet.write().await = Some(wallet);
1187            log::debug!("Wallet initialized");
1188        }
1189
1190        // Connect WebSocket
1191        self.ws_client.connect().await?;
1192        log::debug!("WebSocket connected");
1193
1194        // Subscribe to block height updates
1195        self.ws_client.subscribe_block_height().await?;
1196        log::debug!("Subscribed to block height updates");
1197
1198        // Subscribe to markets for instrument data
1199        self.ws_client.subscribe_markets().await?;
1200        log::debug!("Subscribed to markets");
1201
1202        // Subscribe to subaccount updates if authenticated
1203        if self.config.mnemonic.is_some() {
1204            self.ws_client
1205                .subscribe_subaccount(&self.wallet_address, self.subaccount_number)
1206                .await?;
1207            log::debug!(
1208                "Subscribed to subaccount updates: {}/{}",
1209                self.wallet_address,
1210                self.subaccount_number
1211            );
1212
1213            // Spawn WebSocket message processing task following standard adapter pattern
1214            // Per docs/developer_guide/adapters.md: Parse -> Dispatch -> Engine handles events
1215            if let Some(mut rx) = self.ws_client.take_receiver() {
1216                // Clone data needed for account state parsing in spawned task
1217                let account_id = self.core.account_id;
1218                let instruments = self.instruments.clone();
1219                let oracle_prices = self.oracle_prices.clone();
1220                let clob_pair_id_to_instrument = self.clob_pair_id_to_instrument.clone();
1221                let block_height = self.block_height.clone();
1222
1223                let handle = get_runtime().spawn(async move {
1224                    while let Some(msg) = rx.recv().await {
1225                        match msg {
1226                            NautilusWsMessage::Order(report) => {
1227                                log::debug!("Received order update: {:?}", report.order_status);
1228                                dispatch_execution_report(ExecutionReport::Order(report));
1229                            }
1230                            NautilusWsMessage::Fill(report) => {
1231                                log::debug!("Received fill update");
1232                                dispatch_execution_report(ExecutionReport::Fill(report));
1233                            }
1234                            NautilusWsMessage::Position(report) => {
1235                                log::debug!("Received position update");
1236                                // Dispatch position status reports via execution event system
1237                                let sender = get_exec_event_sender();
1238                                let exec_report =
1239                                    NautilusExecutionReport::Position(Box::new(*report));
1240                                if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1241                                    log::warn!("Failed to send position status report: {e}");
1242                                }
1243                            }
1244                            NautilusWsMessage::AccountState(state) => {
1245                                log::debug!("Received account state update");
1246                                dispatch_account_state(*state);
1247                            }
1248                            NautilusWsMessage::SubaccountSubscribed(msg) => {
1249                                log::debug!(
1250                                    "Parsing subaccount subscription with full context"
1251                                );
1252
1253                                // Build instruments map for parsing (clone to avoid lifetime issues)
1254                                let inst_map: std::collections::HashMap<_, _> = instruments
1255                                    .iter()
1256                                    .map(|entry| (*entry.key(), entry.value().clone()))
1257                                    .collect();
1258
1259                                // Build oracle prices map (copy Decimals)
1260                                let oracle_map: std::collections::HashMap<_, _> = oracle_prices
1261                                    .iter()
1262                                    .map(|entry| (*entry.key(), *entry.value()))
1263                                    .collect();
1264
1265                                let ts_init =
1266                                    nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1267                                let ts_event = ts_init;
1268
1269                                match crate::http::parse::parse_account_state(
1270                                    &msg.contents.subaccount,
1271                                    account_id,
1272                                    &inst_map,
1273                                    &oracle_map,
1274                                    ts_event,
1275                                    ts_init,
1276                                ) {
1277                                    Ok(account_state) => {
1278                                        log::info!(
1279                                            "Parsed account state: {} balance(s), {} margin(s)",
1280                                            account_state.balances.len(),
1281                                            account_state.margins.len()
1282                                        );
1283                                        dispatch_account_state(account_state);
1284                                    }
1285                                    Err(e) => {
1286                                        log::error!("Failed to parse account state: {e}");
1287                                    }
1288                                }
1289
1290                                // Parse positions from the subscription
1291                                if let Some(ref positions) =
1292                                    msg.contents.subaccount.open_perpetual_positions
1293                                {
1294                                    log::debug!(
1295                                        "Parsing {} position(s) from subscription",
1296                                        positions.len()
1297                                    );
1298
1299                                    for (market, ws_position) in positions {
1300                                        match crate::websocket::parse::parse_ws_position_report(
1301                                            ws_position,
1302                                            &instruments,
1303                                            account_id,
1304                                            ts_init,
1305                                        ) {
1306                                            Ok(report) => {
1307                                                log::debug!(
1308                                                    "Parsed position report: {} {} {} {}",
1309                                                    report.instrument_id,
1310                                                    report.position_side,
1311                                                    report.quantity,
1312                                                    market
1313                                                );
1314                                                let sender = get_exec_event_sender();
1315                                                let exec_report = NautilusExecutionReport::Position(
1316                                                    Box::new(report),
1317                                                );
1318                                                if let Err(e) =
1319                                                    sender.send(ExecutionEvent::Report(exec_report))
1320                                                {
1321                                                    log::warn!(
1322                                                        "Failed to send position status report: {e}"
1323                                                    );
1324                                                }
1325                                            }
1326                                            Err(e) => {
1327                                                log::error!(
1328                                                    "Failed to parse WebSocket position for {market}: {e}"
1329                                                );
1330                                            }
1331                                        }
1332                                    }
1333                                }
1334                            }
1335                            NautilusWsMessage::SubaccountsChannelData(data) => {
1336                                log::debug!("Processing subaccounts channel data");
1337                                let ts_init =
1338                                    nautilus_core::time::get_atomic_clock_realtime().get_time_ns();
1339
1340                                // Process orders
1341                                if let Some(ref orders) = data.contents.orders {
1342                                    for ws_order in orders {
1343                                        match crate::websocket::parse::parse_ws_order_report(
1344                                            ws_order,
1345                                            &clob_pair_id_to_instrument,
1346                                            &instruments,
1347                                            account_id,
1348                                            ts_init,
1349                                        ) {
1350                                            Ok(report) => {
1351                                                log::debug!(
1352                                                    "Parsed order report: {} {} {} @ {}",
1353                                                    report.instrument_id,
1354                                                    report.order_side,
1355                                                    report.order_status,
1356                                                    report.quantity
1357                                                );
1358                                                let sender = get_exec_event_sender();
1359                                                let exec_report =
1360                                                    NautilusExecutionReport::Order(Box::new(
1361                                                        report,
1362                                                    ));
1363                                                if let Err(e) =
1364                                                    sender.send(ExecutionEvent::Report(exec_report))
1365                                                {
1366                                                    log::warn!(
1367                                                        "Failed to send order status report: {e}"
1368                                                    );
1369                                                }
1370                                            }
1371                                            Err(e) => {
1372                                                log::error!(
1373                                                    "Failed to parse WebSocket order: {e}"
1374                                                );
1375                                            }
1376                                        }
1377                                    }
1378                                }
1379
1380                                // Process fills
1381                                if let Some(ref fills) = data.contents.fills {
1382                                    for ws_fill in fills {
1383                                        match crate::websocket::parse::parse_ws_fill_report(
1384                                            ws_fill,
1385                                            &instruments,
1386                                            account_id,
1387                                            ts_init,
1388                                        ) {
1389                                            Ok(report) => {
1390                                                log::debug!(
1391                                                    "Parsed fill report: {} {} {} @ {}",
1392                                                    report.instrument_id,
1393                                                    report.venue_order_id,
1394                                                    report.last_qty,
1395                                                    report.last_px
1396                                                );
1397                                                let sender = get_exec_event_sender();
1398                                                let exec_report =
1399                                                    NautilusExecutionReport::Fill(Box::new(report));
1400                                                if let Err(e) =
1401                                                    sender.send(ExecutionEvent::Report(exec_report))
1402                                                {
1403                                                    log::warn!(
1404                                                        "Failed to send fill report: {e}"
1405                                                    );
1406                                                }
1407                                            }
1408                                            Err(e) => {
1409                                                log::error!(
1410                                                    "Failed to parse WebSocket fill: {e}"
1411                                                );
1412                                            }
1413                                        }
1414                                    }
1415                                }
1416                            }
1417                            NautilusWsMessage::OraclePrices(oracle_prices_map) => {
1418                                log::debug!(
1419                                    "Processing oracle price updates for {} markets",
1420                                    oracle_prices_map.len()
1421                                );
1422
1423                                // Update oracle_prices map with new prices
1424                                for (market_symbol, oracle_data) in &oracle_prices_map {
1425                                    // Parse oracle price
1426                                    match oracle_data.oracle_price.parse::<Decimal>()
1427                                    {
1428                                        Ok(price) => {
1429                                            // Find instrument by symbol (oracle uses raw symbol like "BTC-USD")
1430                                            // Append "-PERP" to match instrument IDs
1431                                            let symbol_with_perp = format!("{market_symbol}-PERP");
1432
1433                                            // Find matching instrument
1434                                            if let Some(entry) = instruments.iter().find(|entry| {
1435                                                entry.value().id().symbol.as_str()
1436                                                    == symbol_with_perp
1437                                            }) {
1438                                                let instrument_id = *entry.key();
1439                                                oracle_prices.insert(instrument_id, price);
1440                                                log::trace!(
1441                                                    "Updated oracle price for {instrument_id}: {price}"
1442                                                );
1443                                            } else {
1444                                                log::debug!(
1445                                                    "No instrument found for market symbol '{market_symbol}' (tried '{symbol_with_perp}')"
1446                                                );
1447                                            }
1448                                        }
1449                                        Err(e) => {
1450                                            log::warn!(
1451                                                "Failed to parse oracle price for {market_symbol}: {e}"
1452                                            );
1453                                        }
1454                                    }
1455                                }
1456                            }
1457                            NautilusWsMessage::BlockHeight(height) => {
1458                                log::debug!("Block height update: {height}");
1459                                block_height.store(height, std::sync::atomic::Ordering::Relaxed);
1460                            }
1461                            NautilusWsMessage::Error(err) => {
1462                                log::error!("WebSocket error: {err:?}");
1463                            }
1464                            NautilusWsMessage::Reconnected => {
1465                                log::info!("WebSocket reconnected");
1466                            }
1467                            _ => {
1468                                // Data, Deltas are for market data, not execution
1469                            }
1470                        }
1471                    }
1472                    log::info!("WebSocket message processing task ended");
1473                });
1474
1475                self.ws_stream_handle = Some(handle);
1476                log::debug!("Spawned WebSocket message processing task");
1477            }
1478        }
1479
1480        self.connected = true;
1481        log::info!("Connected: client_id={}", self.core.client_id);
1482        Ok(())
1483    }
1484
1485    async fn disconnect(&mut self) -> anyhow::Result<()> {
1486        if !self.connected {
1487            log::warn!("dYdX execution client not connected");
1488            return Ok(());
1489        }
1490
1491        log::info!("Disconnecting from dYdX");
1492
1493        // Unsubscribe from subaccount updates if authenticated
1494        if self.config.mnemonic.is_some() {
1495            let _ = self
1496                .ws_client
1497                .unsubscribe_subaccount(&self.wallet_address, self.subaccount_number)
1498                .await
1499                .map_err(|e| log::warn!("Failed to unsubscribe from subaccount: {e}"));
1500        }
1501
1502        // Unsubscribe from markets
1503        let _ = self
1504            .ws_client
1505            .unsubscribe_markets()
1506            .await
1507            .map_err(|e| log::warn!("Failed to unsubscribe from markets: {e}"));
1508
1509        // Unsubscribe from block height
1510        let _ = self
1511            .ws_client
1512            .unsubscribe_block_height()
1513            .await
1514            .map_err(|e| log::warn!("Failed to unsubscribe from block height: {e}"));
1515
1516        // Disconnect WebSocket
1517        self.ws_client.disconnect().await?;
1518
1519        // Abort WebSocket message processing task
1520        if let Some(handle) = self.ws_stream_handle.take() {
1521            handle.abort();
1522            log::debug!("Aborted WebSocket message processing task");
1523        }
1524
1525        // Abort any pending tasks
1526        self.abort_pending_tasks();
1527
1528        self.connected = false;
1529        log::info!("Disconnected: client_id={}", self.core.client_id);
1530        Ok(())
1531    }
1532
1533    async fn generate_order_status_report(
1534        &self,
1535        cmd: &GenerateOrderStatusReport,
1536    ) -> anyhow::Result<Option<OrderStatusReport>> {
1537        // Query single order from dYdX API
1538        let response = self
1539            .http_client
1540            .inner
1541            .get_orders(
1542                &self.wallet_address,
1543                self.subaccount_number,
1544                None,    // market filter
1545                Some(1), // limit to 1 result
1546            )
1547            .await
1548            .context("failed to fetch order from dYdX API")?;
1549
1550        if response.is_empty() {
1551            return Ok(None);
1552        }
1553
1554        let order = &response[0];
1555        let ts_init = UnixNanos::default();
1556
1557        let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1558            Some(inst) => inst,
1559            None => return Ok(None),
1560        };
1561
1562        let report = crate::http::parse::parse_order_status_report(
1563            order,
1564            &instrument,
1565            self.core.account_id,
1566            ts_init,
1567        )
1568        .context("failed to parse order status report")?;
1569
1570        if let Some(client_order_id) = cmd.client_order_id
1571            && report.client_order_id != Some(client_order_id)
1572        {
1573            return Ok(None);
1574        }
1575
1576        if let Some(venue_order_id) = cmd.venue_order_id
1577            && report.venue_order_id.as_str() != venue_order_id.as_str()
1578        {
1579            return Ok(None);
1580        }
1581
1582        if let Some(instrument_id) = cmd.instrument_id
1583            && report.instrument_id != instrument_id
1584        {
1585            return Ok(None);
1586        }
1587
1588        Ok(Some(report))
1589    }
1590
1591    async fn generate_order_status_reports(
1592        &self,
1593        cmd: &GenerateOrderStatusReports,
1594    ) -> anyhow::Result<Vec<OrderStatusReport>> {
1595        // Query orders from dYdX API
1596        let response = self
1597            .http_client
1598            .inner
1599            .get_orders(
1600                &self.wallet_address,
1601                self.subaccount_number,
1602                None, // market filter
1603                None, // limit
1604            )
1605            .await
1606            .context("failed to fetch orders from dYdX API")?;
1607
1608        let mut reports = Vec::new();
1609        let ts_init = UnixNanos::default();
1610
1611        for order in response {
1612            let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1613                Some(inst) => inst,
1614                None => continue,
1615            };
1616
1617            if let Some(filter_id) = cmd.instrument_id
1618                && instrument.id() != filter_id
1619            {
1620                continue;
1621            }
1622
1623            let report = match crate::http::parse::parse_order_status_report(
1624                &order,
1625                &instrument,
1626                self.core.account_id,
1627                ts_init,
1628            ) {
1629                Ok(r) => r,
1630                Err(e) => {
1631                    log::warn!("Failed to parse order status report: {e}");
1632                    continue;
1633                }
1634            };
1635
1636            reports.push(report);
1637        }
1638
1639        // Filter by open_only if specified
1640        if cmd.open_only {
1641            reports.retain(|r| r.order_status.is_open());
1642        }
1643
1644        // Filter by time range if specified
1645        if let Some(start) = cmd.start {
1646            reports.retain(|r| r.ts_last >= start);
1647        }
1648        if let Some(end) = cmd.end {
1649            reports.retain(|r| r.ts_last <= end);
1650        }
1651
1652        Ok(reports)
1653    }
1654
1655    async fn generate_fill_reports(
1656        &self,
1657        cmd: GenerateFillReports,
1658    ) -> anyhow::Result<Vec<FillReport>> {
1659        let response = self
1660            .http_client
1661            .inner
1662            .get_fills(
1663                &self.wallet_address,
1664                self.subaccount_number,
1665                None, // market filter
1666                None, // limit
1667            )
1668            .await
1669            .context("failed to fetch fills from dYdX API")?;
1670
1671        let mut reports = Vec::new();
1672        let ts_init = UnixNanos::default();
1673
1674        for fill in response.fills {
1675            let instrument = match self.get_instrument_by_market(&fill.market) {
1676                Some(inst) => inst,
1677                None => {
1678                    log::warn!("Unknown market in fill: {}", fill.market);
1679                    continue;
1680                }
1681            };
1682
1683            if let Some(filter_id) = cmd.instrument_id
1684                && instrument.id() != filter_id
1685            {
1686                continue;
1687            }
1688
1689            let report = match crate::http::parse::parse_fill_report(
1690                &fill,
1691                &instrument,
1692                self.core.account_id,
1693                ts_init,
1694            ) {
1695                Ok(r) => r,
1696                Err(e) => {
1697                    log::warn!("Failed to parse fill report: {e}");
1698                    continue;
1699                }
1700            };
1701
1702            reports.push(report);
1703        }
1704
1705        if let Some(venue_order_id) = cmd.venue_order_id {
1706            reports.retain(|r| r.venue_order_id.as_str() == venue_order_id.as_str());
1707        }
1708
1709        Ok(reports)
1710    }
1711
1712    async fn generate_position_status_reports(
1713        &self,
1714        cmd: &GeneratePositionStatusReports,
1715    ) -> anyhow::Result<Vec<PositionStatusReport>> {
1716        // Query subaccount positions from dYdX API
1717        let response = self
1718            .http_client
1719            .inner
1720            .get_subaccount(&self.wallet_address, self.subaccount_number)
1721            .await
1722            .context("failed to fetch subaccount from dYdX API")?;
1723
1724        let mut reports = Vec::new();
1725        let ts_init = UnixNanos::default();
1726
1727        for (market_ticker, perp_position) in &response.subaccount.open_perpetual_positions {
1728            let instrument = match self.get_instrument_by_market(market_ticker) {
1729                Some(inst) => inst,
1730                None => {
1731                    log::warn!("Unknown market in position: {market_ticker}");
1732                    continue;
1733                }
1734            };
1735
1736            if let Some(filter_id) = cmd.instrument_id
1737                && instrument.id() != filter_id
1738            {
1739                continue;
1740            }
1741
1742            let report = match crate::http::parse::parse_position_status_report(
1743                perp_position,
1744                &instrument,
1745                self.core.account_id,
1746                ts_init,
1747            ) {
1748                Ok(r) => r,
1749                Err(e) => {
1750                    log::warn!("Failed to parse position status report: {e}");
1751                    continue;
1752                }
1753            };
1754
1755            reports.push(report);
1756        }
1757
1758        Ok(reports)
1759    }
1760
1761    async fn generate_mass_status(
1762        &self,
1763        lookback_mins: Option<u64>,
1764    ) -> anyhow::Result<Option<ExecutionMassStatus>> {
1765        let ts_init = UnixNanos::default();
1766
1767        // Query orders
1768        let orders_response = self
1769            .http_client
1770            .inner
1771            .get_orders(&self.wallet_address, self.subaccount_number, None, None)
1772            .await
1773            .context("failed to fetch orders for mass status")?;
1774
1775        // Query subaccount for positions
1776        let subaccount_response = self
1777            .http_client
1778            .inner
1779            .get_subaccount(&self.wallet_address, self.subaccount_number)
1780            .await
1781            .context("failed to fetch subaccount for mass status")?;
1782
1783        // Query fills
1784        let fills_response = self
1785            .http_client
1786            .inner
1787            .get_fills(&self.wallet_address, self.subaccount_number, None, None)
1788            .await
1789            .context("failed to fetch fills for mass status")?;
1790
1791        // Parse order reports
1792        let mut order_reports = Vec::new();
1793        let mut orders_filtered = 0usize;
1794        for order in orders_response {
1795            let instrument = match self.get_instrument_by_clob_pair_id(order.clob_pair_id) {
1796                Some(inst) => inst,
1797                None => {
1798                    orders_filtered += 1;
1799                    continue;
1800                }
1801            };
1802
1803            match crate::http::parse::parse_order_status_report(
1804                &order,
1805                &instrument,
1806                self.core.account_id,
1807                ts_init,
1808            ) {
1809                Ok(r) => order_reports.push(r),
1810                Err(e) => {
1811                    log::warn!("Failed to parse order status report: {e}");
1812                    orders_filtered += 1;
1813                }
1814            }
1815        }
1816
1817        // Parse position reports
1818        let mut position_reports = Vec::new();
1819        for (market_ticker, perp_position) in
1820            &subaccount_response.subaccount.open_perpetual_positions
1821        {
1822            let instrument = match self.get_instrument_by_market(market_ticker) {
1823                Some(inst) => inst,
1824                None => continue,
1825            };
1826
1827            match crate::http::parse::parse_position_status_report(
1828                perp_position,
1829                &instrument,
1830                self.core.account_id,
1831                ts_init,
1832            ) {
1833                Ok(r) => position_reports.push(r),
1834                Err(e) => {
1835                    log::warn!("Failed to parse position status report: {e}");
1836                }
1837            }
1838        }
1839
1840        // Parse fill reports
1841        let mut fill_reports = Vec::new();
1842        let mut fills_filtered = 0usize;
1843        for fill in fills_response.fills {
1844            let instrument = match self.get_instrument_by_market(&fill.market) {
1845                Some(inst) => inst,
1846                None => {
1847                    fills_filtered += 1;
1848                    continue;
1849                }
1850            };
1851
1852            match crate::http::parse::parse_fill_report(
1853                &fill,
1854                &instrument,
1855                self.core.account_id,
1856                ts_init,
1857            ) {
1858                Ok(r) => fill_reports.push(r),
1859                Err(e) => {
1860                    log::warn!("Failed to parse fill report: {e}");
1861                    fills_filtered += 1;
1862                }
1863            }
1864        }
1865
1866        if lookback_mins.is_some() {
1867            log::debug!(
1868                "lookback_mins={:?} filtering not yet implemented. Returning all: {} orders ({} filtered), {} positions, {} fills ({} filtered)",
1869                lookback_mins,
1870                order_reports.len(),
1871                orders_filtered,
1872                position_reports.len(),
1873                fill_reports.len(),
1874                fills_filtered
1875            );
1876        } else {
1877            log::info!(
1878                "Generated mass status: {} orders, {} positions, {} fills",
1879                order_reports.len(),
1880                position_reports.len(),
1881                fill_reports.len()
1882            );
1883        }
1884
1885        // Create mass status and add reports
1886        let mut mass_status = ExecutionMassStatus::new(
1887            self.core.client_id,
1888            self.core.account_id,
1889            self.core.venue,
1890            ts_init,
1891            None, // report_id will be auto-generated
1892        );
1893
1894        mass_status.add_order_reports(order_reports);
1895        mass_status.add_position_reports(position_reports);
1896        mass_status.add_fill_reports(fill_reports);
1897
1898        Ok(Some(mass_status))
1899    }
1900}
1901
1902/// Dispatches account state events to the portfolio.
1903///
1904/// AccountState events are routed to the Portfolio (not ExecEngine) via msgbus.
1905/// This follows the pattern used by BitMEX, OKX, and other reference adapters.
1906fn dispatch_account_state(state: AccountState) {
1907    use std::any::Any;
1908    msgbus::send_any("Portfolio.update_account".into(), &state as &dyn Any);
1909}
1910
1911/// Dispatches execution reports to the execution engine.
1912///
1913/// This follows the standard adapter pattern where WebSocket handlers parse messages
1914/// into reports, and a dispatch function sends them via the execution event system.
1915/// The execution engine then handles cache lookups and event generation.
1916///
1917/// # Architecture
1918///
1919/// Per `docs/developer_guide/adapters.md`, adapters should:
1920/// 1. Parse WebSocket messages into ExecutionReports in the handler
1921/// 2. Dispatch reports via `get_exec_event_sender()`
1922/// 3. Let the execution engine handle event generation (has cache access)
1923///
1924/// This pattern is used by Hyperliquid, OKX, BitMEX, and other reference adapters.
1925fn dispatch_execution_report(report: ExecutionReport) {
1926    let sender = get_exec_event_sender();
1927    match report {
1928        ExecutionReport::Order(order_report) => {
1929            log::debug!(
1930                "Dispatching order report: status={:?}, venue_order_id={:?}, client_order_id={:?}",
1931                order_report.order_status,
1932                order_report.venue_order_id,
1933                order_report.client_order_id
1934            );
1935            let exec_report = NautilusExecutionReport::Order(order_report);
1936            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1937                log::warn!("Failed to send order status report: {e}");
1938            }
1939        }
1940        ExecutionReport::Fill(fill_report) => {
1941            log::debug!(
1942                "Dispatching fill report: venue_order_id={}, trade_id={}",
1943                fill_report.venue_order_id,
1944                fill_report.trade_id
1945            );
1946            let exec_report = NautilusExecutionReport::Fill(fill_report);
1947            if let Err(e) = sender.send(ExecutionEvent::Report(exec_report)) {
1948                log::warn!("Failed to send fill report: {e}");
1949            }
1950        }
1951    }
1952}